Устранение ошибки видимости и параллелизма потока Java с помощью вычислений карты

Я использую Java 8. У меня есть обработчик событий, который принимает события с высокой скоростью (n в секунду), и я хочу сбросить их в хранилище, когда их так много (в этом упрощенном примере 1000)

Есть ли у меня ошибка видимости в строке 25 myCache.get(event.getKey()).add(event.getBean());? Должен ли я синхронизироваться по методу handleEvent()?

public class myClass extends MySimpleEventHanlder {
    private Map<String, List<MyBean>> myCache;
    private ScheduledExecutorService scheduler;

    public void MyClass() {
        myCache = new ConcurrentHashMap<String, List<MyBean>>();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {

            for (Iterator<Map.Entry<String, List<MyBean>>> it = myCache.entrySet().iterator(); it.hasNext();) {
                Map.Entry<String, List<MyBean>> entry = it.next();
                if (entry.getValue().size() >= 1000) {
                    it.remove();
                    //do some more processing , flush to storage
                }
            }
        }, 0, 60, TimeUnit.SECONDS);
    }

    @Override
    public void handleEvent(Event event) {

        if (myCachetCache.containsKey(event.getKey())) {
            myCache.get(event.getKey()).add(event.getBean());
        }
        else{
            List<MyBean> beans = new ArrayList<MyBeans>();
            beans.add(event.getBean());
            myCache.put(event.key, beans);
        }
    }
}

person JavaHead    schedule 24.03.2020    source источник
comment
какая версия джавы? разве вы не можете использовать вычисления вместо изменения списка?   -  person Nathan Hughes    schedule 24.03.2020
comment
Натан Хьюз, я использую Java 8, что вы имеете в виду под использованием вычислений?   -  person JavaHead    schedule 24.03.2020
comment
проблема с видимостью - компилятор будет жаловаться! ... если вы имеете в виду проблему параллелизма - скорее нет, потому что это ConcurrentHashMap.   -  person xerx593    schedule 24.03.2020
comment
Compute — это метод CHM docs.oracle.com/javase/8/docs/api/java/util/concurrent/   -  person Nathan Hughes    schedule 24.03.2020
comment
Используется ли сам handleEvent несколькими потоками или у вас есть один поток диспетчеризации событий?   -  person Joni    schedule 24.03.2020


Ответы (1)


У вас определенно есть проблемы с видимостью: вы добавляете элементы в ArrayList в одном потоке и читаете size() из этого ArrayList в другом потоке без синхронизации между ними.

Другая проблема заключается в том, что ключ может быть удален между вызовами myCache.containsKey и myCache.get. Это вызовет исключение NullPointerException. Это можно решить с помощью вычислений, которые гарантированно будут атомарными.

    myCache.compute(event.getKey(), (key, value) -> {
        if (value == null) {
            value = new ArrayList<>();
        }
        value.add(event.getBean());
        return value;
    });
person Joni    schedule 24.03.2020