RocketMQ бросает исключение [TIMEOUT_CLEAN_QUEUE]брокер занят, запустите управление потоком на некоторое время

версия: RocketMQ-все-4.1.0-инкубации

Мы отправляем msg 1000 QPS, синхронизируем отправку, но выдаем исключение: -

[TIMEOUT_CLEAN_QUEUE] брокер занят, запустите управление потоком на некоторое время

Есть соответствующий код:

while (true) {
        try {
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
                    if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}

Я считаю, что у брокера значение sendMessageThreadPoolNums по умолчанию равно 1,

/**
 * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
 */
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;

но предыдущая версия не 1, и если я настрою sendMessageThreadPoolNums = 100, можно решить этот вопрос? Это приведет к тому, что отличается от значения по умолчанию. Благодарность


person uestc lyb    schedule 11.12.2017    source источник


Ответы (1)


КРАТКИЙ ОТВЕТ:

у вас есть два варианта:

установите для sendMessageThreadPoolNums небольшое число, скажем, 1, которое является значением по умолчанию после версии 4.1.x. И оставьте значение по умолчанию useReentrantLockWhenPutMessage=false, которое введено после версии 4.1.x.

 sendMessageThreadPoolNums=1 
 useReentrantLockWhenPutMessage=false

Если вам нужно использовать большое количество потоков для обработки отправки сообщения, вам лучше использовать useReentrantLockWhenPutMessage=true

 sendMessageThreadPoolNums=128//large thread numbers
 useReentrantLockWhenPutMessage=true  // indicating that do NOT use spin lock but use ReentrantLock when putting message
person JaskeyLam    schedule 28.03.2018