Не удалось создать потребителя kafka с помощью Apache Drill

Я использую в своем приложении драйвер JDBC Apache Drill (1.14), который потребляет данные из Kafka. Приложение какое-то время работает нормально, а после нескольких итераций не запускается из-за следующей Too many files open проблемы. Я убедился, что в моем коде нет утечек дескрипторов файлов, но все еще не уверен, почему возникает эта проблема?

Похоже, проблема возникает из-за использования библиотек детализации Apache при создании потребителя Kafka. Может ли кто-нибудь помочь мне решить эту проблему?

Проблема исчезает, когда я перезапускаю буровую установку Apache, но очень скоро это происходит снова. Я проверил количество файловых дескрипторов на своей машине unix, используя ulimit -a | wc -l & lsof -a -p <PID> | wc -l до и после перезапуска процесса детализации, и похоже, что процесс детализации значительно занимает много файловых дескрипторов. Я попытался увеличить количество файловых дескрипторов в системе, но мне все равно не повезло.

Я следовал документации подключаемого модуля хранилища Apache Drill при настройке подключаемого модуля Kafka в Apache Drill по адресу https://drill.apache.org/docs/kafka-storage-plugin/

Любая помощь по этому вопросу приветствуется. Спасибо.

URL-адрес JDBC: jdbc:drill:drillbit=localhost:31010;schema=kafka

ПРИМЕЧАНИЕ. Я опускаю фильтры в моем запросе SELECT * FROM myKafkaTopic WHERE kafkaMsgTimestamp > 1560210931626

org.apache.drill.common.exceptions.UserException: DATA_READ ERROR: Failed to fetch start/end offsets of the topic  myKafkaTopic

Failed to construct kafka consumer

[Error Id: 73f896a7-09d4-425b-8cd5-f269c3a6e69a ]
    at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633) ~[drill-common-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.store.kafka.KafkaGroupScan.init(KafkaGroupScan.java:198) [drill-storage-kafka-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.store.kafka.KafkaGroupScan.<init>(KafkaGroupScan.java:98) [drill-storage-kafka-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.store.kafka.KafkaStoragePlugin.getPhysicalScan(KafkaStoragePlugin.java:83) [drill-storage-kafka-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.store.AbstractStoragePlugin.getPhysicalScan(AbstractStoragePlugin.java:111) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.logical.DrillTable.getGroupScan(DrillTable.java:99) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:89) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:69) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:62) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.logical.DrillScanRule.onMatch(DrillScanRule.java:38) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:652) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
    at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.transform(DefaultSqlHandler.java:429) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.transform(DefaultSqlHandler.java:369) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.convertToRawDrel(DefaultSqlHandler.java:255) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.convertToDrel(DefaultSqlHandler.java:318) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.getPlan(DefaultSqlHandler.java:180) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.sql.DrillSqlWorker.getQueryPlan(DrillSqlWorker.java:145) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.planner.sql.DrillSqlWorker.getPlan(DrillSqlWorker.java:83) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.work.foreman.Foreman.runSQL(Foreman.java:567) [drill-java-exec-1.14.0.jar:1.14.0]
    at org.apache.drill.exec.work.foreman.Foreman.run(Foreman.java:266) [drill-java-exec-1.14.0.jar:1.14.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765) ~[kafka-clients-0.11.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633) ~[kafka-clients-0.11.0.1.jar:na]
    at org.apache.drill.exec.store.kafka.KafkaGroupScan.init(KafkaGroupScan.java:168) [drill-storage-kafka-1.14.0.jar:1.14.0]
    ... 23 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:129) ~[kafka-clients-0.11.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:156) ~[kafka-clients-0.11.0.1.jar:na]
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:160) ~[kafka-clients-0.11.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701) ~[kafka-clients-0.11.0.1.jar:na]
    ... 25 common frames omitted
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method) ~[na:1.8.0_181]
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130) ~[na:1.8.0_181]
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69) ~[na:1.8.0_181]
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.8.0_181]
    at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.8.0_181]
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:127) ~[kafka-clients-0.11.0.1.jar:na]```

person user2052801    schedule 11.06.2019    source источник


Ответы (1)


Это связано с тем, что плагин хранилища kafka никогда явно не закрывает свои соединения с kafka. Он полагается на тайм-аут простоя на сервере (connections.max.idle.ms), который по умолчанию равен 10 минутам. Я видел точно такую ​​же проблему при выполнении запросов в течение нескольких минут - и мы уменьшили тайм-аут по умолчанию, чтобы убедиться, что это так.

person user12113901    schedule 24.09.2019