Насколько потокобезопасность - это слишком?

Недавно я прочитал Параллелизм в Java на практике - отличная книга. Если вы думаете, что знаете, как работает параллелизм, но в большинстве случаев сталкиваетесь с реальными проблемами, и вам кажется, что SWAG - это все, что вы можете сделать, то эта книга, безусловно, прольет свет на эту тему. Страшно, как много всего может пойти не так, когда вы пытаетесь обмениваться данными между потоками. Думаю, это заставило меня немного сходить с ума по безопасности потоков. Теперь меня беспокоит то, что из-за слишком большой синхронизации я могу столкнуться с некоторыми проблемами живучести. Вот фрагмент кода для иллюстрации:

   private final Hashtable<String, AtomicInteger> userSessions =
new Hashtable<String, AtomicInteger>();

   public void registerUser(String userLogin) {
       synchronized(userSessions) {
           AtomicInteger sessionCount = userSessions.get(userLogin);
           if (sessionCount != null) {
               sessionCount.incrementAndGet();
           } else {
               userSessions.put(userLogin, new AtomicInteger(1));
           }
       }
   }

   public void unregisterUser(String userLogin) {
       synchronized(userSessions) {
           AtomicInteger sessionCount = userSessions.get(userLogin);
           if (sessionCount != null) {
               sessionCount.decrementAndGet();
           }
       }
   }

   public boolean isUserRegistered(String userLogin) {
       synchronized(userSessions) {
           AtomicInteger sessionCount = userSessions.get(userLogin);
           if (sessionCount == null) {
               return false;
           }
           return sessionCount.intValue() > 0;
       }
   }

Я попытался сделать все правильно: синхронизированная коллекция, созданная в статическом разделе и сохраненная в статической конечной ссылке для безопасной публикации, блокировка коллекции (вместо this - чтобы я не блокировал весь класс, в котором живет код) и использование атомарные классы-оболочки для примитивов. В книге упоминается, что чрезмерное усердие также может вызвать проблемы, но, похоже, мне нужно еще немного времени, чтобы полностью осознать это. Как сделать этот код потокобезопасным и убедиться, что он не страдает от проблем с живостью и производительностью?

РЕДАКТИРОВАТЬ: превратили его в методы и переменные экземпляра, изначально все было объявлено как статическое - плохой, плохой дизайн. Также сделал userSessions приватным (как-то раньше я оставил его публичным).


person lukem00    schedule 30.11.2009    source источник
comment
Отличный вопрос. Я уверен, что вы можете добиться правильной синхронизации, не объявляя таблицу статической. Есть ли причина с точки зрения синхронизации, почему вы выбрали статику?   -  person Buhb    schedule 30.11.2009
comment
Однако пример немного скудный. Как часто в реальном мире может происходить, чтобы один и тот же пользователь одновременно входил в систему из двух мест?   -  person BalusC    schedule 30.11.2009
comment
@ unknown-google: Вы правы, _userSessions не обязательно должен быть статичным с точки зрения синхронизации, я думаю, это просто пример плохого дизайна, как, я уверен, многие скажут;)   -  person lukem00    schedule 30.11.2009
comment
@BalusC: На самом деле это отрывок из реального приложения. Другое дело, хороший это дизайн или нет. Это в основном предназначено для ситуации, когда пользователь, у которого уже открыт сеанс, входит в систему с другого устройства - затем он подключается к существующему сеансу или для него создается новый, в зависимости от обстоятельств. Если вы задумаетесь о том, как работает RDP, возможно, на самом деле не так уж и странно разрешить одному и тому же пользователю иметь одновременные сеансы.   -  person lukem00    schedule 30.11.2009
comment
@ lukem00: правда, но я говорю об одновременности до некоторой степени, так что синхронизация действительно очень нужна. Я не думаю, что это произойдет в реальном мире. Или для входа в систему должно потребоваться 2 секунды времени вместо 2 микросекунд.   -  person BalusC    schedule 30.11.2009
comment
@BalusC: Возможно, я не совсем понимаю, что вы имеете в виду прямо сейчас, но я определенно могу представить себе ситуацию, когда два разных пользователя пытаются войти в систему одновременно, и это уже похоже на ситуацию (учитывая, что каждое соединение обрабатывается отдельный поток), когда доступ к userSessions требуется более чем одному потоку, верно?   -  person lukem00    schedule 30.11.2009
comment
@ lukem00: вас интересует только количество сеансов на пользователя. Вас не интересует общее количество сеансов или около того, в которых задействованы все пользователи, при этом у вас гораздо больше шансов на одновременный доступ.   -  person BalusC    schedule 30.11.2009
comment
Незначительный комментарий: оставление userSessions общедоступным подвергает вас риску того, что вызывающие абоненты могут случайно избежать вашей синхронизации. Да, HashTable синхронизируется, но только для атомарных операций, поэтому дезинформированный вызывающий может быть в середине некоторого составного действия «тест и действие», когда другой вызывающий объект обращается к одному из зарегистрированных методов. Простое решение: сделайте его приватным.   -  person CPerkins    schedule 30.11.2009
comment
@CPerkins: Вы абсолютно правы с этим, на самом деле, в моем коде он частный. Думаю, я как бы вставил его сюда, пока еще не знал, что собираюсь полностью инкапсулировать его. Тем не менее, спасибо за наблюдение.   -  person lukem00    schedule 30.11.2009


