ключ сообщения kafka в качестве ключевого поля / столбца в HDFS

Поэтому я использую debezium key.field.name в моем исходном коннекторе MySQL, чтобы добавить поле в свою тему.

Сообщение выглядит ниже после перехода по теме.

{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}:{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}

Где, ключ

{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}

и ценность

{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}

Как часть моего приемника hdfsSinkConnector мне нужно получить ключ сообщения "__PKtableowner":"reviewDB.review.search_user_02 как часть столбца или поля в hdfs или кусте.

Единственный SMT, который я нашел, - это ValueToKey, но, похоже, он не подходит для моего варианта использования, потому что он извлекается из значения, а не из ключа сообщения. Я пробовал (InsertField, CreateKey, ExtractField и т.д.) Почти все преобразования, которые вы можете найти здесь, но не повезло. https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

Я ищу тип SMT KeyToValue или есть другой обходной путь.

Ниже приведены мои конфигурации источника и приемника. Источник:

{
  "name": "REVIEW__MYSQL__search_user__source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.history.kafka.topic": "review.search_user_logs",
    "database.history.consumer.max.block.ms": "3000",
    "include.schema.changes": "false",
    "database.history.consumer.session.timeout.ms": "30000",
    "database.history.kafka.consumer.group": "compose-connect-group",
    "snapshot.new.tables": "parallel",
    "database.history.kafka.sasl.mechanism": "GSSAPI",
    "database.whitelist": "review",
    "database.history.producer.sasl.mechanism": "GSSAPI",
    "database.user": "root",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "time.precision.mode": "connect",
    "database.server.name": "reviewDB",
    "database.port": "3306",
    "database.history.consumer.heartbeat.interval.ms": "1000",
    "min.row.count.to.stream.results": "0",
    "database.hostname": "mysql",
    "database.password": "example",
    "database.history.consumer.sasl.mechanism": "GSSAPI",
    "snapshot.mode": "when_needed",
    "table.whitelist": "review.search_user_(.*)",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "reviewDB.review.search_user_(.*)",
    "transforms.Reroute.topic.replacement": "search_user_all_shards",
    "transforms.Reroute.key.field.name": "__PKtableowner"
  }
}

Раковина

{ "name": "REVIEW__MYSQL__search_user__sink",
  "config":
  {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "topics.dir": "/_incr_files",
      "flush.size": 1,
      "tasks.max": 1,
      "timezone": "UTC",
      "rotate.interval.ms": 5000,
      "locale": "en",
      "hadoop.home": "/etc/hadoop",
      "logs.dir": "/_incr_files_wal",
      "hive.integration": "false",
      "partition.duration.ms": "20000",
      "hadoop.conf.dir": "/etc/hadoop",
      "topics": "search_user_all_shards",
      "hdfs.url": "hdfs://namenode:9000",
      "transforms": "unwrap,insertTopicOffset,insertTimeStamp",
      "transforms.insertTimeStamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.unwrap.drop.tombstones": "true",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "transforms.insertTimeStamp.timestamp.field": "spdb_landing_timestamp",
      "transforms.insertTopicOffset.offset.field": "spdb_topic_offset",
      "transforms.insertTopicOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "schema.compatibility": "NONE",
      "path.format": "'partition'=YYYY-MM-dd-HH",
      "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner"
  }
}

person Oliver Brylle Majaba    schedule 06.01.2021    source источник


Ответы (2)


Поскольку ваш ключ является структурой, лучший способ, о котором я знаю, - это SMT, который эффективно обертывает ключ и значение в новое вложенное значение.

https://github.com/jcustenborder/kafka-connect-transform-archive

person OneCricketeer    schedule 06.01.2021

В итоге я создал свой собственный. :)

https://github.com/Verdado/kafka-connect-custom-transforms

person Oliver Brylle Majaba    schedule 29.01.2021