Как заставить поток ждать, пока переменная не достигнет определенного значения (многопоточная Java)

У меня есть серверная программа, которая принимает клиентские соединения. Эти клиентские подключения могут принадлежать многим потокам. Например, два или более клиентов могут принадлежать к одному потоку. Из этих потоков я должен передать одно сообщение, но я должен подождать, пока все потоки не будут установлены. Для этого я поддерживаю следующую структуру данных.

ConcurrentHashMap<Integer, AtomicLong> conhasmap = new ConcurrentHashMap<Integer, AtomicLong>();

Целое число - это идентификатор потока, а Long - номер клиента. Чтобы заставить один поток для данного потока ждать, пока AtomicLong достигнет определенного значения, я использовал следующий цикл. Фактически первый пакет потока помещает в него идентификатор потока и количество соединений для ожидания. С каждым подключением я уменьшаю количество подключений до ожидания.

while(conhasmap.get(conectionID) != new AtomicLong(0)){
       // Do nothing
}

Однако этот цикл блокирует другие потоки. Согласно этому ответу, он выполняет непостоянное чтение. Как я могу изменить код, чтобы дождаться правильного потока для данного потока, пока он не достигнет определенного значения?


person user340    schedule 21.01.2015    source источник
comment
Почему бы не использовать thread.wait () и не разбудить его после того, как ваш сервер примет новое соединение? Еще одно менее элегантное решение - поспать на некоторое время и проверить значение.   -  person PbxMan    schedule 21.01.2015
comment
Возможно использование Condition ?   -  person fge    schedule 21.01.2015
comment
@PbxMan Я жду поток, уже принятый сервером. Да, поспать некоторое время и прочитать значение может сработать, но я ищу лучший способ.   -  person user340    schedule 21.01.2015
comment
Просто подумайте: нельзя ли дождаться значения AtomicLong и уведомить, когда поток ожидает потока, тогда значение достигает нуля?   -  person user340    schedule 21.01.2015
comment
@fge проверит состояние   -  person user340    schedule 21.01.2015
comment
Если вы правильно понимаете, вы можете использовать CountDownLatch или CyclicBarrier, в зависимости от того, что вам больше подходит. Поскольку защелки не подлежат сбросу после достижения состояния завершения.   -  person fmucar    schedule 21.01.2015
comment
while(conhasmap.get(conectionID) != new AtomicLong(0)) ерунда. Что бы ни возвращал get, он никогда не может быть идентичным экземпляру, который вы только что создали с помощью new; так что он будет зацикливаться вечно. Но он не блокирует другие потоки, замедляя работу всей вашей машины, потому что вы выполняете цикл опроса, да, но не блокируете.   -  person Holger    schedule 23.01.2015


Ответы (2)


Если вы используете Java 8, CompletableFuture может вам подойти. Вот полный, надуманный пример, который ожидает, когда 5 клиентов подключатся и отправят сообщение на сервер (смоделировано с использованием BlockingQueue с предложением / опросом).

В этом примере при достижении ожидаемого количества сообщений, подключенных к клиенту, завершается ловушка CompletableFuture, которая затем запускает произвольный код в любом потоке по вашему выбору.

В этом примере у вас нет сложных настроек ожидания / уведомления потока или циклов ожидания при занятости.

package so.thread.state;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

  public static String CONNETED_MSG = "CONNETED";
  public static Long EXPECTED_CONN_COUNT = 5L;

  public static ExecutorService executor = Executors.newCachedThreadPool();
  public static BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public static AtomicBoolean done = new AtomicBoolean(false);

  public static void main(String[] args) throws Exception {

    // add a "server" thread
    executor.submit(() -> server());

    // add 5 "client" threads
    for (int i = 0; i < EXPECTED_CONN_COUNT; i++) {
      executor.submit(() -> client());
    }

    // clean shut down
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    done.set(true);
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);

  }

  public static void server() {

    print("Server started up");
    // track # of client connections established
    AtomicLong connectionCount = new AtomicLong(0L);

    // at startup, create my "hook"
    CompletableFuture<Long> hook = new CompletableFuture<>();
    hook.thenAcceptAsync(Main::allClientsConnected, executor);

    // consume messages
    while (!done.get()) {
      try {
        String msg = queue.poll(5, TimeUnit.MILLISECONDS);
        if (null != msg) {
          print("Server received client message");
          if (CONNETED_MSG.equals(msg)) {
            long count = connectionCount.incrementAndGet();

            if (count >= EXPECTED_CONN_COUNT) {
              hook.complete(count);
            }
          }
        }

      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    print("Server shut down");

  }

  public static void client() {
    queue.offer(CONNETED_MSG);
    print("Client sent message");
  }

  public static void allClientsConnected(Long count) {
    print("All clients connected, count: " + count);
  }


  public static void print(String msg) {
    System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), msg));
  }
}

