Самый быстрый способ инициализировать значение для ConcurrentHashMap

ConcurrentHashMap часто используется в параллельных средах для агрегирования некоторых событий по ключу — например, для подсчета совпадений для некоторых строковых значений. В случае, если мы не знаем ключей заранее, нам нужен хороший способ инициализации ключа по мере необходимости, он должен быть быстрым и безопасным с точки зрения параллелизма. Каков наилучший шаблон (с точки зрения эффективности) для этой проблемы?

Я буду использовать карту модели с объявленным <String, AtomicInteger> следующим образом:

ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();

Но это может быть карта с любой парой ключ-значение, где нам нужно инициализировать пару ключ-значение, если ключ еще не существует в карте, и изменить состояние значения для записи события.

Есть два популярных подхода: Первый использует ConcurrentHashMap.putIfAbsent:

AtomicInteger count = map.get(s);
if (count == null) {
    count = new AtomicInteger(0);
    AtomicInteger prevCount = map.putIfAbsent(s, count);
    if (prevCount != null) {
        count = prevCount;
    }
}
count.incrementAndGet();

Второй использует ConcurrentHashMap.computeIfAbsent:

AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
count.incrementAndGet();

Какой из них лучше подходит для этой задачи? Есть ли другие подходы?


person Krzysztof Cichocki    schedule 07.07.2017    source источник
comment
Поскольку computeIfAbsent и putIfAbsent используются для двух разных вещей, сравнивать их так наивно не имеет смысла. Вы используете computeIfAbsent, когда хотите атомарно вычислить значение (которое может быть основано на ключе). Вы используете putIfAbsent, когда хотите атомарно поместить простое значение на карту. Неудивительно, что более простая операция выполняется быстрее.   -  person Kayaman    schedule 07.07.2017
comment
@Kayaman, не совсем, почему computeIfAbsent просто не вернет текущее значение для этого ключа, когда ключ уже есть? Почему он вообще входит в синхронизированный блок, когда ключ уже присутствует?   -  person Krzysztof Cichocki    schedule 07.07.2017
comment
В последнее время я не просматривал код, но, скорее всего, чтобы обеспечить правильное отношение «происходит до». Метод должен быть атомарным и потокобезопасным. Вы говорите, что происходит ненужная синхронизация? Это была бы настоящая находка.   -  person Kayaman    schedule 07.07.2017
comment
@Kayaman да, именно это я и говорю   -  person Krzysztof Cichocki    schedule 07.07.2017
comment
И вы понимаете все последствия, которые повлечет за собой отсутствие этой синхронизации? Вы понимаете модель памяти Java и все такое? Я просто говорю, маловероятно, что вы нашли бы такую ​​простую ошибку в классе, который существовал так долго. Может быть, @BrianGoetz увидит этот вопрос и сможет дать вам ответ.   -  person Kayaman    schedule 07.07.2017
comment
@Kayaman Я просто говорю, что если ключ уже есть, в блоке synchronized ничего не нужно делать, и я это очень хорошо понимаю.   -  person Krzysztof Cichocki    schedule 07.07.2017
comment
Что ж, похоже, мы оба правы bugs.java.com/bugdatabase/view_bug .do?bug_id=8161372, но в Java 9 он все равно будет изменен.   -  person Kayaman    schedule 07.07.2017
comment
@Kayaman, в каком смысле ты был прав?   -  person Krzysztof Cichocki    schedule 07.07.2017
comment
Что ж, если вы читаете комментарии об ошибках, вы видите, что это компромисс, а не то, что мы случайно синхронизировали это. Вы должны сравнить исходники JDK8 и JDK9 и посмотреть, что они изменили.   -  person Kayaman    schedule 07.07.2017
comment
Ну, они делают и то, и другое, они признают, что это тоже какое-то заблуждение, что это какой-то обмен, мне непонятно, и это поведение обмена должно быть как минимум четко задокументировано в javadoc. С моей точки зрения, это просто ошибка, а не функция, потому что она нелогична и четко не указана в документации. Мы все знаем это объяснение. Это не ошибка, это фича.   -  person Krzysztof Cichocki    schedule 07.07.2017
comment
Вы довольно зациклены на этом, хотя вы даже не смогли найти отчет об ошибке. Вы должны провести сравнение между версиями JDK8/JDK9, чтобы повысить ценность этого вопроса. Был отчет об ошибке, конечным результатом было то, что они сделают исправление для Java 9. Сделайте сравнение! Заканчивайте этот вопрос!   -  person Kayaman    schedule 08.07.2017
comment
@Kayaman Многие люди используют java 8 и будут использовать ее еще долго после официального выпуска java 9, мой вопрос касается самого быстрого способа инициализации пары ключ-значение в ConcurentHashMap, я не нашел ничего прямого по этой теме на Итак, поэтому я просто делюсь своими выводами. Особенно то, что calculateIfAbsent в java 8 очень удобен для короткого лямбда-выражения, но его производительность не так хороша, как можно было бы ожидать.   -  person Krzysztof Cichocki    schedule 08.07.2017
comment
@Kayaman, вы можете добавить свой собственный ответ на этот вопрос, если считаете, что есть что-то важное, чем можно поделиться с другими.   -  person Krzysztof Cichocki    schedule 08.07.2017


