Экземпляр объекта, относящегося к flink Parallelism & Apply Method

  • Сначала позвольте мне задать свой вопрос, тогда не могли бы вы пояснить мое предположение о методе apply?

  • Вопрос: если мое приложение создает 1.500.000 (приблизительно) записей в каждый минутный интервал и задание flink читает эти записи от потребителя kafka с помощью, скажем, 15 ++ различных операторов, тогда эта логика может создать задержку, противодавление и т. Д.? (вы можете предположить, что параллелизм равен 16)

public class Sample{
  //op1 = 
     kafkaSource
                .keyBy(something)
                .timeWindow(Time.minutes(1))
                .apply(new ApplySomething())
                .name("Name")
                          .addSink(kafkaSink);
  //op2 = 
    kafkaSource
                .keyBy(something2)
                .timeWindow(Time.seconds(1)) // let's assume that this one second
                .apply(new ApplySomething2())
                .name("Name")
                          .addSink(kafkaSink);
 // ...

  //op16 = 
    kafkaSource
                .keyBy(something16)
                .timeWindow(Time.minutes(1)) 
                .apply(new ApplySomething16())
                .name("Name")
                          .addSink(kafkaSink);

}
// ..
public class ApplySomething ... {
  private AnyObject object;
  private int threshold = 30, 40, 100 ...;

      @Override
    public void open(Configuration parameters) throws Exception{
        object = new AnyObject();
    }

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception{
        int counter = 0;
        for (Record each : input){
          counter += each.getValue();
          if (counter > threshold){
            out.collec(each.getResult());
            return;
          }
        }
    }
}
  • Если да, следует ли использовать flatMap с состоянием (RockDB) вместо timeWindow?
  • My prediction is "YES". Let me explain why i am thinking like that:
    • If parallelism is 16 than there will be a 16 different instances of indivudual ApplySomething1(), ApplySomething2()...ApplySomething16() and also there will be sixteen AnyObject() instances for per ApplySomething..() classes.
    • Когда приложение работает, если номер keyBy(something)partition больше 16 (предполагается, что мое приложение имеет 1.000.000 различных something в день), тогда некоторые из ApplySomething..()instances будут обрабатывать разные ключи, поэтому один apply() должен ждать циклов в других перед обработкой. Тогда это приведет к задержке?

person monstereo    schedule 27.05.2020    source источник


Ответы (1)


Временные окна Flink выровнены по эпохе (например, если у вас есть несколько почасовых окон, все они будут срабатывать в час). Поэтому, если вы намерены использовать в своей работе несколько разных окон, как это, вы должны настроить их так, чтобы они имели разные смещения, чтобы они не запускались одновременно. Это позволит распределить нагрузку. Это будет выглядеть примерно так

.window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15))

(или используйте TumblingEventTimeWindows в зависимости от обстоятельств). Это создаст минутные окна, которые срабатывают через 15 секунд после каждой минуты.

Если позволяет ваш вариант использования, вы должны использовать инкрементную агрегацию (через reduce или aggregate), а не использовать WindowFunction (или ProcessWindowFunction), который должен собирать все события, назначенные каждому окну в списке, прежде чем обрабатывать их как своего рода мини -партия.

Ключевое временное окно сохранит свое состояние в RocksDB, если вы настроили RocksDB в качестве серверной части состояния. Вам не нужно переключаться на RichFlatMap, чтобы получить доступ к RocksDB. (Более того, поскольку flatMap не может использовать таймеры, я предполагаю, что вы действительно будете использовать вместо этого функцию процесса.)

В то время как любой из параллельных экземпляров оконного оператора занят выполнением своей оконной функции (одной из ApplySomethings), вы правы, полагая, что эта задача не будет делать ничего другого - и, следовательно, она (если не завершится очень быстро) создаст временное противодавление. При необходимости вы захотите увеличить параллелизм, чтобы задание могло удовлетворить ваши требования к пропускной способности и задержке.

person David Anderson    schedule 27.05.2020