Ответы (5)


Используйте ConcurrentHashMap, чтобы можно было использовать putIfAbsent. Вам не нужно AtomicInteger код для синхронизации.

   public final ConcurrentMap<String, AtomicInteger> userSessions =
       new ConcurrentHashMap<String, AtomicInteger>();

   public void registerUser(String userLogin) {
       AtomicInteger newCount = new AtomicInteger(1);
       AtomicInteger oldCount = userSessions.putIfAbsent(userLogin, newCount);
       if (oldCount != null) {
           oldCount.incrementAndGet();
       }
   }

   public void unregisterUser(String userLogin) {
       AtomicInteger sessionCount = userSessions.get(userLogin);
       if (sessionCount != null) {
           sessionCount.decrementAndGet();
       }
   }

   public boolean isUserRegistered(String userLogin) {
       AtomicInteger sessionCount = userSessions.get(userLogin);
       return sessionCount != null && sessionCount.intValue() > 0;
   }

Обратите внимание, это утечка ...

Попытка получить версию без утечек:

   public final ConcurrentMap<String, Integer> userSessions =
       new ConcurrentHashMap<String, Integer>();

   public void registerUser(String userLogin) {
       for (;;) {
           Integer old = userSessions.get(userLogin);
           if (userSessions.replace(userLogin, old, old==null ? 1 : (old+1)) {
                break;
           }
       }
   }
   public void unregisterUser(String userLogin) {
       for (;;) {
           Integer old = userSessions.get(userLogin);
           if (old == null) {
               // Wasn't registered - nothing to do.
               break;
           } else if (old == 1) {
               // Last one - attempt removal.
               if (userSessions.remove(userLogin, old)) {
                   break;
               }
           } else {
               // Many - attempt decrement.
               if (userSessions.replace(userLogin, old, old-1) {
                   break;
               } 
           }
       }
   }
   public boolean isUserRegistered(String userLogin) {serLogin);
       return userSessions.containsKey(userLogin);
   }
person Tom Hawtin - tackline    schedule 30.11.2009
comment
rsp: Это действительно не имеет значения. Преодолей это! Вы можете написать код, который пытается этого не делать, но он может быть не быстрее (хотя и будет более раздутым). - person Tom Hawtin - tackline; 30.11.2009
comment
Вы бы почувствовали себя лучше, если бы мы встроили переменную newCount? - person Adriaan Koster; 30.11.2009
comment
@Adriaan, нет, я бы не стал чувствовать себя лучше (это был вопрос с подвохом, верно?) @Tom шаблон кода, который показал OP, мне не кажется раздутым. Что касается чистого кода; нечего переживать. - person rsp; 30.11.2009
comment
Исходный код использует блокировки, так что его производительность в любом случае не будет хорошей. - person Tom Hawtin - tackline; 30.11.2009
comment
@Tom Hawtin - tacklin: Чем лучше запускать инструкции в цикле, пока он не заработает, по сравнению с использованием механизма блокировки? Связано ли это с тем, что другой поток вряд ли что-то изменит в середине метода? Он работает лучше? Стоит ли торговать на читабельности? Как вы думаете, можно было бы добавить к своему ответу еще какие-нибудь пояснения? Я уверен, что те, кто менее знаком с параллелизмом, могли бы воспользоваться вашими комментариями. - person lukem00; 01.12.2009
comment
@ lukem00: такой цикл является стандартной методологией для алгоритмов без блокировки - person Jason S; 01.12.2009
comment
lukem00: В моем первом примере registerUser имеет две независимые атомарные операции. Первая попытка вставить запись на карту. Если для этого ключа заранее не было сопоставления, то все готово. Если было сопоставление, значит, мы не меняли карту, и все, что осталось сделать, это обновить счетчик атомарно. Есть утечка, потому что, когда unregisterUser сбрасывает счетчик до нуля, запись карты все еще существует. - person Tom Hawtin - tackline; 01.12.2009
comment
lukem00: Для получения блокировки действительно требуется цикл. Алгоритм без блокировок должен выполнить цикл максимум один раз для другого потока , который выполняет работу (технически это может быть так же плохо, как O (n ^ 2) для n потоков). Алгоритмы блокировки / имеют тенденцию / работать лучше, чем алгоритмы блокировки (для быстрых операций). Стоит ли это, зависит от ситуации. - person Tom Hawtin - tackline; 01.12.2009
comment
@Tom Hawtin - tacklin: Должен признаться, когда я впервые увидел твой ответ, он меня полностью озадачил. Это было похоже на один из тех моментов, которые я знаю, что ничего не знаю. Так что я потратил некоторое время, чтобы проанализировать ваши отрывки и немного почитать справочную информацию, но, боюсь, я все еще далек от просветления. Я понимаю, что могу использовать метод putIfAbsent ConcurrentHashMap и что AtomicInteger не требует дополнительной синхронизации, но для работы registerUser по-прежнему должен быть атомарным, верно? Если да, то ваша первая версия кажется неработающей. Это то, что вы имели в виду, когда сказали, что утечка? - person lukem00; 01.12.2009
comment
О, теперь я понимаю вашу точку зрения на утечку. Ну, если честно, я полагаю, что я просто ленился в этой части, потому что я думал, что это не будет иметь большого значения с очень ограниченным количеством пользователей, которые у меня есть. Я совершенно не должен был так оставлять! Мне стыдно. Спасибо, что указали на это. - person lukem00; 01.12.2009
comment
Кстати. спасибо за дополнительную информацию. Теперь, когда вы его написали, кажется очевидным, что ваша реализация registerUser с независимыми атомарными операциями работает нормально. Не знаю, почему я думал, что с этим что-то может пойти не так. Думаю, мне просто некомфортно, когда другие потоки что-либо делают с моими данными в середине операции, даже если они не обязательно что-то ломают. - person lukem00; 01.12.2009
comment
@ TomHawtin-tackline У меня вопрос по безопасности потоков здесь, так что хотел узнать, сможешь ли ты мне помочь? Застрял на этом какое-то время. - person john; 30.10.2017

Прежде всего: не используйте Hashtable! Он старый и очень медленный.
Дополнительно: синхронизация на более низком уровне не требуется, если вы уже синхронизируете на более высоком уровне (это верно и для элемента AtomicInteger).

Я вижу здесь разные подходы в зависимости от того, какой вариант использования здесь нужен.

Подход чтения / записи

Предполагая, что вы вызываете метод isUserRegistered очень часто, а другие методы только время от времени, хорошим способом является блокировка чтения-записи: разрешено иметь несколько операций чтения одновременно, но только одна блокировка записи, чтобы управлять ими всеми. (может быть получено только в том случае, если не требуется другой блокировки).

private static final Map<String, Integer> _userSessions =
  new HashMap<String, Integer>();

private ReadWriteLock rwLock =
  new ReentrantReadWriteLock(false); //true for fair locks

public static void registerUser(String userLogin) {
  Lock write = rwLock.writeLock();
  write.lock();
  try {
       Integer sessionCount = _userSessions.get(userLogin);
       if (sessionCount != null) {
           sessionCount = Integer.valueOf(sessionCount.inValue()+1);
       } else {
           sessionCount = Integer.valueOf(1)
       }
       _userSessions.put(userLogin, sessionCount);
   } finally {
     write.unlock();
   }
}

public static void unregisterUser(String userLogin) {
  Lock write = rwLock.writeLock();
  write.lock();
  try {
       Integer sessionCount = _userSessions.get(userLogin);
       if (sessionCount != null) {
           sessionCount = Integer.valueOf(sessionCount.inValue()-1);
       } else {
           sessionCount = Integer.valueOf(0)
       }
       _userSessions.put(userLogin, sessionCount);
   } finally {
     write.unlock();
   }
}

public static boolean isUserRegistered(String userLogin) {
  boolean result;

  Lock read = rwLock.readLock();
  read.lock();
  try {
       Integer sessionCount = _userSessions.get(userLogin);
       if (sessionCount != null) {
           result = sessionCount.intValue()>0
       } else {
           result = false;
       }
   } finally {
     read.unlock();
   }

   return false;
}

Плюсы: просто понять
Минусы: не будет масштабироваться, если методы записи вызываются часто.

Подход небольших атомарных операций

Идея состоит в том, чтобы делать маленькие шаги, которые все атомарны. В любом случае это приведет к очень хорошей производительности, но здесь есть много скрытых ловушек.

public final ConcurrentMap<String, AtomicInteger> userSessions =
   new ConcurrentHashMap<String, AtomicInteger>();
//There are other concurrent Maps for different use cases

public void registerUser(String userLogin) {
  AtomicInteger count;
  if (!userSession.containsKey(userLogin)){
    AtomicInteger newCount = new AtomicInteger(0);
    count = userSessions.putIfAbsent(userLogin, newCount);
    if (count == null){
      count=newCount;
    }
    //We need ifAbsent here, because another thread could have added it in the meantime
  } else {
    count = userSessions.get(userLogin);
  }
  count.incrementAndGet();
}

public void unregisterUser(String userLogin) {
  AtomicInteger sessionCount = userSessions.get(userLogin);
  if (sessionCount != null) {
    sessionCount.decrementAndGet();
  }
}

public boolean isUserRegistered(String userLogin) {
  AtomicInteger sessionCount = userSessions.get(userLogin);
  return sessionCount != null && sessionCount.intValue() > 0;
}

Плюсы: очень хорошо масштабируется
Минусы: не интуитивно понятен, быстро будет сложным, не всегда возможно, много скрытых ловушек

Подход "Блокировка на пользователя"

Это создаст блокировки для разных пользователей при условии, что существует много разных пользователей. Вы можете создавать блокировки или мониторы с некоторыми небольшими атомарными операциями и блокировать их вместо полного списка.
Это было бы излишним для этого небольшого примера, но для очень сложных структур это может быть элегантным решением.

person Hardcoded    schedule 30.11.2009
comment
У второго registedUser баг. - person Tom Hawtin - tackline; 30.11.2009
comment
(И эта блокировка r / w, вероятно, неуместна.) - person Tom Hawtin - tackline; 30.11.2009
comment
(И переменная карты должна быть объявлена ​​как ConcurrentMap для putIfAbsent.) - person Tom Hawtin - tackline; 30.11.2009
comment
Ты прав. Исправлено (надеюсь). ;-) Спасибо за обзор. - person Hardcoded; 30.11.2009
comment
Вот что я имел в виду, говоря о скрытых ловушках: большой блок синхронизации легко поддерживать, даже если он плохо масштабируется. Работать с кучей атомарных операций всегда сложно. - person Hardcoded; 30.11.2009

