Java: высокопроизводительная передача сообщений (один производитель / один потребитель)

Сначала я задал этот вопрос здесь, но я понял, что мой вопрос не о цикле while-true. Я хочу знать, как правильно выполнять высокопроизводительную асинхронную передачу сообщений в Java?

Что я пытаюсь сделать ...

У меня ~ 10 000 потребителей, каждый из которых получает сообщения из своих частных очередей. У меня есть один поток, который создает сообщения одно за другим и помещает их в очередь нужного потребителя. Каждый потребитель зацикливается на неопределенное время, проверяя появление сообщения в своей очереди и обрабатывая его.

Я считаю, что это термин «один производитель / один потребитель», поскольку существует один производитель, и каждый потребитель работает только в своей частной очереди (несколько потребителей никогда не читают из одной очереди).

Внутри Consumer.java:

@Override
public void run() {
    while (true) {
        Message msg = messageQueue.poll();
        if (msg != null) {
            ... // do something with the message
        }
    }
}

Производитель помещает сообщения в очереди сообщений потребителей в быстром темпе (несколько миллионов сообщений в секунду). Потребители должны обрабатывать эти сообщения как можно быстрее!

Примечание: while (true) { ... } завершается сообщением KILL, отправленным производителем в качестве его последнего сообщения.

Однако у меня вопрос о том, как правильно спроектировать эту передачу сообщений. Какую очередь использовать для messageQueue? Должен ли он быть синхронным или асинхронным? Как следует оформить сообщение? Следует ли использовать цикл while-true? Должен ли Consumer быть потоком или чем-то еще? Будет ли 10 000 потоков замедляться до обхода? Какая альтернатива потокам?

Итак, как правильно выполнять высокопроизводительную передачу сообщений в Java?


person Mr. Burgundy    schedule 29.07.2010    source источник
comment
Почему у вас 10к потоков? При переключении потоков будет много накладных расходов, если только у вас не будет очень много ядер или задач, требующих значительного количества ожидания для каждого потока.   -  person Mike    schedule 30.07.2010
comment
10k потоков далеки от высокой производительности   -  person whiskeysierra    schedule 30.07.2010
comment
@Mike: Существует 10 000 различных символов, и каждый потребитель обрабатывает сообщения для одного символа. Я не знаю, следует ли это реализовывать как потоки, но потребители ничего не разделяют друг с другом и являются хорошим кандидатом для модели акторов.   -  person Mr. Burgundy    schedule 30.07.2010
comment
@Willi Schönborn: У меня вопрос, если не темы, то что? Я бы хотел обойтись без библиотеки / фреймворка; например, каков правильный дизайн?   -  person Mr. Burgundy    schedule 30.07.2010
comment
Чтобы узнать, что такое подходящий дизайн, нам нужно знать, что вы на самом деле делаете. Откуда приходят сообщения? Что они собой представляют и т. Д.   -  person nos    schedule 30.07.2010


Ответы (5)


Я бы сказал, что накладные расходы на переключение контекста в 10 000 потоков будут очень высокими, не говоря уже о накладных расходах памяти. По умолчанию на 32-битных платформах каждый поток использует размер стека по умолчанию 256 КБ, так что это 2,5 ГБ только для вашего стека. Очевидно, вы говорите о 64-битной системе, но даже в этом случае это довольно большой объем памяти. Из-за объема используемой памяти кеш будет перегружен, и процессор будет регулироваться пропускной способностью памяти.

Я бы поискал дизайн, который избегает использования такого количества потоков, чтобы избежать выделения большого количества стека и накладных расходов на переключение контекста. Вы не можете обрабатывать 10 000 потоков одновременно. Текущее оборудование обычно имеет менее 100 ядер.

Я бы создал одну очередь на каждый аппаратный поток и отправлял сообщения циклически. Если время обработки значительно различается, существует опасность, что некоторые потоки завершат обработку своей очереди до того, как им будет предоставлена ​​дополнительная работа, в то время как другие потоки никогда не справятся с отведенной им работой. Этого можно избежать, используя кражу работы, как это реализовано в инфраструктуре JSR-166 ForkJoin.

Поскольку связь от издателя к подписчикам является односторонней, сообщение не требует особого оформления, если подписчик не изменяет сообщение после его публикации.

РЕДАКТИРОВАТЬ: чтение комментариев, если у вас есть 10000 символов, затем создайте несколько общих потоков подписчиков (один поток подписчиков на ядро), которые асинхронно получают сообщения от издателя (например, через свою очередь сообщений). Подписчик извлекает сообщение из очереди, извлекает символ из сообщения и просматривает его в карте обработчиков сообщений, извлекает обработчик и вызывает обработчик для синхронной обработки сообщения. После этого он повторяется, выбирая следующее сообщение из очереди. Если сообщения для одного и того же символа должны обрабатываться по порядку (поэтому я предполагаю, что вам нужно 10 000 очередей), вам необходимо сопоставить символы с подписчиками. Например. если имеется 10 абонентов, то символы 0-999 переходят к абоненту 0, 1000-1999 - к абоненту 1 и т. д. Более усовершенствованная схема состоит в отображении символов в соответствии с их частотным распределением, так что каждый абонент получает примерно одинаковую нагрузку. Например, если 10% трафика составляет символ 0, то абонент 0 будет иметь дело только с этим одним символом, а другие символы будут распределены между другими подписчиками.

