Соединитель приемника MongoDb :: JsonParseException: средство чтения JSON ожидало значения, но обнаружило 'dist'

Я пытаюсь создать поток данных, в котором издатель mosquitto будет отправлять данные брокеру kafka через MQTT Source Connector, а брокер kafka будет перенаправлять входные данные в базу данных MongoDb через MonoDb Sink Connector. Коннектор источника MQTT и коннектор источника MongoDb работают правильно по отдельности. Когда я пытаюсь объединить оба разъема, это дает мне исключение. Я потратил кучу часов на поиски решения, но у меня ничего не вышло. Мне нужна помощь в налаживании этого потока данных.

mqttPublisher.py

data = {
         "time": str(datetime.datetime.now().time()),
          "val": 0
        }

client.publish("dist", json.dumps(data), qos=2)

kafkaConsumer.py

consumer = KafkaConsumer('mqtt.',
                       bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg)

source-anonymous.properties

name=MQTT-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:1883
mqtt.topics=dist
kafka.topics=mqtt.

MongoDbSinkConnector.properties

name=MyMongoDbSinkConnector
topics=mqtt.
tasks.max=1

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector


mongodb.connection.uri=mongodb://localhost:27017/sample?w=1&journal=true
mongodb.collection=data
mongodb.max.num.retries=3
mongodb.retries.defer.timeout=5000

mongodb.document.id.strategy =at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy


mongodb.post.processor.chain =at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder

mongodb.delete.on.null.values=false


mongodb.writemodel.strategy =at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
mongodb.max.batch.size=0

Вывод (kafkaConsumer.py)

ConsumerRecord(topic='mqtt.', partition=0, offset=0, timestamp=1545759406558, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x08dist', value=b'\x00\x00\x00\x00\x02J{"time": "23:06:46.548284", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=10, serialized_value_size=43, serialized_header_size=62)

Команда CLI для запуска коннекторов:

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties share/confluent-hub-components/hpgrahsl-kafka-connect-mongodb/etc/MongoDbSinkConnector.properties

Журналы

[2018-12-25 23:07:52,280] INFO Created connector MQTT-source (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-12-25 23:07:52,346] INFO Connecting to Mqtt Server. (io.confluent.connect.mqtt.MqttSourceTask:67)
[2018-12-25 23:07:52,371] INFO Subscribing to dist with QOS of 0 (io.confluent.connect.mqtt.MqttSourceTask:76)
[2018-12-25 23:07:52,380] INFO WorkerSourceTask{id=MQTT-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:199)
.
.
.
.
[2018-12-25 23:07:52,615] INFO Creating connector MyMongoDbSinkConnector of type at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:235)
[2018-12-25 23:07:52,616] INFO Instantiated connector MyMongoDbSinkConnector with version 1.2.0 of type class at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:238)
[2018-12-25 23:07:52,616] INFO Finished creating connector MyMongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:257)
.
.
.
[2018-12-25 23:07:52,706] INFO Created connector MyMongoDbSinkConnector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-12-25 23:07:52,708] INFO starting MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:78)
.
.
[2018-12-25 23:07:52,943] INFO Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2018-12-25 23:07:53,013] INFO WorkerSinkTask{id=MyMongoDbSinkConnector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2018-12-25 23:07:53,037] INFO Cluster ID: VX_AdknXRGGfEWsSdcSpSw (org.apache.kafka.clients.Metadata:285)
[2018-12-25 23:07:53,057] INFO Opened connection [connectionId{localValue:1, serverValue:14}] to localhost:27017 (org.mongodb.driver.connection:71)
[2018-12-25 23:07:53,063] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3937492} (org.mongodb.driver.cluster:71)
[2018-12-25 23:07:53,869] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-12-25 23:07:53,871] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:472)
[2018-12-25 23:07:53,871] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-12-25 23:07:53,976] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
[2018-12-25 23:07:53,980] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Setting newly assigned partitions [mqtt.-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2018-12-25 23:07:53,991] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Resetting offset for partition mqtt.-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)
[2018-12-25 23:07:54,189] ERROR WorkerSinkTask{id=MyMongoDbSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. 
(org.apache.kafka.connect.runtime.WorkerSinkTask:584)
    org.bson.json.JsonParseException: JSON reader was expecting a value but found 'dist'.
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
    at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
    at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
    at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
    at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:186)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:185)
    at  at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:122)
    .
    .