Вы получаете такой результат

[pool-1-thread-1] Server started up
[pool-1-thread-5] Client sent message
[pool-1-thread-3] Client sent message
[pool-1-thread-2] Client sent message
[pool-1-thread-6] Client sent message
[pool-1-thread-4] Client sent message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-4] All clients connected, count: 5
[pool-1-thread-1] Server shut down
person Alex    schedule 21.01.2015
comment
Я заранее не знаю количество подключений клиентов. Соединений может быть много, и эти соединения имеют разные потоки. Я должен ждать в конкретном потоке. - person user340; 21.01.2015
comment
@kani, это надуманный пример, вы можете использовать этот тип шаблона с любой логикой (та же бизнес-логика, что у вас есть сейчас), а затем использовать CompletableFuture.complete(..), я просто пытаюсь показать новую парадигму, которую представила Java, чтобы сделать композиция сложного асинхронного поведения более плавная, чем рассуждения вокруг Object.wait() и Object.notify() - person Alex; 21.01.2015
comment
Мои клиентские потоки сгруппированы по идентификатору связи. например, если имеется 15 клиентских подключений, 10 принадлежат одному сообщению A, а 5 - сообщению B. Я должен ждать 10 потоков для A и 5 потоков для B. Возможно ли это с CompletableFuture? Я вижу, что для этого требуется, чтобы потоки собирались с помощью исполнителя, и я не могу различить поток связи при запуске потока. - person user340; 22.01.2015

Ваше выражение:

conhasmap.get(conectionID) != new AtomicLong(0)

всегда будет истинным, потому что вы сравниваете ссылки на объекты, которые никогда не будут равны, вместо значений. Лучшее выражение было бы:

conhasmap.get(conectionID).longValue() != 0L)

, но такой цикл без логики ожидания / уведомления внутри цикла не является хорошей практикой, поскольку он постоянно использует время процессора. Вместо этого каждый поток должен вызывать .wait () в экземпляре AtomicLong, а когда он уменьшается или увеличивается, вы должны вызывать .notifyAll () в экземпляре AtomicLong, чтобы разбудить каждый ожидающий поток для проверки выражения. Класс AtomicLong может уже вызывать метод notifyAll () всякий раз, когда он изменяется, но я не знаю наверняка.

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    while(al.longValue() != 0L) {
        al.wait(100); //wait up to 100 millis to be notified
    }
}

В коде, который увеличивается / уменьшается, это будет выглядеть так:

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    if(al.decrementAndGet() == 0L) {
        al.notifyAll();
    }
}

Я лично не стал бы использовать AtomicLong для этого счетчика, потому что вы не получаете выгоды от безблокировочного поведения AtomicLong. Просто используйте вместо этого java.lang.Long, потому что вам в любом случае потребуется синхронизировать объект счетчика для логики wait () / notify ().

person Palamino    schedule 21.01.2015
comment
Спасибо, что указали на сравнение ссылок на объекты. Однако решение все еще не работает. - person user340; 21.01.2015
comment
Считаете ли вы, что замок al устойчив, как замок? Код переходит в бесконечный цикл в цикле while - person user340; 21.01.2015
comment
У вас может быть ситуация, когда счетчик уменьшается слишком часто. Измените условие while на: while (al.longValue () ›0L) И условие if на: if (al.decrementAndGet ()‹ = 0L) - person Palamino; 21.01.2015
comment
Также убедитесь, что вы создаете экземпляр AtomicLong только один раз для потока подключения. Каждое соединение должно увеличивать / уменьшать один и тот же экземпляр AtomicLong, иначе они будут блокировать экземпляры разностных объектов, что приведет к бесконечному циклу, как вы видите. - person Palamino; 21.01.2015
comment
Операции ›&‹ = тоже работают. Однако оба метода работают только для трех потоков. За это время в бесконечный цикл переходит более трех программ. В чем может быть проблема? Далее я использовал conectionID как блокировку для синхронизации и ожидания - person user340; 22.01.2015
comment
Если conectionID определен как java.lang.Integer или java.lang.Long, у вас может быть некоторый код, который автоматически распаковывает значение в собственное целое или длинное значение, которое позже автоматически упаковывается обратно в другой java.lang.Integer или экземпляр java.lang.Long. - person Palamino; 22.01.2015