Как работает приемник коннектора файловой системы

Я использую следующий простой код, чтобы проиллюстрировать поведение коннектора файловой системы. У меня есть два наблюдения, которые я хочу спросить и подтвердить.

  1. Если я не включил контрольную точку, то все сгенерированные файлы part-XXX всегда будут содержать inprogress в имени файла. Означает ли это, что эти файлы не зафиксированы? Кроме того, означает ли это, что если я хочу использовать приемник коннектора файловой системы, мне всегда нужно enable checkpointing, чтобы сгенерированные файлы могли быть зафиксированы, а нисходящий поток (например, hive или flink) мог обнаруживать и читать эти файлы?

  2. Когда inprogress файлы перемещаются в нормальное состояние в разделе? Происходит ли это, когда создается новый раздел, и когда контрольная точка начинает работать, а затем файлы в предыдущем разделе из текущего раздела становятся формальными? Если это так, то для того, чтобы раздел был видимым, может быть отображение (интервал контрольной точки).

  3. Я установил интервал прокрутки в коде равным 20 секундам, но когда я смотрю на сгенерированные файлы part-XXX, разница во времени создания для последующих файлов составляет 25 секунд. Я думал, это должно быть 20 секунд

eg,

part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-0‎1-0‎3 ‏‎12:39:04  
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-0‎1-0‎3 ‏‎12:39:29

Код такой:

 val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(10*1000)
    env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
    val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds)

    ds.print()

    val ddl =
      s"""
      create table sinkTable(
      id string,
      p_day STRING,
      p_hour STRING,
      p_min STRING

      ) partitioned by(p_day, p_hour, p_min) with (
        'connector' = 'filesystem',
        'path' = 'D:/csv-${System.currentTimeMillis()}',
        'format' = 'csv',
        'sink.rolling-policy.check-interval' = '5 s',
        'sink.rolling-policy.rollover-interval' = '20 s',
        'sink.partition-commit.trigger'='process-time',
         'sink.partition-commit.policy.kind'='success-file',
        'sink.partition-commit.delay' = '0 s'
      )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    tenv.executeSql(
      """
     insert into sinkTable
      select id, date_format(occurrenceTime,'yyyy-MM-dd'), date_format(occurrenceTime, 'HH'), date_format(occurrenceTime, 'mm')  from sourceTable

     """.stripMargin(' '))

    env.execute()
  }

person Tom    schedule 03.01.2021    source источник


Ответы (1)


Пункты 1 описаны в документах StreamingFileSink :

ВАЖНО: Контрольные точки должны быть включены при использовании StreamingFileSink. Файлы деталей могут быть завершены только после успешных контрольных точек. Если контрольные точки отключены, файлы деталей навсегда останутся в состоянии in-progress или pending и не могут быть безопасно прочитаны нижестоящими системами.

Для пункта 2 жизненный цикл файла детали задокументирован здесь, в котором объясняется, что in-progress файлы переходят на pending в соответствии с политикой прокрутки и становятся finished только после завершения контрольной точки. Таким образом, в зависимости от политики прокрутки и интервала между контрольными точками, некоторые файлы могут быть pending в течение довольно долгого времени.

Для точки 3 с rollover-interval, равным 20 секундам, и check-interval, равным 5 секундам, ролловер произойдет примерно через 20–25 секунд. См. Политику непрерывного обновления документы для объяснения check-interval:

Интервал проверки политик прокрутки на основе времени. Это контролирует частоту, чтобы проверить, должен ли файл детали опрокидываться на основе'ink.rolling-policy.rollover-interval '.

person David Anderson    schedule 03.01.2021
comment
спасибо @ david-anderson за отличный и полезный ответ! Я только что прочитал документ FileSystem of Table&SQL connector. - person Tom; 04.01.2021