версия: RocketMQ-все-4.1.0-инкубации
Мы отправляем msg 1000 QPS, синхронизируем отправку, но выдаем исключение: -
[TIMEOUT_CLEAN_QUEUE] брокер занят, запустите управление потоком на некоторое время
Есть соответствующий код:
while (true) {
try {
if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
Я считаю, что у брокера значение sendMessageThreadPoolNums по умолчанию равно 1,
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
но предыдущая версия не 1, и если я настрою sendMessageThreadPoolNums = 100
, можно решить этот вопрос? Это приведет к тому, что отличается от значения по умолчанию. Благодарность