Менеджер ресурсов с ReentrantLocks

Я пытаюсь реализовать класс обработчика ресурсов, который назначает ресурсы (строки, хранящиеся в массиве) нескольким клиентам, которые могут попытаться получить блокировку набора ресурсов и разблокировать их с помощью идентификатора, заданного методом блокировки.

Я пытаюсь использовать справедливые ReentrantReadWriteLock-ы, по одному для каждого ресурса.

Я вижу только журнал клиента.

Есть несколько проблем: иногда поток не прекращает запрашивать и получать ресурсы, иногда возникает взаимоблокировка, а иногда не удается выполнить releaseLock. Любые советы приветствуются.

public class ResHandler {

//ID-s of the granted resource lists
private static long lockNum = 0;

//Resources are identified by strings, each client has a list of demanded resources
//we store these when granted, along with an ID
private static ConcurrentHashMap<Long, Set<String>> usedResources 
    = new ConcurrentHashMap<Long, Set<String>>();

//We store a lock for each resource
private static ConcurrentHashMap<String, ReentrantReadWriteLock> resources 
    = new ConcurrentHashMap<String, ReentrantReadWriteLock>();

//Filling our resources map with the resources and their locks
static {
    for (int i = 0; i < SharedValues.RESOURCE_LIST.length; ++i) {
        String res = SharedValues.RESOURCE_LIST[i];
        //Fair reentrant lock
        ReentrantReadWriteLock lc = new ReentrantReadWriteLock(true);
        resources.put(res, lc);
    }
}

//We get a set of the required resources and the type of lock we have to use
public static long getLock(Set<String> mNeededRes, boolean mMethod) {
    //!!!
    if (mMethod == SharedValues.READ_METHOD) {

        //We try to get the required resources
        for (String mn : mNeededRes)
            resources.get(mn).readLock().lock();

        //After grandted, we put them in the usedResources map
        ++lockNum;
        usedResources.put(lockNum, mNeededRes);
        return lockNum;         
    }

    //Same thing, but with write locks
    else {

        for (String mn : mNeededRes)
            resources.get(mn).writeLock().lock();

        ++lockNum;
        usedResources.put(lockNum, mNeededRes);
        return lockNum;         
    }
}

//Releasing a set of locks by the set's ID
public static void releaseLock(long mLockID) {
    if (!usedResources.containsKey(mLockID)) {
        System.out.println("returned, no such key as: " + mLockID);
        return; 
    }

    Set<String> toBeReleased = usedResources.get(mLockID);

    //Unlocking every lock from this set
    for (String s : toBeReleased) {
        if (resources.get(s).isWriteLockedByCurrentThread())
            resources.get(s).writeLock().unlock();
        else 
            resources.get(s).readLock().unlock();
    }

    //Deleting from the map
    usedResources.remove(mLockID);
}   
}

person user1875619    schedule 27.04.2013    source источник
comment
Есть одна вещь, которая меня озадачивает: строки неизменяемы в Java. Другими словами, блокировка или отсутствие блокировки, вы все равно не можете писать в строку! Но в любом случае, давайте предположим, что это просто примеры. В этом случае важно, чтобы все блокировки были получены в одном и том же порядке, чтобы избежать взаимоблокировок, убедитесь, что Set ‹› гарантирует это. Затем вы можете сохранить не только идентификатор блокировки, но и идентификатор потока, которому они принадлежат. Таким образом вы можете убедиться, что поток не выпускает ничего, чем он не владеет, и что поток не блокируется снова перед освобождением, что может снова заблокироваться из-за порядка.   -  person Ulrich Eckhardt    schedule 27.04.2013
comment
Строки не изменяются, они представляют только ресурсы, клиенты просто ждут их. Я попробовал ваши предложения, и вместо наборов я использую вектор для хранения имен ресурсов и пар блокировок, а другой вектор - для хранения выделений, поэтому блокировки приобретаются в том же порядке. При выпуске я тоже проверяю нить. Я также сделал lockNum изменчивым. Проблемы все еще сохраняются.   -  person user1875619    schedule 27.04.2013
comment
Я не думаю, что использование вектора помогает, потому что он точно не отсортирован. Хуже того, я считаю, что в наборе у вас есть гарантия, что нет дубликатов, а в векторе - нет. Тем не менее, в коде есть условие гонки с lockNum: вы увеличиваете его, второй поток увеличивает его, и оба затем используют одно и то же значение. volatile здесь не помогает, возможно, синхронизация функции поможет. Кстати: использование единой блокировки для всего управления ресурсами решит вашу проблему, хотя вам придется реализовать несколько функций мелкозернистой блокировки самостоятельно.   -  person Ulrich Eckhardt    schedule 27.04.2013
comment
Хорошо, я немного поправил и заметил, что клиенты могут ждать произвольное время, которое может быть равно 0, а затем ждать бесконечно. В этом случае ресурсы все еще заблокированы, несмотря на ReentrantReadWriteLocks, и они никогда не теряют время ожидания, когда они являются последним живым потоком. Как я могу помочь в этом в моем классе?   -  person user1875619    schedule 28.04.2013
comment
Я не уверен, что понимаю тебя. На всякий случай, если потоки не возвращают ресурсы до завершения, любой другой поток, ожидающий этого ресурса, будет заблокирован навсегда. Ничто в вашем дизайне этому не мешает, вам нужно, чтобы клиенты вели себя правильно. Тем не менее, можете ли вы обновить код, чтобы отразить текущее состояние? Наконец, вы слишком хорошо относитесь к клиентам, которые хотят вернуть ресурсы, которыми они не владеют, бросьте исключение!   -  person Ulrich Eckhardt    schedule 28.04.2013


