Поэтому я использую debezium key.field.name
в моем исходном коннекторе MySQL, чтобы добавить поле в свою тему.
Сообщение выглядит ниже после перехода по теме.
{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}:{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}
Где, ключ
{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}
и ценность
{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}
Как часть моего приемника hdfsSinkConnector мне нужно получить ключ сообщения "__PKtableowner":"reviewDB.review.search_user_02
как часть столбца или поля в hdfs или кусте.
Единственный SMT, который я нашел, - это ValueToKey, но, похоже, он не подходит для моего варианта использования, потому что он извлекается из значения, а не из ключа сообщения. Я пробовал (InsertField, CreateKey, ExtractField и т.д.) Почти все преобразования, которые вы можете найти здесь, но не повезло. https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
Я ищу тип SMT KeyToValue или есть другой обходной путь.
Ниже приведены мои конфигурации источника и приемника. Источник:
{
"name": "REVIEW__MYSQL__search_user__source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.history.kafka.topic": "review.search_user_logs",
"database.history.consumer.max.block.ms": "3000",
"include.schema.changes": "false",
"database.history.consumer.session.timeout.ms": "30000",
"database.history.kafka.consumer.group": "compose-connect-group",
"snapshot.new.tables": "parallel",
"database.history.kafka.sasl.mechanism": "GSSAPI",
"database.whitelist": "review",
"database.history.producer.sasl.mechanism": "GSSAPI",
"database.user": "root",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"time.precision.mode": "connect",
"database.server.name": "reviewDB",
"database.port": "3306",
"database.history.consumer.heartbeat.interval.ms": "1000",
"min.row.count.to.stream.results": "0",
"database.hostname": "mysql",
"database.password": "example",
"database.history.consumer.sasl.mechanism": "GSSAPI",
"snapshot.mode": "when_needed",
"table.whitelist": "review.search_user_(.*)",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "reviewDB.review.search_user_(.*)",
"transforms.Reroute.topic.replacement": "search_user_all_shards",
"transforms.Reroute.key.field.name": "__PKtableowner"
}
}
Раковина
{ "name": "REVIEW__MYSQL__search_user__sink",
"config":
{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/_incr_files",
"flush.size": 1,
"tasks.max": 1,
"timezone": "UTC",
"rotate.interval.ms": 5000,
"locale": "en",
"hadoop.home": "/etc/hadoop",
"logs.dir": "/_incr_files_wal",
"hive.integration": "false",
"partition.duration.ms": "20000",
"hadoop.conf.dir": "/etc/hadoop",
"topics": "search_user_all_shards",
"hdfs.url": "hdfs://namenode:9000",
"transforms": "unwrap,insertTopicOffset,insertTimeStamp",
"transforms.insertTimeStamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.insertTimeStamp.timestamp.field": "spdb_landing_timestamp",
"transforms.insertTopicOffset.offset.field": "spdb_topic_offset",
"transforms.insertTopicOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"schema.compatibility": "NONE",
"path.format": "'partition'=YYYY-MM-dd-HH",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner"
}
}