Я пытаюсь прослушать изменения в базе данных Aurora с помощью Amazon DMS и отправить изменения в поток Kinesis, где функция Lambda, прослушивающая поток, будет выполнять обработку.
Я ссылался на приведенную ниже документацию, чтобы написать свои правила.
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html https://aws.amazon.com/blogs/database/use-the-aws-database-migration-service-to-stream-change-data-to-amazon-kinesis-data-streams/
Вот мое сопоставление правил для задачи непрерывной репликации DMS (CDC).
{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "my_db",
"table-name": "my_table"
},
"rule-action": "include"
},
{
"rule-type": "object-mapping",
"rule-id": "2",
"rule-name": "2",
"rule-action": "map-record-to-record",
"object-locator": {
"schema-name": "my_db",
"table-name": "my_table"
},
"mapping-parameters": {
"partition-key": {
"attribute-name": "my_id",
"value": "${my_id}"
}
}
}
]
}
Однако, когда я вношу изменение в исходную таблицу, задача DMS завершается с ошибкой (ями) ниже.
2019-02-05T10:36:55 [TARGET_APPLY ]E: Error allocating memory for Json document [1020100] (field_mapping_utils.c:382)
2019-02-05T10:36:55 [TARGET_APPLY ]E: Failed while looking for object mapping for table my_table [1020100] (kinesis_utils.c:258)
2019-02-05T10:36:55 [TARGET_APPLY ]E: Error executing data handler [1020100] (streamcomponent.c:1778)
2019-02-05T10:36:55 [TASK_MANAGER ]E: Stream component failed at subtask 0, component st_0_some_random_id [1020100] (subtask.c:1366)
2019-02-05T10:36:55 [TASK_MANAGER ]E: Task error notification received from subtask 0, thread 1 [1020100] (replicationtask.c:2661)
2019-02-05T10:36:55 [TASK_MANAGER ]W: Task 'some_random_task_id' encountered a fatal error (repository.c:4704)
Когда я пробую без правила object-mapping
, Kinesis получит запись с "partitionKey": "my_db.my_table"
с правильными значениями, что является поведением по умолчанию для приемника от таблицы к таблице, но нам нужен приемник от таблицы к кинезису.
Почему меня так сильно волнует partition-key
? Потому что мне нужно использовать все осколки в моем потоке Kinesis.
Кто-нибудь может мне помочь?
ОБНОВЛЕНИЕ:
Когда я добавляю "partition-key-type": "schema-table"
к "mapping-parameters"
, это не сработает, задача не завершится неудачно, но игнорирует атрибут "partition-key"
и будет иметь "partitionKey": "my_db.my_table"
, как и раньше.
Неопределенные моменты:
- При погружении от стола к столу он использует
"partition-key-type": "schema-table"
, но никогда не упоминает, какое значение имеет преобразование от стола к кинезису. - Примеры и пояснения в документации очень ограничены и даже ошибочны (т.е. некоторые правила JSON недействительны).