person mdma    schedule 29.07.2010
comment
Есть ли способ написать мою программу так, чтобы концептуально это было 10 000 отдельных потребителей, каждый из которых работал со своей собственной очередью? Но работать как несколько потоков с несколькими очередями? - person Mr. Burgundy; 30.07.2010
comment
@ Mr.Burgundy Конечно, есть много подходов. например в качестве простого подхода вы можете инкапсулировать логику потребителя в класс (не связанный с потоком потребителя), набить 10 тыс. из них в списке, попросить один поток потребителя найти нужный и вызвать логику для этого конкретного потребителя сообщения. - person nos; 30.07.2010
comment
@nos - это будет однопоточный. Это ограничит достижимую производительность при наличии нескольких ядер. - person mdma; 30.07.2010
comment
как уже говорилось, это был простой подход. Если вы используете двухъядерный процессор, и ваш производитель выполняет столько же работы, сколько и потребитель, это все, что вам нужно. Следующий немного менее простой подход - создать N потоков-потребителей. Или оставьте один поток потребителей и отправьте работу подходящему сконфигурированному ExecutorService, или проиграйте потоку потребителя и отправьте работу непосредственно исполнителю. - person nos; 30.07.2010

Вы можете использовать это (кредит принадлежит Какой ThreadPool в Java мне следует использовать?):

class Main {
    ExecutorService threadPool = Executors.newFixedThreadPool(
                                     Runtime.availableProcessors()*2);

    public static void main(String[] args){
        Set<Consumer> consumers = getConsumers(threadPool);
        for(Consumer consumer : consumers){
            threadPool.execute(consumer);
        }
    }
}

а также

class Consumer {
    private final ExecutorService tp;
    private final MessageQueue messageQueue;
    Consumer(ExecutorService tp,MessageQueue queue){
        this.tp = tp;
        this.messageQueue = queue;
    }
    @Override
    public void run(){
              Message msg = messageQueue.poll();

              if (msg != null) {
                  try{
                       ... // do something with the message
                  finally{
                       this.tp.execute(this);
                  }
              }
           }
    }
}    

Таким образом, у вас может быть хорошее планирование с очень небольшими хлопотами.

person Enno Shioji    schedule 29.07.2010

Во-первых, не существует единственного правильного ответа, если вы не разместите полную проектную документацию или не попробуете разные подходы для себя.

Я предполагаю, что ваша обработка не будет требовательной к вычислениям, иначе вы бы не подумали об одновременной обработке 10000 очередей. Одно из возможных решений - свести к минимуму переключение контекста, имея один-два потока на процессор. Если ваша система не будет обрабатывать данные в строгом режиме реального времени, это может вызвать большие задержки в каждой очереди, но в целом лучшую пропускную способность.

Например, пусть ваш поток-производитель запускается на собственном процессоре и помещает пакеты сообщений в потоки-потребители. Затем каждый потребительский поток распределяет сообщения в свои N частных очередей, выполняет этап обработки, получает новый пакет данных и так далее. Опять же, это зависит от вашего допуска к задержке, поэтому этап обработки может означать обработку всех очередей, фиксированного количества очередей или столько очередей, сколько может, если не будет достигнут порог времени. Возможность легко определить, какая очередь принадлежит какому потребительскому потоку (например, если очереди пронумерованы последовательно: int consumerThreadNum = queueNum & 0x03), будет полезна, поскольку поиск их в хеш-таблице каждый раз может быть медленным.

Чтобы свести к минимуму нехватку памяти, создание / уничтожение очередей все время может быть не очень хорошей идеей, поэтому вы можете предварительно выделить (максимальное количество очередей / количество ядер) объекты очереди для каждого потока. Когда очередь завершена, а не уничтожена, ее можно очистить и использовать повторно. Вы же не хотите, чтобы gc слишком часто и слишком долго мешал вам.

Еще неизвестно, производит ли ваш производитель полные наборы данных для каждой очереди или отправляет данные порциями до тех пор, пока не будет получена команда KILL. Если ваш производитель отправляет полные наборы данных, вы можете полностью отказаться от концепции очереди и просто обрабатывать данные по мере их поступления в поток потребителя.

person Gilead    schedule 29.07.2010

Имейте пул потребительских потоков относительно оборудования и емкости операционной системы. Эти потребительские потоки могут опрашивать вашу очередь сообщений.

Я бы либо сообщил, что сообщения умеют обрабатывать самих себя, либо зарегистрировал бы процессоры в классах потребительских потоков при их инициализации.

person Jacob Tomaw    schedule 29.07.2010

Из-за отсутствия более подробной информации об ограничениях обработки символов трудно дать конкретный совет.

Вам следует взглянуть на эту статью о слэшдоте:

http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

В нем довольно много обсуждений и фактических данных измерений по аргументам многих потоков по сравнению с одним выбором и пулом потоков.

person James Branigan    schedule 29.07.2010