Ответы (1)


К сожалению, вплоть до jdk1.8.0_131 computeIfAbsent всегда помещается в блок synchronized, независимо от того, находится ли ключ уже там или нет, что делает его намного медленнее, чем putIfAbsent.

Этот тест подтверждает это, оказывается, что в зависимости от уровня конкуренции putIfAbsent от 2 до 50 раз быстрее, чем computeIfAbsent.

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentHashMapTest {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsentLambda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsentLambda() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
                        count.incrementAndGet();

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

}

выход:

Test testPutIfAbsent average time per run: 115.756501 ns
Test testComputeIfAbsentLambda average time per run: 276.9667055 ns
Test testPutIfAbsent average time per run: 134.2332435 ns
Test testComputeIfAbsentLambda average time per run: 223.222063625 ns
Test testPutIfAbsent average time per run: 119.968893625 ns
Test testComputeIfAbsentLambda average time per run: 216.707419875 ns
Test testPutIfAbsent average time per run: 116.173902375 ns
Test testComputeIfAbsentLambda average time per run: 215.632467375 ns
Test testPutIfAbsent average time per run: 112.21422775 ns
Test testComputeIfAbsentLambda average time per run: 210.29563725 ns
Test testPutIfAbsent average time per run: 120.50643475 ns
Test testComputeIfAbsentLambda average time per run: 200.79536475 ns

Мы можем использовать подход putIfAbsent для создания более быстрого computeIfAbsent. Единственная разница будет заключаться в том, что этот новый computeIfAbsent может вызывать initialization function более одного раза в случае одновременной инициализации одного и того же ключа. Результаты бенчмарка идентичны результатам 'putIfAbsent', так как это тот же самый код, это не является большим сюрпризом, но на случай, если кто-то захочет это проверить, вот бенчмарк:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class CocnurrentHashMap2Benchmark {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsent2Lambda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }


     private static void testComputeIfAbsent2Lambda() throws InterruptedException {
            final AtomicLong totalTime = new AtomicLong();
            final ConcurrentHashMap2<String, AtomicInteger> map = new ConcurrentHashMap2<String, AtomicInteger>();
            final Random random = new Random();
            ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
            for (int i = 0; i < numberOfThreads; i++) {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        long start, end;
                        for (int n = 0; n < numberOfRuns; n++) {
                            String s = strings[random.nextInt(strings.length)];
                            start = System.nanoTime();

                            AtomicInteger count = map.computeIfAbsent2(s, (k) -> new AtomicInteger(0));
                            count.incrementAndGet();

                            end = System.nanoTime();
                            totalTime.addAndGet(end - start);
                        }
                    }
                });
            }
            executorService.shutdown();
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                    + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
        }

        public static class ConcurrentHashMap2<K,V> extends ConcurrentHashMap<K,V> {

            /**
             * If there is no mapping for the key then computes and puts the mapping,
             * otherwise it simply return the value for that key.
             * In case of concurrent initialization of the same key the mappingFunction can be called more than once.
             * @param key - the key to be initialized or retrieved
             * @param mappingFunction - the function to be called for computation of initial value.
             * @return computed value if the key wasn't already in the map otherwise return the actual value for provided key.
             */
            public V computeIfAbsent2(K key, Function<K,V> mappingFunction) {
                V value = get(key);
                if (value == null) {
                    value = mappingFunction.apply(key);
                    V prevValue = putIfAbsent(key, value);
                    if (prevValue != null) {
                        value = prevValue;
                    }
                }
                return value;
            }
        }
}

результаты, достижения:

Test testComputeIfAbsent2Lambda average time per run: 138.1053415 ns
Test testPutIfAbsent average time per run: 129.45236425 ns
Test testComputeIfAbsent2Lambda average time per run: 128.48006825 ns
Test testPutIfAbsent average time per run: 118.733798375 ns
Test testComputeIfAbsent2Lambda average time per run: 134.038046625 ns
Test testPutIfAbsent average time per run: 119.7947695 ns
Test testComputeIfAbsent2Lambda average time per run: 134.183876375 ns
Test testPutIfAbsent average time per run: 137.969932625 ns
Test testComputeIfAbsent2Lambda average time per run: 137.97531275 ns
Test testPutIfAbsent average time per run: 136.904379125 ns
Test testComputeIfAbsent2Lambda average time per run: 148.899750125 ns
Test testPutIfAbsent average time per run: 129.788293125 ns
Test testComputeIfAbsent2Lambda average time per run: 141.50586625 ns
Test testPutIfAbsent average time per run: 129.081558875 ns
Test testComputeIfAbsent2Lambda average time per run: 122.36628625 ns
Test testPutIfAbsent average time per run: 127.1215535 ns
Test testComputeIfAbsent2Lambda average time per run: 108.129917625 ns
Test testPutIfAbsent average time per run: 133.630786875 ns
Test testComputeIfAbsent2Lambda average time per run: 134.978805625 ns
Test testPutIfAbsent average time per run: 132.7747585 ns
Test testComputeIfAbsent2Lambda average time per run: 132.4352885 ns
Test testPutIfAbsent average time per run: 133.753792875 ns
Test testComputeIfAbsent2Lambda average time per run: 134.09569175 ns
Test testPutIfAbsent average time per run: 145.610141125 ns
Test testComputeIfAbsent2Lambda average time per run: 139.437622125 ns