Судя по вашему коду, вашей синхронизации на _userSessions должно быть достаточно, потому что вы не открываете AtomicInteger объекты.

Дополнительная безопасность, предлагаемая AtomicInteger, в этом случае не нужна, поэтому, по сути, вы используете ее здесь как изменяемую Integer. Вы можете разместить вложенный статический класс, содержащий счетчик сеансов, в качестве единственного атрибута на карте, если вас беспокоят дополнительные накладные расходы в AtomicInteger (или немного уродливее: добавьте на карту int [1], если они не выставлен за пределами этого класса.)

person rsp    schedule 30.11.2009
comment
@rsp: Я действительно ожидал, что AtomicInteger будет здесь несущественным, но, как вы и сказали, я подумал, что это сделает удобное изменяемое целое число. Я предполагаю, что здесь он может передать неправильное сообщение - например, настоящую причину его использования. Как вы думаете, было бы лучше (даже не учитывая возможные накладные расходы) заменить его собственным статическим вложенным классом? - person lukem00; 30.11.2009
comment
Поскольку вложенный класс может быть тривиально маленьким, я, вероятно, пошел бы по этому пути, и, как упоминалось в Hardcoded, я буду использовать HashMap вместо Hashtable. (Такой код часто бывает более элегантным и производительным, просто делая это простым способом.) - person rsp; 30.11.2009

