Как избежать занятого цикла while в управляемой событиями Java

В настоящее время я разрабатываю свое программное обеспечение Java, управляемое событиями, следующим образом (это суть моего основного метода):

while(true) {
    Event event = eventListener.poll();
    if(event != null) {
        // do something
    } else {
        // do nothing as usual, but burn CPU time.
    }
}

В зависимости от того, что я создаю, eventListener может быть чем-то, что прослушивает внешний веб-сокет, опрашивает канал Redis на наличие обновлений или ожидает сообщения от другого процесса, сидящего в том же поле (возможно, через UDP/TCP/shm).

Я думаю, что этот подход с циклом занятости тратит впустую много времени процессора, когда eventListener возвращает null (что происходит большую часть времени), поскольку он просто сидит и вращается. Однако я не знаю, как еще подойти к этому дизайну, кроме как ставить Thread.sleep на каждой итерации, что не является отличным решением.

В идеале я хотел бы иметь метод:

void run(Event event) {
    // do something
}

где run вызывается каждый раз, когда событие достигает eventListener. Если такое событие недоступно, процесс в идеале должен просто сидеть без дела.

Теперь я знаю, что есть библиотеки веб-сокетов, которые действительно могут это сделать, и я хочу знать, как я могу создать что-то подобное для себя и освободить свой процессор от того, чтобы сидеть там и ничего не делать?


person ABC    schedule 10.10.2019    source источник
comment
Лучший способ сделать это — реализовать систему передачи событий, в которой poll не нужно ждать. Например, если poll сводится к select обращениям к некоторым базовым файловым дескрипторам (сокетам, именованным каналам, ...), то вы тратите очень мало процессорного времени, потому что ядро ​​​​знает, что нужно отправить ваш процесс в спящий режим, пока он ожидает события. .   -  person Joachim Sauer    schedule 10.10.2019
comment
@JoachimSauer Возможно ли, что вы могли бы немного уточнить, возможно, с коротким примером? Я не знаком с системой передачи событий, в которой опрос не требует ожидания. Пример, который я привел в своем посте, - это степень моих знаний.   -  person ABC    schedule 10.10.2019
comment
Это не тот уровень, на котором вы делаете ожидание. Вам нужно (например) обернуть связь сокета в классе и выполнить незанятое ожидание при чтении сокета. Когда вы получаете что-то в сокете, вы генерируете событие (в своем классе-оболочке), на которое могут подписаться другие компоненты вашего приложения.   -  person ed22    schedule 10.10.2019
comment
@ABC: в конце концов, если ваш poll где-то сводится к InputStream.read(), то это уже не ожидание занятости: ваш процесс не будет использовать значительное время ЦП, пока он ожидает поступления новых данных. Другие механизмы, такие как NIO/select, обеспечивают еще большую функциональность (например, одновременное чтение из нескольких сокетов и возврат, когда любой из них возвращает данные), но это не является строго необходимым.   -  person Joachim Sauer    schedule 10.10.2019


Ответы (2)


Вам необходимо использовать неблокирующий ввод-вывод Java и может быть некоторая библиотека, которая поддерживает связь высокого уровня через java NIO (например, netty, которая поддерживает связь в стиле NIO для HTTP, веб-сокетов и Redis среди многие другие).

Вот краткое описание того, как работает NIO. То, что вы ищете, это Selector. Это позволяет дождаться, пока данные о канале (который является абстракцией для файла или сетевого соединения и т. д.) станут доступными. Это ожидание (метод Selector.select) блокируется, и процесс возобновляется ОС, когда некоторые данные доступны для чтения или выходной буфер для записи может получить новые данные.

Схематически код выглядит так:

Selector selector = createSelector();
Channel channel = createChannelForSocket();

SelectionKey key = channel.register(selector);

while(true) {

  int readyChannels = selector.select(TIMEOUT);

  if(readyChannels == 0) continue;

  Set<SelectionKey> selectedKeys = selector.selectedKeys();

  for(SelectionKey key : selectedKeys) {

    if (key.isReadable()) {
        readDataFromChannel(key.channel())
    } else if (key.isWritable()) {
        writeDataToChannel(key.channel())
    }

  }
}

С netty у вас есть код более высокого уровня, где вы определяете Handler, у которого есть метод, подобный void channelRead(ChannelHandlerContext ctx, Object msg), который является своего рода прослушивателем событий чтения, который вы можете реализовать для прослушивания событий чтения.

netty имеет встроенный цикл, похожий на приведенный выше пример, но для многих прослушивателей событий, и он передает эти события определенным прослушивателям.

person Roman Konoval    schedule 10.10.2019

Если вы заинтересованы в использовании управляемой событиями архитектуры в масштабе. Возможно, вы захотите использовать надежную «шину событий», например Apache Kafka или AWS SNS+SQS. Чтобы упростить задачу, вы можете использовать kalium.alkal.io. Это позаботится о десериализации объектов POJO или protobuf без проблем.

kalium.on(Event.class, event -> {

   //doSomething with the event
});

person Ziv    schedule 13.11.2019