Если мы сравним скорость размещения сопоставления в карте, когда ключ еще не существует, окажется, что новый «computeIfAbsent2» также намного быстрее. Эталон:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class CocnurrentHashMap2PutBenchmark {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testComputeIfAbsent2();
            testComputeIfAbsent();
        }
    }

    private static void testComputeIfAbsent2() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap2<Integer, String> map = new ConcurrentHashMap2<Integer, String>();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        Integer key = Integer.valueOf(n);
                        start = System.nanoTime();

                        String value = map.computeIfAbsent2(key, (k) -> "value");

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        Integer key = Integer.valueOf(n);
                        start = System.nanoTime();

                        String value = map.computeIfAbsent(key, (k) -> "value");

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    public static class ConcurrentHashMap2<K, V> extends ConcurrentHashMap<K, V> {

        /**
         * If there is no mapping for the key then computes and puts the
         * mapping, otherwise it simply return the value for that key. In case
         * of concurrent initialization of the same key the mappingFunction can
         * be called more than once.
         * 
         * @param key
         *            - the key to be initialized or retrieved
         * @param mappingFunction
         *            - the function to be called for computation of initial
         *            value.
         * @return computed value if the key wasn't already in the map otherwise
         *         return the actual value for provided key.
         */
        public V computeIfAbsent2(K key, Function<K, V> mappingFunction) {
            V value = get(key);
            if (value == null) {
                value = mappingFunction.apply(key);
                V prevValue = putIfAbsent(key, value);
                if (prevValue != null) {
                    value = prevValue;
                }
            }
            return value;
        }
    }
}

результаты, достижения:

Test testComputeIfAbsent2 average time per run: 445.077932375 ns
Test testComputeIfAbsent average time per run: 784.786391 ns
Test testComputeIfAbsent2 average time per run: 294.10136375 ns
Test testComputeIfAbsent average time per run: 314.8724765 ns
Test testComputeIfAbsent2 average time per run: 236.56533275 ns
Test testComputeIfAbsent average time per run: 350.863664625 ns
Test testComputeIfAbsent2 average time per run: 346.19498275 ns
Test testComputeIfAbsent average time per run: 641.995172625 ns
Test testComputeIfAbsent2 average time per run: 255.441646125 ns
Test testComputeIfAbsent average time per run: 326.399150125 ns
Test testComputeIfAbsent2 average time per run: 275.626666125 ns
Test testComputeIfAbsent average time per run: 201.207314125 ns
Test testComputeIfAbsent2 average time per run: 289.19059725 ns
Test testComputeIfAbsent average time per run: 318.448059 ns
Test testComputeIfAbsent2 average time per run: 225.19701825 ns
Test testComputeIfAbsent average time per run: 306.461814125 ns
Test testComputeIfAbsent2 average time per run: 213.460366 ns
Test testComputeIfAbsent average time per run: 334.325044625 ns
Test testComputeIfAbsent2 average time per run: 256.4048955 ns
Test testComputeIfAbsent average time per run: 256.366700625 ns
Test testComputeIfAbsent2 average time per run: 231.88875575 ns
Test testComputeIfAbsent average time per run: 246.076624 ns
Test testComputeIfAbsent2 average time per run: 222.4649485 ns
Test testComputeIfAbsent average time per run: 266.505719625 ns
Test testComputeIfAbsent2 average time per run: 228.708391375 ns
Test testComputeIfAbsent average time per run: 261.866442625 ns
Test testComputeIfAbsent2 average time per run: 198.614718875 ns
Test testComputeIfAbsent average time per run: 225.43031925 ns
Test testComputeIfAbsent2 average time per run: 300.478359 ns
Test testComputeIfAbsent average time per run: 306.03640225 ns
Test testComputeIfAbsent2 average time per run: 195.0444215 ns
Test testComputeIfAbsent average time per run: 271.461982625 ns
Test testComputeIfAbsent2 average time per run: 224.306529875 ns
Test testComputeIfAbsent average time per run: 334.52790425 ns
Test testComputeIfAbsent2 average time per run: 212.217131625 ns
Test testComputeIfAbsent average time per run: 184.541579125 ns
Test testComputeIfAbsent2 average time per run: 265.417909625 ns
Test testComputeIfAbsent average time per run: 213.9811425 ns
Test testComputeIfAbsent2 average time per run: 298.76602575 ns
Test testComputeIfAbsent average time per run: 347.883728125 ns
person Krzysztof Cichocki    schedule 07.07.2017