Ответы (4)


В вашей программе происходит несколько проблем, которые являются причиной зависаний и ошибок:

  • В общем: объявите глобальные переменные окончательными. Не стоит с ними случайно связываться. Вдобавок это позволяет использовать их в качестве объектов синхронизации.

  • Не гарантируется, что long будет атомарным, как и оператор ++. 32-разрядная JVM должна записать это в два этапа и, следовательно, теоретически может вызвать серьезный сбой в вашей системе. Лучше использовать AtomicLong.

  • GetLock не является потокобезопасным. Пример:

Поток A вызывает getLock для ресурса 1, 3, 5
Поток B вызывает getLock одновременно для ресурса 2,5,3
Потоку A предоставляется блокировка на 1, 3, затем он ставится на паузу
Поток B получает блокировку на 2, 5, затем ставится на паузу
Поток A теперь ожидает 5 от потока B, а поток B теперь ожидает 3 от потока A.
Тупик.

Обратите внимание, что метод выпуска не требует синхронизации, так как он не может блокировать другие потоки.

  • ++lockNum приведет к тому, что два потока испортят свое значение блокировки, если будут вызваны одновременно, поскольку это глобальная переменная.

Вот код в рабочем состоянии:

  private static final AtomicLong lockNum = new AtomicLong(0);
  private static final ConcurrentHashMap<Long, Set<String>> usedResources = new ConcurrentHashMap<Long, Set<String>>();
  private static final ConcurrentHashMap<String, ReentrantReadWriteLock> resources = new ConcurrentHashMap<String, ReentrantReadWriteLock>();

  static {
    for (int i = 0; i < SharedValues.RESOURCE_LIST.length; ++i) {
      String res = SharedValues.RESOURCE_LIST[i];
      ReentrantReadWriteLock lc = new ReentrantReadWriteLock(true);
      resources.put(res, lc);
    }
  }

  public static long getLock(Set<String> mNeededRes, boolean mMethod) {
    synchronized (resources) {
      if (mMethod == SharedValues.READ_METHOD) {
        for (String mn : mNeededRes) {
          resources.get(mn).readLock().lock();
        }
      } else {
        for (String mn : mNeededRes) {
          resources.get(mn).writeLock().lock();
        }
      }
    }
    final long lockNumber = lockNum.getAndIncrement();
    usedResources.put(lockNumber, mNeededRes);
    return lockNumber;
  }

  public static void releaseLock(final long mLockID) {
    if (!usedResources.containsKey(mLockID)) {
      System.out.println("returned, no such key as: " + mLockID);
      return;
    }

    final Set<String> toBeReleased = usedResources.remove(mLockID);

    for (String s : toBeReleased) {
      final ReentrantReadWriteLock lock = resources.get(s);
      if (lock.isWriteLockedByCurrentThread()) {
        lock.writeLock().unlock();
      } else {
        lock.readLock().unlock();
      }
    }
  }
person TwoThe    schedule 13.10.2013

Я предполагаю, что разные клиенты могут вызывать getLock из разных потоков. Если это так, то первая проблема заключается в том, что доступ к lockNum не синхронизирован. Два потока могут вызывать getLock одновременно, поэтому, в зависимости от времени, они оба могут закончить возвращение одного и того же номера блокировки. Это могло бы объяснить, почему блокировка разблокировки иногда не работает.

Если вы сможете решить эту проблему, вам будет легче понять, что еще происходит.

person matt helliwell    schedule 12.10.2013

Чтобы избежать взаимоблокировки, ваши ресурсы должны быть получены в том же порядке, поэтому вам нужно отсортировать Set<String> mNeededRes перед выполнением циклических блокировок. Метод сортировки особого значения не имеет.

Это подробно описано в Chapter10. Как избежать опасности для жизни от Java Concurrency In Practice Brian Göetz.

Я рекомендую вам удалить getLock и releaseLock или сделать их личными. И оберните всю свою логику в Runnable. Если вы контролируете все блокировки, вы не сможете забыть их разблокировать. Сделайте что-то вроде этого:

