Scio JobTest, PubSubIO, pubsubSubscriptionWithAttributes, timestampAttribute и проблема с оконным режимом

Я создаю конвейер для резервного копирования данных из PubSub в GCS и хотел создать тест с использованием JobTest, и я изо всех сил пытаюсь заставить PubSubIO правильно получить время события.

PubSub читается с помощью sc.pubsubSubscriptionWithAttributes[String]("path/to/subscription", timestampAttribute = "doc_timestamp"). После этого я применяю оконную обработку и отправляю ее на CustomIO

Тест выглядит так:

JobTest[PubSub2GCS.type]
  .args("--subscription=input", "--targetDir=output")
  .input(PubsubIO[(String, Map[String, String])]("input"), Seq(("Contents", Map[String, String]("doc_timestamp" -> "2001-01-01T09:10:11.332Z"))))
  .output(CustomIO[KV[String, WindowedDoc]]("output"))(_.debug())
  .run()

и в результате значение помещается в окно -290308-12-21T20:00:00.000Z..-290308-12-21T21:00:00.000Z!!. Возможно, потому что дата "doc_timestamp" неправильно истолкована. На самом деле окно никогда не меняется, независимо от значения ключа "doc_timestamp".

К счастью, работа отлично работает в продакшене, но я бы хотел, чтобы эти тесты были написаны.


person Carlos    schedule 02.03.2018    source источник


Ответы (1)


Это связано с тем, что атрибуты Map[String, String] в ScioContext#pubsubSubscriptionWithAttributes не заполняются в JobTest.

Вероятно, мы можем добавить здесь условие и установить метку времени, если ScioContext#isTest и timestampAttribute != null https://github.com/spotify/scio/blob/master/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala#L572

Похоже на исправление мелочи. Не могли бы вы сообщить о проблеме здесь и, возможно, отправить PR?

person Neville Li    schedule 21.09.2018