Поточно-безопасная сериализуемая коллекция с атомарной заменой

Я столкнулся с проблемой в моей программе, когда несколько потоков обращаются к одному и тому же серверу через RMI. Сервер содержит список в качестве кеша и выполняет некоторые дорогостоящие вычисления, иногда изменяя этот список. После завершения вычислений список будет сериализован и отправлен клиенту.

Первая проблема: если список изменяется во время сериализации (например, другим клиентом, запрашивающим некоторые данные), (вероятно) выбрасывается ConcurrentModificationException, что приводит к EOFException для вызова RMI/десериализации на клиенте -боковая сторона.

Поэтому мне нужна какая-то структура списка, которая стабильна для сериализации, но, возможно, изменяется другим потоком.

Решения, которые мы пробовали:

  • обычный ArrayList/Set - не работает из-за параллелизма
  • глубокое копирование всей структуры перед каждой сериализацией - слишком дорого
  • CopyOnWriteArrayList - тоже дорого, так как копирует список и

раскрывая Вторую проблему: нам нужно иметь возможность атомарно заменить любой элемент в списке, который в настоящее время не является потокобезопасным (сначала удалить, затем добавить (что еще дороже)) или выполнимо только с помощью блокирует список и, следовательно, выполняет только разные потоки последовательно.

Поэтому мой вопрос таков:

Знаете ли вы о Collection реализации, которая позволяет нам serialize сделать Коллекцию потокобезопасной, в то время как другие потоки модифицируют ее and, которая содержит некоторые элементы atomically replacing?

Бонусом было бы, если бы список not должен был быть copied перед сериализацией! Создание моментального снимка для каждой сериализации было бы нормально, но все равно :/

Иллюстрация проблемы (C=вычислить, A=добавить в список, R=удалить из списка, S=сериализовать)

Thread1 Thread2
      C 
      A
      A C
      C A
      S C
      S R <---- Remove and add have to be performed without Thread1 serializing 
      S A <---- anything in between (atomically) - and it has to be done without 
      S S       blocking other threads computations and serializations for long
        S       and not third thread must be allowed to start serializing in this
        S       in-between state
        S

person luk2302    schedule 28.09.2015    source источник
comment
Когда вы удаляете, а затем добавляете, обязательно ли, чтобы замена была в конце списка?   -  person Paul Boddington    schedule 28.09.2015
comment
@PaulBoddington Нет, порядок не имеет значения, мы также можем использовать набор.   -  person luk2302    schedule 28.09.2015
comment
Что вы имеете в виду под удалением? Вы удаляете элемент с известным индексом (например, remove(index)) или используете поиск и удаление (например, remove(Object))?   -  person Tagir Valeev    schedule 28.09.2015
comment
@TagirValeev replaceWithObj(Object, Object), не по индексу, а путем проверки каждого объекта на равенство. И в настоящее время, позвонив remove(Object), а затем add(slightlyDifferentObj)   -  person luk2302    schedule 28.09.2015


Ответы (2)


Самым простым решением будет внешняя синхронизация с ArrayList, возможно, через блокировку чтения-записи, например:

public class SyncList<T> implements Serializable {
    private static final long serialVersionUID = -6184959782243333803L;

    private List<T> list = new ArrayList<>();
    private transient Lock readLock, writeLock;

    public SyncList() {
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        readLock = readWriteLock.readLock();
        writeLock = readWriteLock.writeLock();
    }

    public void add(T element) {
        writeLock.lock();
        try {
            list.add(element);
        } finally {
            writeLock.unlock();
        }
    }

    public T get(int index) {
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }

    public String dump() {
        readLock.lock();
        try {
            return list.toString();
        } finally {
            readLock.unlock();
        }
    }

    public boolean replace(T old, T newElement) {
        writeLock.lock();
        try {
            int pos = list.indexOf(old);
            if (pos < 0)
                return false;
            list.set(pos, newElement);
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        readLock.lock();
        try {
            out.writeObject(list);
        } finally {
            readLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException,
            ClassNotFoundException {
        list = (List<T>) in.readObject();
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        readLock = readWriteLock.readLock();
        writeLock = readWriteLock.writeLock();
    }
}

Предоставьте любые операции, которые вам нравятся, просто правильно используйте либо блокировку чтения, либо блокировку записи.

person Tagir Valeev    schedule 28.09.2015
comment
Спасибо за ответ, завтра попробую на работе. Использование двух отдельных замков для чтения и записи выглядит многообещающе. Я должен буду проверить, удовлетворяет ли это все мои потребности! - person luk2302; 28.09.2015
comment
Но, учитывая информацию, раскрытую в комментариях («мы могли бы использовать и Set»), замена ArrayList на HashSet очень рекомендуется, так как это позволяет избавиться от линейного поиска в методе replace. - person Holger; 29.09.2015

Моя неправильная первоначальная мысль заключалась в том, что CopyOnWriteArrayList была плохой идеей, поскольку она копирует все. Но, конечно, он выполняет только поверхностную копию, копируя только ссылки, а не глубокую копию, копируя также все объекты.

Поэтому мы явно выбрали CopyOnWriteArrayList, потому что он уже предлагал множество необходимых функций. Единственной оставшейся проблемой была replace, которая стала еще более сложной, чтобы быть addIfAbsentOrReplace.

Мы попробовали CopyOnWriteArraySet, но это не соответствовало нашим потребностям, потому что предлагало только addIfAbsent. Но в нашем случае у нас был экземпляр класса C с именем c1, который нам нужно было сохранить, а затем заменить обновленным новым экземпляром c2. Конечно, мы перезаписываем equals и hashCode. Теперь нам нужно было выбрать, хотим ли мы, чтобы равенство возвращало true или false для двух минимально разных объектов. Оба варианта не сработали, т.к.

  • true будет означать, что объекты одинаковые, и набор даже не удосужится добавить новый объект c2, потому что c1 уже есть в наборе.
  • false будет означать, что c2 будет добавлено, но c1 не будет удалено.

Поэтому CopyOnWriteArrayList. Этот список уже предлагает

public void replaceAll(UnaryOperator<E> operator) { ... }

что несколько соответствует нашим потребностям. Это позволяет нам заменить нужный объект с помощью пользовательского сравнения.

Мы использовали его следующим образом:

protected <T extends OurSpecialClass> void addIfAbsentOrReplace(T toAdd, List<T> elementList) {
    OurSpecialClassReplaceOperator<T> op = new OurSpecialClassReplaceOperator<>(toAdd);
    synchronized (elementList) {
        elementList.replaceAll(op);
        if (!op.isReplaced()) {
            elementList.add(toAdd);
        }
    }
}

private class OurSpecialClassReplaceOperator<T extends OurSpecialClass> implements UnaryOperator<T> {

    private boolean replaced = false;

    private T toAdd;

    public OurSpecialClassReplaceOperator(T toAdd) {
        this.toAdd = toAdd;
    }

    @Override
    public T apply(T toAdd) {
        if (this.toAdd.getID().equals(toAdd.getID())) {
            replaced = true;
            return this.toAdd;
        }

        return toAdd;
    }

    public boolean isReplaced() {
        return replaced;
    }
}
person luk2302    schedule 26.12.2015