Конвейер потока данных, считывающий csv из GCS и записывающий в BigBuery с вызовами Vision и NL API

Я хочу написать программу потока данных (реализация Java и maven). Вот шаги, которые я хочу выполнить:

  1. Поток данных должен считывать CSV-файл из облачного хранилища Google. CSV-файл имеет следующий формат:

    Название продукта, URL-адрес изображения, категория, описание1, описание2 Sakura 30062, набор из 6 чернильных ручек Pigma Micron, https://images-na.ssl-images-amazon.com/images/I/71CkvpG3FEL.SY355.jpg< /a>, Искусство, включает 1 шт. размера: #005 (0,20 мм)

    Миниатюрный термоклеевой пистолет CCbetter с клеевыми стержнями 25 шт. Комплект высокотемпературного плавящегося клеевого пистолета Гибкий триггер для небольших поделок, герметизации и быстрого ремонта (20 Вт, синий), https://images-na.ssl-images-amazon.com/images/I/61iFrMg4%2B3L.SY355.jpg, безопасный и удобный выключатель питания с режимом светодиодной подсветки. Со съемной и гибкой опорой, обеспечивающей устойчивость и вертикальное положение пистолета. Благодаря высококачественному и изолированному соплу пистолет не деформируется даже при длительном использовании при температуре ниже 500 ℉.

    . . . .

  2. Для каждой из строк в csv мне нужно выбрать URL-адрес изображения и запустить API видения и получить 2 верхних ярлыка (например, мы получаем метки L1 и L2 из API видения для первого продукта/строки и L3 и L4 для второго продукта/строки)

  3. Для каждой строки в csv мне нужно объединить название продукта, категорию, описание1 и описание2 и передать их в NL API. Из ответа NL API мне нужно выбрать 2 лучших объекта в категории потребительских товаров (например, мы получаем E1 и E2 из первой строки и E3 и E4 для второй строки)

  4. Мне нужно создать следующую структуру из полученного ответа:

    Название продукта, Тема Sakura 30062 Набор из 6 чернил Pigma Micron, L1 Sakura 30062 Набор из 6 чернил Pigma Micron, L2 Sakura 30062 Набор из 6 чернил Pigma Micron, E1 Sakura 30062 Набор из 6 чернил Pigma Micron , Е2

    CCbetter Мини-клеевой пистолет-расплав с 25 клеевыми стержнями Комплект высокотемпературного плавящегося клеевого пистолета Гибкий триггер для небольших поделок, герметизация и быстрый ремонт (20 Вт, синий), L3 CCbetter Мини-клеевой пистолет-расплав с 25 клеевыми стержнями Высокотемпературный плавящийся клеевой пистолет Комплект Гибкий триггер для небольших поделок, герметизации и быстрого ремонта (20 Вт, синий), L4 CCbetter Мини-пистолет для клея-расплава с клеевыми стержнями 25 шт. , Синий), E3 CCbetter Mini Термоклеевой пистолет с 25 клеевыми стержнями Комплект высокотемпературного плавящегося клеевого пистолета Гибкий триггер для небольших поделок, герметизации и быстрого ремонта (20 Вт, синий), E4. . . .

  5. Я хочу записать эту сетку (структура на шаге 4) в таблицу Bigquery.

Я новичок в Dataflow, поэтому любая помощь, фрагмент кода или весь исходный код или ссылка будут высоко оценены.


person Chirag Giri    schedule 22.11.2016    source источник


Ответы (1)


Вам следует начать с прочтения одного из кратких руководств и изучения некоторых из примеров конвейеров.

Основываясь на вашем описании, общий план может быть таким:

  1. Используйте TextIO.read для чтения контента из GCS. Обратите внимание, что он не поддерживает игнорирование заголовка, поэтому вам, вероятно, придется обнаружить и удалить его самостоятельно.
  2. Напишите DoFn, который использует API видения для URL-адреса из каждой строки файла. Вы даже можете разделить это на несколько DoFn — один для преобразования строки в URL-адрес, затем DoFn для использования API видения, затем DoFn для извлечения двух верхних тегов.
  3. Напишите еще один DoFn или ряд DoFn, который выполняет конкатенацию и использует NL API.
  4. Напишите еще одну DoFn или серию DoFn, которые генерируют строки с желаемым выходным форматом как TableRow.
  5. Используйте преобразование BigQueryIO.write, чтобы записать их в BigQuery.
person Ben Chambers    schedule 22.11.2016