Я хотел бы отправить одно и то же сообщение в набор потоков таким образом, чтобы каждый поток получал все сообщения, которые я добавляю в очередь. Это похоже на очередь вещания. Есть ли такая структура данных, желательно на Java?
Есть ли структура данных, похожая на очередь, которая может выполнять многоадресную рассылку в разные потоки?
comment
где были бы запущенные потоки?
- person Typo   schedule 17.06.2014
comment
Вы видели stackoverflow.com/questions/8891284/ а>? Возможно, вы сможете адаптировать github.com/android/platform_frameworks_base/blob/master/?
- person Ray Toal   schedule 17.06.2014
Ответы (3)
Храните данные в статической ConcurrentLinkedQueue. , и позвольте вашим потокам получить к нему доступ.
person
Mifmif
schedule
17.06.2014
Это не работает. Как только поток получит сообщение, другой поток не получит того же сообщения.
- person JohnPristine; 17.06.2014
@JohnPristine не голосуйте, просто посмотрите :)
- person Mifmif; 17.06.2014
если (queue.peek ()! = null) захватить (queue.poll ());
- person Kyte; 17.06.2014
Я не думаю, что ConcurrentLinkedQueue будет работать ... После того, как вы опросите () очередь, другие потребители пропустят это сообщение.
- person rdalmeida; 17.06.2014
Я сказал, что просто просмотрите () ваши данные. и опрашивайте его в соответствии с вашими конкретными критериями.
- person Mifmif; 17.06.2014
@Mifmif, но если я отправлю два сообщения в очередь, как это работает? В конце концов мне нужно провести опрос, чтобы пришло следующее сообщение. Кто-то должен будет опросить, и как я могу гарантировать, что сообщение получено после опроса?
- person JohnPristine; 17.06.2014
@Mifmif, я думаю, вы упускаете суть. JohnPristine хочет, чтобы каждый новый элемент данных автоматически удалялся из очереди, как только каждый зарегистрированный поток получит его, но не раньше. Также может потребоваться метод, ожидающий появления сообщения, которое вызывающий поток еще не видел.
- person Solomon Slow; 17.06.2014
@JohnPristine кажется, что вы хотите, чтобы каждый поток работал с сообщениями, не дожидаясь опроса () от кого-то. Если это так, вам нужно больше, чем ConcurrentLinkedQueue, может быть что-то похожее на систему Pub / Sub, где каждый поток будет иметь это список сообщений.
- person Mifmif; 17.06.2014
Вы можете использовать для этого паттерн Disruptor. Если вам нужно что-то похожее на структуру данных, вы можете проверить Splitter
из CoralQueue. Это позволяет производителю отправлять сообщения нескольким потребителям таким образом, чтобы каждый потребитель получал и обрабатывал каждое сообщение.
Вот простой пример:
package com.coralblocks.coralqueue.sample.splitter;
import com.coralblocks.coralqueue.splitter.AtomicSplitter;
import com.coralblocks.coralqueue.splitter.Splitter;
import com.coralblocks.coralqueue.util.Builder;
public class Basics {
private static final int NUMBER_OF_CONSUMERS = 4;
public static void main(String[] args) {
Builder<StringBuilder> builder = new Builder<StringBuilder>() {
@Override
public StringBuilder newInstance() {
return new StringBuilder(1024);
}
};
final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, builder, NUMBER_OF_CONSUMERS);
Thread producer = new Thread(new Runnable() {
private final StringBuilder getStringBuilder() {
StringBuilder sb;
while((sb = splitter.nextToDispatch()) == null) {
// splitter can be full if the size of the splitter
// is small and/or the consumer is too slow
// busy spin (you can also use a wait strategy instead)
}
return sb;
}
@Override
public void run() {
StringBuilder sb;
while(true) { // the main loop of the thread
// (...) do whatever you have to do here...
// and whenever you want to send a message to
// the other thread you can just do:
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hello!");
splitter.flush();
// you can also send in batches to increase throughput:
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hi!");
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hi again!");
splitter.flush(); // dispatch the two messages above...
}
}
}, "Producer");
final Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
for(int i = 0; i < consumers.length; i++) {
final int index = i;
consumers[i] = new Thread(new Runnable() {
@SuppressWarnings("unused")
@Override
public void run() {
while (true) { // the main loop of the thread
// (...) do whatever you have to do here...
// and whenever you want to check if the producer
// has sent a message you just do:
long avail;
while((avail = splitter.availableToPoll(index)) == 0) {
// splitter can be empty!
// busy spin (you can also use a wait strategy instead)
}
for(int i = 0; i < avail; i++) {
StringBuilder sb = splitter.poll(index);
// (...) do whatever you want to do with the data
// just don't call toString() to create garbage...
// copy byte-by-byte instead...
}
splitter.donePolling(index);
}
}
}, "Consumer" + index);
}
for(int i = 0; i < consumers.length; i++) {
consumers[i].start();
}
producer.start();
}
}
person
rdalmeida
schedule
17.06.2014
Вы можете создать поток, чтобы забрать сообщение из очереди, а затем отправить его всему потоку, сохранив сообщение в локальной очереди для каждого потока.
person
mohamed sulibi
schedule
18.06.2014