org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsyncImpl вызывает слишком быстро

У меня возникла критическая проблема, я использую клиент Rocket MQ (инкубационный v4.1.0) следующим образом:

2017-10-16 16:18:12:457[ERROR][SimpleProducer$1.onException(SimpleProducer.java:44)] - send message to mq fail:
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsyncImpl invoke too fast
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeAsyncImpl(NettyRemotingAbstract.java:422)
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:488)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:368)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:455)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:156)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:417)
    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51)
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Не знаю, как это исправить, даже если много гулю без правильного ответа.

Вот мой асинхронный код производителя:

public class SimpleProducer {

static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
static AtomicInteger total = new AtomicInteger(0);
private final static CountDownLatch mCountDownLatch = new CountDownLatch(1);
public static void main(String[] args){
    logger.info("Bootstrap start...");
    DefaultMQProducer producer = new DefaultMQProducer("Producer");
    producer.setNamesrvAddr("192.168.137.112:9876");
    try {
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(3);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000000; i++)
            try {
                {
                    Message msg = new Message("newTopic",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                   producer.send(msg, new SendCallback(){
                       public void onSuccess(SendResult sendResult) {
                           if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                               //MsgSendResponseCounter.factory.getInstance().onSuccess();
                               logger.info("Succeeded send {} message, total {}", sendResult.getMsgId(), total.getAndIncrement());
                           }
                       }
                       public void onException(Throwable ee) {
                           logger.error("send message to mq fail:", ee);
                       }
                   });
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        mCountDownLatch.await();
        long end = System.currentTimeMillis();
        logger.info("Finished 10000000 msgs! elapsed time {} in all", (end - start)/1000);
    } catch (Exception e) {
        e.printStackTrace();
    }finally{
        producer.shutdown();
    }
}

}


person Alex Cao    schedule 16.10.2017    source источник
comment
если я установил значение 0 для Manufacturer.setRetryTimesWhenSendAsyncFailed (0); новое исключение: org.apache.rocketmq.client.exception.MQClientException: время ожидания ответа 3000 мс. Для получения дополнительных сведений посетите URL-адрес rocketmq.apache.org/docs/faq на org.apache.rocketmq.client.impl.MQClientAPIImpl $ 1.operationComplete (MQClientAPIImpl.java:416) на org.apache.rocketmq. remoting.netty.ResponseFuture.executeInvokeCallback (ResponseFuture.java:51)   -  person Alex Cao    schedule 16.10.2017


Ответы (1)


Ответ на самом деле прост: вы отправляете слишком быстро, что достигает порога управления потоком.

Когда вы отправляете сообщение aysnc, клиент пытается получить разрешение, а после получения ответа брокера он освобождает разрешение.

Поскольку вы отправляете сообщения асинхронным способом, где producer.send() будет возвращаться очень быстро, и вы продолжаете отправлять сообщения в цикле for без каких-либо спящих mili, которые столкнутся с проблемой.

Просто поместите Thread.sleep (100) после вызова producer.send или используйте метод отправки синхронизации, который не требует SendCallBack

person JaskeyLam    schedule 09.11.2017