Хорошая книга, недавно сам читал.

В приведенном выше коде единственное замечание, которое у меня есть, это то, что AtomicInteger не нужен в синхронизированном блоке, но я сомневаюсь, что производительность будет заметна.

Лучший способ отслеживать производительность - это протестировать. Настройте автоматизированный интегрированный нагрузочный тест для ключевых областей вашего фреймворка и отслеживайте производительность. Нагрузочный тест, если он содержит достаточно широкие временные окна и богатое использование рабочего процесса, также может выявить любые тупиковые ситуации, которые вы создали.

Хотя может показаться, что тупиковых ситуаций легко избежать, они могут легко проявиться в довольно простом шаблоне рабочего процесса.

Класс A блокирует ресурсы, затем вызывает B (может быть так же просто, как get / set), который также блокирует ресурс. Другой поток вызывает B, который блокирует ресурсы, а затем вызывает A, вызывая тупик.

При работе с богатой структурой полезно наметить рабочий процесс, чтобы увидеть, как взаимодействуют классы. Возможно, вы сможете обнаружить этот тип проблемы. Однако с действительно большими рамками они могут проскользнуть мимо. Лучшая защита, которую я нашел, - это изолировать блокировки до минимально возможной области и внимательно следить за вызовом вне класса, находясь внутри синхронизированного блока. Также помогает создание значительного количества нагрузочных тестов.

