Я использую следующий простой код, чтобы проиллюстрировать поведение коннектора файловой системы. У меня есть два наблюдения, которые я хочу спросить и подтвердить.
Если я не включил контрольную точку, то все сгенерированные файлы part-XXX всегда будут содержать
inprogress
в имени файла. Означает ли это, что эти файлы не зафиксированы? Кроме того, означает ли это, что если я хочу использовать приемник коннектора файловой системы, мне всегда нужноenable checkpointing
, чтобы сгенерированные файлы могли быть зафиксированы, а нисходящий поток (например, hive или flink) мог обнаруживать и читать эти файлы?Когда
inprogress
файлы перемещаются в нормальное состояние в разделе? Происходит ли это, когда создается новый раздел, и когда контрольная точка начинает работать, а затем файлы в предыдущем разделе из текущего раздела становятся формальными? Если это так, то для того, чтобы раздел был видимым, может быть отображение (интервал контрольной точки).Я установил интервал прокрутки в коде равным 20 секундам, но когда я смотрю на сгенерированные файлы part-XXX, разница во времени создания для последующих файлов составляет 25 секунд. Я думал, это должно быть 20 секунд
eg,
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-01-03 12:39:04
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-01-03 12:39:29
Код такой:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(10*1000)
env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds)
ds.print()
val ddl =
s"""
create table sinkTable(
id string,
p_day STRING,
p_hour STRING,
p_min STRING
) partitioned by(p_day, p_hour, p_min) with (
'connector' = 'filesystem',
'path' = 'D:/csv-${System.currentTimeMillis()}',
'format' = 'csv',
'sink.rolling-policy.check-interval' = '5 s',
'sink.rolling-policy.rollover-interval' = '20 s',
'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.policy.kind'='success-file',
'sink.partition-commit.delay' = '0 s'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
tenv.executeSql(
"""
insert into sinkTable
select id, date_format(occurrenceTime,'yyyy-MM-dd'), date_format(occurrenceTime, 'HH'), date_format(occurrenceTime, 'mm') from sourceTable
""".stripMargin(' '))
env.execute()
}