Коннектор приемника MongoDb: https://github.com/hpgrahsl/kafka-connect-mongodb

Демонстрация коннектора исходного кода MQTT Kafka-Connect: https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc

ОБНОВЛЕНИЕ:

Ниже приведен вывод, который я получаю на kafka-avro-console-consumer:

"dist"  "J{\"time\": \"23:06:46.548284\", \"val\": 0}"

Используемая команда cli:

bin/kafka-avro-console-consumer --topic mqtt. --bootstrap-server localhost:9092 --property print.key=true

Схема, зарегистрированная в реестре схем:

{"subject":"mqtt.-key","version":1,"id":1,"schema":"\"string\""}
{"subject":"mqtt.-value","version":1,"id":2,"schema":"\"bytes\""}

команда, используемая для получения схем:

curl --silent -X GET http://localhost:8081/subjects/mqtt.-[key|value]/versions/latest

comment
Предлагаю попробовать для отладки коннектор приемника консоли. Но проблема, похоже, связана с коннектором Mongo. Например, вы должны использовать BsonOidStrategy?   -  person OneCricketeer    schedule 26.12.2018
comment
Коннектор приемника Mongo работает правильно, когда я отправляю данные от производителя kafka, но здесь с mosquitto_publisher он дает исключения. Я использую BsonOidStrategy по умолчанию, так как мы не можем оставить его пустым.   -  person Shubham    schedule 26.12.2018
comment
А рабочая версия использует Avro для ключей и значений? Я хотел сказать, что трассировка стека начинается с BsonDocument.parse, поэтому некоторые данные Bson генерируются неправильно.   -  person OneCricketeer    schedule 26.12.2018
comment
да, я использую конвертеры "ключ-значение" Avro   -  person Shubham    schedule 27.12.2018
comment
"J{\"time\": \"23:06:46.548284\", \"val\": 0}" - это необработанная строка, и в реестре схемы указано, что это значение просто байты, а не запись Avro ... Итак, я не думаю, что AvroConverter будет работать для этой записи   -  person OneCricketeer    schedule 27.12.2018
comment
В выводе kafkaConsumer.py к ключу и значению добавлено 5 байтов, что соответствует формат передачи, используемый для сообщений avro. Что вы посоветуете мне сделать, чтобы эта работа заработала? Это происходит из-за неправильных схем, конвертеров или чего-то еще?   -  person Shubham    schedule 28.12.2018
comment
Что ж, вам следует использовать AvroConsumer, но я говорю о потребителе консоли avro, а не о вашем коде Python или его выводе ... Все, что я говорю, это то, что значение явно просто байты или строка, а не вложенная запись значений ключей для времени и val . Если бы это было так, то так бы сказал реестр схемы, а не только байты.   -  person OneCricketeer    schedule 28.12.2018
comment
Я изменил используемые конвертеры и использовал SMT, который решил мою проблему.   -  person Shubham    schedule 07.01.2019
comment
Не стесняйтесь добавить свой полный ответ ниже   -  person OneCricketeer    schedule 08.01.2019
comment
Пожалуйста, не используйте Commnet. Используйте поле для ответа ниже.   -  person OneCricketeer    schedule 10.01.2019


Ответы (1)


Итак, это сработало для меня:

  1. Добавлен этот SMT в source-anonymous.properties
    transforms=requiredKeyExample transforms.requiredKeyExample.type=io.confluent.connect.transforms.ExtractTopic$Key

  2. Преобразователи "ключ-значение" в connect-standalone.properties изменены как key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter.

  3. Изменены преобразователи "ключ-значение" в MongoDbSinkConnector.properties как: key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter

person Shubham    schedule 11.01.2019