Видео на YouTube и репозиторий github, на которые я ссылался в этом ответе, охватывают ряд похожих сценариев. Но лучший способ загрузить состояние Flink - это предварительно загрузить данные в точку сохранения с помощью тега API обработчика состояний.
Имейте в виду, что MapState
Flink - это своего рода состояние с разделением по ключам. Таким образом, если вы используете MapState<Metadata::Id, Metadata>
, это фактически Map<KEY, MapState<Metadata::Id, Metadata>>
, сегментированный по кластеру с помощью KEY.
Вот пример, показывающий, как создать точку сохранения, содержащую ValueState<Integer>
:
public class Bootstrap {
public static void main( String[] args ) throws Exception {
ExecutionEnvironment bEnv =
ExecutionEnvironment.getExecutionEnvironment();
BootstrapTransformation<Integer> transform =
OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
.keyBy(String::valueOf)
.transform(new SimplestTransform());
Savepoint
.create(new FsStateBackend("file:///tmp/checkpoints"), 256)
.withOperator("my-operator-uid", transform)
.write("file:///tmp/savepoints/");
bEnv.execute();
}
static public class SimplestTransform
extends KeyedStateBootstrapFunction<String, Integer> {
ValueState<Integer> state;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> descriptor = new
ValueStateDescriptor<>("total", Types.INT);
state = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Integer value, Context ctx) throws Exception {
state.update(value);
}
}
}
Это создает сегментированную карту ключ / значение, содержащую {"1": 1, "2": 2, "3": 3}
.
person
David Anderson
schedule
23.11.2020