Есть ли структура данных, похожая на очередь, которая может выполнять многоадресную рассылку в разные потоки?

Я хотел бы отправить одно и то же сообщение в набор потоков таким образом, чтобы каждый поток получал все сообщения, которые я добавляю в очередь. Это похоже на очередь вещания. Есть ли такая структура данных, желательно на Java?


person JohnPristine    schedule 17.06.2014    source источник
comment
где были бы запущенные потоки?   -  person Typo    schedule 17.06.2014


Ответы (3)


Храните данные в статической ConcurrentLinkedQueue. , и позвольте вашим потокам получить к нему доступ.

person Mifmif    schedule 17.06.2014
comment
Это не работает. Как только поток получит сообщение, другой поток не получит того же сообщения. - person JohnPristine; 17.06.2014
comment
@JohnPristine не голосуйте, просто посмотрите :) - person Mifmif; 17.06.2014
comment
если (queue.peek ()! = null) захватить (queue.poll ()); - person Kyte; 17.06.2014
comment
Я не думаю, что ConcurrentLinkedQueue будет работать ... После того, как вы опросите () очередь, другие потребители пропустят это сообщение. - person rdalmeida; 17.06.2014
comment
Я сказал, что просто просмотрите () ваши данные. и опрашивайте его в соответствии с вашими конкретными критериями. - person Mifmif; 17.06.2014
comment
@Mifmif, но если я отправлю два сообщения в очередь, как это работает? В конце концов мне нужно провести опрос, чтобы пришло следующее сообщение. Кто-то должен будет опросить, и как я могу гарантировать, что сообщение получено после опроса? - person JohnPristine; 17.06.2014
comment
@Mifmif, я думаю, вы упускаете суть. JohnPristine хочет, чтобы каждый новый элемент данных автоматически удалялся из очереди, как только каждый зарегистрированный поток получит его, но не раньше. Также может потребоваться метод, ожидающий появления сообщения, которое вызывающий поток еще не видел. - person Solomon Slow; 17.06.2014
comment
@JohnPristine кажется, что вы хотите, чтобы каждый поток работал с сообщениями, не дожидаясь опроса () от кого-то. Если это так, вам нужно больше, чем ConcurrentLinkedQueue, может быть что-то похожее на систему Pub / Sub, где каждый поток будет иметь это список сообщений. - person Mifmif; 17.06.2014

Вы можете использовать для этого паттерн Disruptor. Если вам нужно что-то похожее на структуру данных, вы можете проверить Splitter из CoralQueue. Это позволяет производителю отправлять сообщения нескольким потребителям таким образом, чтобы каждый потребитель получал и обрабатывал каждое сообщение.

Вот простой пример:

package com.coralblocks.coralqueue.sample.splitter;

import com.coralblocks.coralqueue.splitter.AtomicSplitter;
import com.coralblocks.coralqueue.splitter.Splitter;
import com.coralblocks.coralqueue.util.Builder;

public class Basics {

    private static final int NUMBER_OF_CONSUMERS = 4;

    public static void main(String[] args) {

        Builder<StringBuilder> builder =  new Builder<StringBuilder>() {
            @Override
            public StringBuilder newInstance() {
                return new StringBuilder(1024);
            }
        };

        final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, builder, NUMBER_OF_CONSUMERS);

        Thread producer = new Thread(new Runnable() {

            private final StringBuilder getStringBuilder() {

                StringBuilder sb;

                while((sb = splitter.nextToDispatch()) == null) {
                    // splitter can be full if the size of the splitter
                    // is small and/or the consumer is too slow

                    // busy spin (you can also use a wait strategy instead)
                }
                return sb;
            }

            @Override
            public void run() {

                StringBuilder sb;

                while(true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to send a message to
                    // the other thread you can just do:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hello!");
                    splitter.flush();

                    // you can also send in batches to increase throughput:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi!");

                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi again!");

                    splitter.flush(); // dispatch the two messages above...
                }
            }
        }, "Producer");

        final Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];

        for(int i = 0; i < consumers.length; i++) {

            final int index = i;

            consumers[i] = new Thread(new Runnable() {

                @SuppressWarnings("unused")
                @Override
                public void run() {

                    while (true) { // the main loop of the thread

                        // (...) do whatever you have to do here...

                        // and whenever you want to check if the producer
                        // has sent a message you just do:

                        long avail;
                        while((avail = splitter.availableToPoll(index)) == 0) {
                            // splitter can be empty!
                            // busy spin (you can also use a wait strategy instead)
                        }

                        for(int i = 0; i < avail; i++) {

                            StringBuilder sb = splitter.poll(index);

                            // (...) do whatever you want to do with the data
                            // just don't call toString() to create garbage...
                            // copy byte-by-byte instead...
                        }

                        splitter.donePolling(index);
                    }
                }
            }, "Consumer" + index);
        }

        for(int i = 0; i < consumers.length; i++) {
            consumers[i].start();
        }

        producer.start();
    }
}
person rdalmeida    schedule 17.06.2014

Вы можете создать поток, чтобы забрать сообщение из очереди, а затем отправить его всему потоку, сохранив сообщение в локальной очереди для каждого потока.

person mohamed sulibi    schedule 18.06.2014