person Jim Rush    schedule 30.11.2009

Я наткнулся на этот давний вопрос в поисках совета о том, как создать то, что можно было бы назвать «картой параллельного подсчета», - в частности, поиск использования ConcurrentHashMap с AtomicInteger.

Вот модифицированная версия ответа с наивысшей оценкой, в котором используется AtomicInteger и не происходит утечек. В моем (ограниченном) тестировании это кажется намного быстрее, чем версия Integer. Я также отмечу, что использование ConcurrentMap.get() до ConcurrentMap.putIfAbsent(), похоже, значительно экономит время.

private final ConcurrentMap<String, AtomicInteger> userSessions =
   new ConcurrentHashMap<String, AtomicInteger>();

public void registerUser(String userLogin) {
    AtomicInteger oldCount = userSessions.get(key);
    if(oldCount!=null && getAndIncrementIfNonZero(oldCount)>0) return;
    AtomicInteger newCount = new AtomicInteger(1);
    while(true) {
        oldCount = userSessions.putIfAbsent(key, newCount);
        if(oldCount==null) return;
        if(getAndIncrementIfNonZero(oldCount)>0) return;
    }
}

public void unregisterUser(String userLogin) {
   AtomicInteger sessionCount = userSessions.get(userLogin);
   if (sessionCount != null) {
       int endingCount = sessionCount.decrementAndGet();
       if(endingCount==0) userSessions.remove(userLogin);
   }
}

public boolean isUserRegistered(String userLogin) {
   AtomicInteger sessionCount = userSessions.get(userLogin);
   return sessionCount != null && sessionCount.intValue() > 0;
}

private static int getAndIncrementIfNonZero(AtomicInteger ai) {
    while(true) {
        int current = ai.get();
        if(current==0) return 0;
        if(ai.compareAndSet(current, current+1)) return current;
    }
}

Скорость может быть не так важна для исходной задачи плаката, но другие применения такой «карты счета» могут выиграть от эффективности.

person Robert Tupelo-Schneck    schedule 06.05.2012