Сначала позвольте мне задать свой вопрос, тогда не могли бы вы пояснить мое предположение о методе apply?
Вопрос: если мое приложение создает 1.500.000 (приблизительно) записей в каждый минутный интервал и задание flink читает эти записи от потребителя kafka с помощью, скажем, 15 ++ различных операторов, тогда эта логика может создать задержку, противодавление и т. Д.? (вы можете предположить, что параллелизм равен 16)
public class Sample{
//op1 =
kafkaSource
.keyBy(something)
.timeWindow(Time.minutes(1))
.apply(new ApplySomething())
.name("Name")
.addSink(kafkaSink);
//op2 =
kafkaSource
.keyBy(something2)
.timeWindow(Time.seconds(1)) // let's assume that this one second
.apply(new ApplySomething2())
.name("Name")
.addSink(kafkaSink);
// ...
//op16 =
kafkaSource
.keyBy(something16)
.timeWindow(Time.minutes(1))
.apply(new ApplySomething16())
.name("Name")
.addSink(kafkaSink);
}
// ..
public class ApplySomething ... {
private AnyObject object;
private int threshold = 30, 40, 100 ...;
@Override
public void open(Configuration parameters) throws Exception{
object = new AnyObject();
}
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception{
int counter = 0;
for (Record each : input){
counter += each.getValue();
if (counter > threshold){
out.collec(each.getResult());
return;
}
}
}
}
- Если да, следует ли использовать flatMap с состоянием (RockDB) вместо timeWindow?
- My prediction is "YES". Let me explain why i am thinking like that:
- If parallelism is 16 than there will be a 16 different instances of indivudual
ApplySomething1(), ApplySomething2()...ApplySomething16()
and also there will be sixteenAnyObject()
instances for perApplySomething..()
classes. - Когда приложение работает, если номер
keyBy(something)
partition больше 16 (предполагается, что мое приложение имеет 1.000.000 различныхsomething
в день), тогда некоторые изApplySomething..()
instances будут обрабатывать разные ключи, поэтому одинapply()
должен ждать циклов в других перед обработкой. Тогда это приведет к задержке?
- If parallelism is 16 than there will be a 16 different instances of indivudual