Azure datafactory v2 Execute Pipeline с For Each

Я пытаюсь использовать «Выполнить конвейер» для вызова канала с действием ForEach. Я получаю сообщение об ошибке.

  1. Json для канала Execute:
[
    {
        "name": "pipeline3",
        "properties": {
            "activities": [
                {
                    "name": "Test_invoke1",
                    "type": "ExecutePipeline",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "pipeline": {
                            "referenceName": "MAIN_SA_copy1",
                            "type": "PipelineReference"
                        },
                        "waitOnCompletion": true
                    }
                }
            ],
            "annotations": []
        }
    }
]
  1. Джейсон для вызова канала для каждого действия:
[
    {
        "name": "MAIN_SA_copy1",
        "properties": {
            "activities": [
                {
                    "name": "Collect_SA_Data",
                    "type": "ForEach",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@pipeline().parameters.TableNames",
                            "type": "Expression"
                        },
                        "batchCount": 15,
                        "activities": [
                            {
                                "name": "Sink_SAdata_toDL",
                                "type": "Copy",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "7.00:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [
                                    {
                                        "name": "Destination",
                                        "value": "@{pipeline().parameters.DLFilePath}/@{item()}"
                                    }
                                ],
                                "typeProperties": {
                                    "source": {
                                        "type": "SqlServerSource",
                                        "sqlReaderQuery": {
                                            "value": "@concat('SELECT * FROM ',item())",
                                            "type": "Expression"
                                        }
                                    },
                                    "sink": {
                                        "type": "AzureBlobFSSink"
                                    },
                                    "enableStaging": false,
                                    "parallelCopies": 1,
                                    "dataIntegrationUnits": 4
                                },
                                "inputs": [
                                    {
                                        "referenceName": "SrcDS_StructuringAnalytics",
                                        "type": "DatasetReference"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "referenceName": "ADLS",
                                        "type": "DatasetReference",
                                        "parameters": {
                                            "FilePath": "@pipeline().parameters.DLFilePath",
                                            "FileName": {
                                                "value": "@concat(item(),'.orc')",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                }
            ],
            "parameters": {
                "DLFilePath": {
                    "type": "string",
                    "defaultValue": "extracts/StructuringAnalytics"
                },
                "TableNames": {
                    "type": "array",
                    "defaultValue": [
                        "fom.FOMLineItem_manual"
                    ]
                }
            },
            "variables": {
                "QryTableColumn": {
                    "type": "String"
                },
                "QryTable": {
                    "type": "String"
                }
            },
            "folder": {
                "name": "StructuringAnalytics"
            },
            "annotations": []
        },
        "type": "Microsoft.DataFactory/factories/pipelines"
    }
]

Я получаю сообщение об ошибке:

[
    {
        "errorCode": "BadRequest",
        "message": "Operation on target Collect_SA_Data failed: The execution of template action 'Collect_SA_Data' failed: the result of the evaluation of 'foreach' expression '@pipeline().parameters.TableNames' is of type 'String'. The result must be a valid array.",
        "failureType": "UserError",
        "target": "Test_invoke1",
        "details": ""
    }
]

Вход:

"pipeline": {
    "referenceName": "MAIN_SA_copy1",
    "type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
    "DLFilePath": "extracts/StructuringAnalytics",
    "TableNames": "[\"fom.FOMLineItem_manual\"]"
}

person twinkle    schedule 08.04.2020    source источник
comment
Я получил еще одну ошибку после прочтения первого действия в цикле foreach, errorCode: 2200, message: Ошибка произошла на стороне источника. 'Type = System.Data.SqlClient.SqlException, Message = Invalid object name' \ lib_service \, \ lib_transaction_type \, \ lib_utility \, \ lib_utility_state \, \ account_service \, \ payment_term \ '. , Source = .Net SqlClient Data Provider, SqlErrorNumber = 208, Class = 16, ErrorCode = -2146232060, State = 1, Errors = [{Class = 16, Number = 208, State = 1, Message = Invalid object name '\ lib_service \, \ тип_транзакции_либ \, \ служебная_либ \   -  person twinkle    schedule 09.04.2020
comment
, \ lib_utility_state \, \ account_service \, \ payment_term \ '.,},],', failureType: UserError, target: Sink_Atlantic_toDL, детали: []}   -  person twinkle    schedule 09.04.2020
comment
escape-символ '\' добавляется автоматически, как мне решить эту проблему?   -  person twinkle    schedule 09.04.2020


Ответы (2)


Попробуйте обновить динамическое выражение ForEach Items, как показано ниже:

{
    "value": "@array(pipeline().parameters.TableNames)",
    "type": "Expression"
}

введите здесь описание изображения

Надеюсь это поможет.

person KranthiPakala-MSFT    schedule 08.04.2020
comment
Спасибо, это сработало. Мне интересно, почему мои другие каналы (имеющие несколько таблиц или массив) с использованием pipeline (). Parameters.TableNames работали без указания массива до сегодняшнего дня? - - person twinkle; 09.04.2020
comment
Пожалуйста, посмотрите на проблему с escape-символами, с которой я столкнулся сейчас. выбор * из таблицы не работает - person twinkle; 09.04.2020
comment
Решение @array не сработало. Фактически он создает массив всего, что содержится в item (), а затем оператор select объединяет все таблицы в одном операторе. Цель состоит в том, чтобы в каждом цикле было одно имя таблицы. - person twinkle; 09.04.2020
comment
Функция @array не должна создавать никаких проблем. Для каждой итерации действия ForEach у вас будет один оператор выбора. Также, пожалуйста, включите Последовательную правильность активности Foreach. У меня есть конвейер с такими же настройками и без проблем. Не могли бы вы опубликовать свою проблему на форуме ADF MSDN, чтобы я мог предоставить вам больше снимков экрана и образец кода конвейера для тестирования: social.msdn.microsoft.com/Forums/en-US/ - Спасибо. - person KranthiPakala-MSFT; 10.04.2020
comment
Я включил Sequential, это помогло, спасибо. Но escape-символ не распознается ошибкой SQL: код: 11000, сообщение: сбой произошел на стороне источника. 'Type = System.Data.SqlClient.SqlException, Message = Invalid object name' \ fom.FOMLineItem_manual \ '., Source = .Net SqlClient Data Provider, SqlErrorNumber = 208, Class = 16, ErrorCode = -2146232060, State = 1, Ошибки = [{Class = 16, Number = 208, State = 1, Message = Invalid object name '\ fom.FOMLineItem_manual \'.,},], - person twinkle; 11.04.2020
comment
Вы можете ясно видеть, что проблема заключается в escape-символе, и я ввел имена таблиц, поэтому опечатка исключена. Входной {источник: {тип: SqlServerSource, sqlReaderQuery: SELECT * FROM [\ fom.FOMLineItem_manual \]}, приемник: {тип: AzureBlobFSSink - person twinkle; 11.04.2020
comment
social.msdn.microsoft.com/Forums/en-US/ - person twinkle; 11.04.2020

Я предполагаю, что вы использовали пользовательский интерфейс для установки конвейера и его параметров, и я предполагаю, что вы ожидали поместить параметр массива вызываемого конвейера, как и везде, например: (Это все мое предположение, потому что я сделал то же самое с тот же результат)

Неправильно

Уловка состоит в том, чтобы определить массив в коде ([table1, table2]): Код

Ввод в пользовательском интерфейсе будет выглядеть так:

Вправо

Теперь это работает!
Похоже, что в противном случае Datafactory обрабатывает весь массив как один элемент некоторого массива. Следовательно, решение с функцией array () иногда срабатывает.
Похоже на ошибку, определение ввода параметра массива ..

(Пришлось отредактировать ответ, сначала я подумал, что будет достаточно опустить двоеточия во вводе пользовательского интерфейса)

person Mike S    schedule 13.01.2021