public void performMethod(List<String> mNeededRes, boolean mMethod, Runnable r){
    List sortd = Collections.sort(mNeededRes);
    try{
        getLock(mNeededRes, mMethod);
        r.run();
    }finally {
        releaseLock(mNeededRes);
    }
}
person Mikhail    schedule 13.10.2013
comment
да, но класс ResHandler должен содержать интерфейсы блокировки и освобождения. Задание включает его. - person Adam; 18.10.2013
comment
Тогда замки и выпуски нужно просто рассортировать. Это необходимо, чтобы избежать тупиковых ситуаций. Прочтите ссылку, которую я предоставил. - person Mikhail; 18.10.2013

Обновленное решение, попробуйте:

public class ResHandler {

private static AtomicLong lockNum = new AtomicLong(0);
private static Map<Long, Set<String>> usedResources = new ConcurrentHashMap<Long, Set<String>>();
private static final Map<String, ReentrantReadWriteLock> resources = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
// "priorityResources" to avoid deadlocks and starvation
private static final Map<String, PriorityBlockingQueue<Long>> priorityResources = new ConcurrentHashMap<String, PriorityBlockingQueue<Long>>();

static {
    for (int i = 0; i < SharedValues.RESOURCE_LIST.length; ++i) {
        String res = SharedValues.RESOURCE_LIST[i];
        ReentrantReadWriteLock lc = new ReentrantReadWriteLock(true);
        resources.put(res, lc);
        priorityResources.put(res, new PriorityBlockingQueue<Long>());
    }
}

public static long getLock(Set<String> mNeededRes, boolean mMethod) {
    long lockNumLocal = lockNum.addAndGet(1);
    for (String mn : mNeededRes) {
        priorityResources.get(mn).offer(lockNumLocal);
    }
    boolean tryLockResult;
    List<String> lockedList = new ArrayList<String>();
    boolean allLocked = false;
    while (!allLocked) {
        allLocked = true;
        for (String mn : mNeededRes) {
            if (lockedList.contains(mn) == true) {
                continue;//because we already have the lock
            }
            try {
                if (mMethod == SharedValues.READ_METHOD) {
                    tryLockResult = resources.get(mn).readLock().tryLock(1, TimeUnit.MILLISECONDS);
                } else {
                    tryLockResult = resources.get(mn).writeLock().tryLock(1, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(ResHandler.class.getName()).log(Level.SEVERE, null, ex);
                tryLockResult = false;
            }

            if (tryLockResult) {
                lockedList.add(mn);
            } else {
                allLocked = false;
                for (int i = lockedList.size() - 1; i >= 0; i--) {
                    //if the lock failed, all previous locked resources need to be released, but only if they will be used by higher priority lock operations
                    if (priorityResources.get(lockedList.get(i)).peek() != lockNumLocal) {
                        if (mMethod == SharedValues.READ_METHOD) {
                            resources.get(lockedList.get(i)).readLock().unlock();
                        } else {
                            resources.get(lockedList.get(i)).writeLock().unlock();
                        }
                        lockedList.remove(i);
                    }
                }
                break;
            }
        }
    }
    usedResources.put(lockNumLocal, mNeededRes);
    for (String mn : mNeededRes) {
        priorityResources.get(mn).remove(lockNumLocal);
    }
    return lockNumLocal;
}

public static void releaseLock(long mLockID) {
    if (!usedResources.containsKey(mLockID)) {
        System.out.println("returned, no such key as: " + mLockID);
        return;
    }

    Set<String> toBeReleased = usedResources.get(mLockID);

    //Unlocking every lock from this set
    for (String s : toBeReleased) {
        if (resources.get(s).isWriteLockedByCurrentThread()) {
            resources.get(s).writeLock().unlock();
        } else {
            resources.get(s).readLock().unlock();
        }
    }

    //Deleting from the map
    usedResources.remove(mLockID);
}

}

person Marcos Zolnowski    schedule 11.10.2013
comment
Я пробовал ваш код, но программа снова зависла и раньше, чем раньше. Вдобавок в большинстве случаев клиенты не получают требуемых ресурсов сейчас, а если они и получают, я даже не могу видеть прошедшее время, что я должен видеть. Вы пробовали свой код? Вы запускали программу? - person Adam; 12.10.2013
comment
Извините, это была дикая догадка. Глядя на это сейчас, да, у вас серьезный тупик. Поскольку вы пытаетесь заблокировать набор (а не отдельный элемент), они рано или поздно зайдут в тупик. - person Marcos Zolnowski; 14.10.2013
comment
Я пробовал новый код много раз, и программа зависала только один раз. Так что лучше, еще лучше, но не идеально :) Срок сдачи завтра. Я думаю, вы заработали очки, которые заслужили. - person Adam; 18.10.2013
comment
Вы попали в тупик. По причинам, описанным в моем ответе. - person Mikhail; 18.10.2013
comment
Я попробовал вашу тестовую среду (из другого вопроса), изменил методы wait() на Thread.sleep(), потому что все становилось странно. Но у меня мало времени, извините за дерьмовый код, мне еще многое нужно сделать, прежде чем он будет готов к использованию в производственной среде. - person Marcos Zolnowski; 18.10.2013