ConcurrentDictionary GetOrAdd async

Я хочу использовать что-то вроде GetOrAdd с ConcurrentDictionary в качестве кеша для веб-службы. Есть ли асинхронная версия этого словаря? GetOrAdd будет делать веб-запрос, используя HttpClient, поэтому было бы неплохо, если бы существовала версия этого словаря, где GetOrAdd был асинхронным.

Чтобы устранить некоторую путаницу, содержимое словаря будет ответом на вызов веб-службы.

ConcurrentDictionary<string, Response> _cache
    = new ConcurrentDictionary<string, Response>();

var response = _cache.GetOrAdd("id",
    (x) => { _httpClient.GetAsync(x).GetAwaiter().GetResponse(); });

person Zeus82    schedule 09.01.2019    source источник
comment
Для меня это звучит так, будто async GetOrAdd не имеет большого смысла. Этот метод может выполняться только синхронно.   -  person Yeldar Kurmangaliyev    schedule 09.01.2019
comment
Добавление в словарь не является операцией, связанной с вводом-выводом, нет смысла иметь его асинхронную версию.   -  person JohanP    schedule 09.01.2019
comment
Если вам нужно чего-то подождать, я бы посоветовал проверить, находится ли ключ в словаре, а если нет, то дождитесь вызова Http, затем вызовите GeOrAdd с результатом. В конечном итоге вам придется снова проверить, если что-то еще вставило ключ, пока вы ждали ввода-вывода.   -  person juharr    schedule 09.01.2019
comment
@juharr: Это именно то, что делает ConcurrentDictionary. Он начинается с проверки, затем генерирует новое значение, а затем снова проверяет перед добавлением.   -  person Poul Bak    schedule 08.06.2021


Ответы (4)


GetOrAdd не станет асинхронной операцией, потому что доступ к значению словаря не является длительной операцией.

Однако вы можете просто сохранять задачи в словаре, а не материализованный результат. Затем любой, кому нужны результаты, может дождаться выполнения этой задачи.

Однако вам также необходимо убедиться, что операция запускается только один раз, а не несколько раз. Чтобы некоторые операции выполнялись только один раз, а не несколько раз, вам также необходимо добавить Lazy:

ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();

var response = await _cache.GetOrAdd("id", url => new Lazy<Task<Response>>(_httpClient.GetAsync(url))).Value;
person Servy    schedule 09.01.2019
comment
Это помещает неполный Task в кеш. Что произойдет, если Task откажет или будет отменен? Задача представляет собой HTTP-запрос к удаленному ресурсу, вероятность его сбоя немалая. - person odyss-jii; 10.01.2019
comment
@ odyss-jii Да, им нужно будет обработать случай ошибки, и это, скорее всего, потребует удаления его из кеша. - person Servy; 10.01.2019
comment
Совершенно ужасный дизайн для кеша. Это полностью разрушает абстракцию. Если я получаю значение из подсистемы, я не отвечаю за очистку ее внутреннего кеша, потому что она имеет неработающую реализацию. - person odyss-jii; 10.01.2019
comment
Он не обязательно должен быть конечным потребителем кеша, который его обрабатывает, это может быть оболочка вокруг этого кода, который записывает OP. Код в этом ответе не является полностью готовым к эксплуатации полнофункциональным кешем. Он показывает, как решить заданный вопрос, который OP необходимо будет завершить в своем собственном кэше упаковки, чтобы сделать его продуктивным кодом. Точно так же, как в вашем ответе есть проблемы с тем, чтобы сделать его не завершенным готовым кодом, а просто решением заданного вопроса. - person Servy; 10.01.2019
comment
Я чувствую, что Ленивый лишний. Выполнение чего-то вроде _httpClient.GetAsync(url) немедленно вернет Задачу. - person Darragh; 29.06.2020
comment
@ Darragh Но тогда ты проводишь операцию не один раз. Это очень часто неприемлемо. Lazy не гарантирует, что операция вернется быстрее, она гарантирует, что она никогда не будет выполняться более одного раза. - person Servy; 29.06.2020

Метод GetOrAdd не очень подходит для этой цели. Поскольку он не гарантирует, что фабрика запускается только один раз, единственная цель, которую он имеет, - это небольшая оптимизация (незначительная, поскольку добавления в любом случае редки), поскольку ей не нужно дважды хешировать и находить правильный сегмент (что произойдет дважды, если вы получаете и устанавливаете с двумя отдельными вызовами).

Я бы посоветовал вам сначала проверить кеш, если вы не нашли значение в кеше, затем введите какую-либо форму критического раздела (блокировка, семафор и т. Д.), Повторно проверьте кеш, если все еще отсутствует, выберите значение и вставляем в кеш.

Это гарантирует, что ваш резервный магазин будет задействован только один раз; даже если несколько запросов получают промах кеша одновременно, только первый запрос фактически получит значение, другие запросы будут ожидать семафор, а затем вернутся раньше, поскольку они повторно проверяют кеш в критической секции.

Псевдо-код (с использованием SemaphoreSlim со счетчиком 1, так как вы можете ожидать его асинхронно):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from catch
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example use SemaphoreSlim 
    // which has async wait function:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}
person odyss-jii    schedule 09.01.2019
comment
Если вы собираетесь явно заблокировать, вам необходимо явно заблокировать все остальное, что также использует эту коллекцию, чтобы гарантировать, что операция логически атомарна. - person Servy; 09.01.2019
comment
Коллекция называется ConcurrentDictionary, сама коллекция является поточно-ориентированной. Вы блокируете здесь по другой причине. - person odyss-jii; 09.01.2019
comment
Коллекция не будет генерировать какое-либо исключение индекса за пределами границ или возвращать данные мусора, потому что она предназначена для использования из нескольких потоков, но теперь вы пытаетесь выполнить несколько операций с ней последовательно и не полагаетесь ни на какие изменения в коллекцию в течение того времени, которое она вам не предоставит. Вам нужно будет явно заблокировать не только здесь, но и повсюду, используя коллекцию, чтобы гарантировать, что кто-то еще не добавит значение после того, как вы обнаружите, что оно отсутствует, или что-то в этом роде. - person Servy; 09.01.2019
comment
Я не уверен, о каком гипотетическом сценарии вы думаете, но он не применим к данному конкретному случаю. Это выборка из источника с кешем в памяти, не имеет значения, изменится ли коллекция за это время. Цель блокировки - защитить источник от всплеска, если есть несколько одновременных промахов кеша; цель не в том, чтобы синхронизировать доступ к коллекции. - person odyss-jii; 09.01.2019
comment
Да, имеет значение, если коллекция изменится. Например, это может привести к многократному выполнению работы, которую нельзя повторять. - person Servy; 10.01.2019

Попробуйте этот метод расширения:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}

Вместо dict.GetOrAdd(key,key=>something(key)) вы используете await dict.GetOrAddAsync(key,async key=>await something(key)). Очевидно, что в этой ситуации вы просто пишете это как await dict.GetOrAddAsync(key,something), но я хотел прояснить это.

Что касается опасений по поводу сохранения порядка операций, у меня есть следующие наблюдения:

  1. Использование обычного GetOrAdd даст тот же эффект, если вы посмотрите, как он реализован. Я буквально использовал тот же код и заставил его работать для async. Ссылка говорит

делегат valueFactory вызывается вне блокировок, чтобы избежать проблем, которые могут возникнуть при выполнении неизвестного кода под блокировкой. Следовательно, GetOrAdd не является атомарным по отношению ко всем другим операциям в классе ConcurrentDictionary ‹TKey, TValue›.

  1. SyncRoot не поддерживается в ConcurrentDictionary, они используют внутренний механизм блокировки, поэтому блокировка невозможна. Однако использование собственного механизма блокировки работает только для этого метода расширения. Если вы используете другой поток (например, с помощью GetOrAdd), вы столкнетесь с той же проблемой.
person Siderite Zackwehdex    schedule 12.12.2020
comment
Эта реализация GetOrAddAsync не сохраняет порядок операций. Сценарий: рабочий процесс-1 вызывает .GetOrAddAsync("Key", GetAsync("A")), затем рабочий процесс-2 вызывает .GetOrAddAsync("Key", GetAsync("B")), затем рабочий процесс-3 вызывает .TryRemove("Key", out _). Наконец, словарь может иметь либо значение A, либо B, либо вообще не иметь значения. Это происходит потому, что эта GetOrAddAsync реализация откладывает сохранение чего-либо в словаре до завершения асинхронного делегата. - person Theodor Zoulias; 13.01.2021
comment
Вы имеете в виду, если вы используете его без ожидания? - person Siderite Zackwehdex; 13.01.2021
comment
Сидерита нет, я имею в виду, если вы правильно дождетесь метода. Мой сценарий включает три независимых асинхронных рабочих процесса, каждый из которых вызывает API и await возвращает возвращенную задачу. В этом сценарии задача, ожидаемая рабочим процессом-1, может занять больше времени, чем задача, ожидаемая рабочим процессом-2, и в этом случае рабочий процесс-1 перезапишет значение, введенное в словарь рабочим процессом-2. Такое поведение было бы, мягко говоря, удивительно. - person Theodor Zoulias; 13.01.2021
comment
Я обновил свой ответ. Я ценю уровень внимания к вашей реализации, но я считаю, что это слишком сложно по причинам, которые я указал в ответе. Вопрос SO касался использования GetOrAdd с асинхронным делегатом, что подразумевает принятие ограничений исходного метода. - person Siderite Zackwehdex; 13.01.2021
comment
Сидерите Я понимаю вашу точку зрения. Ваша реализация действительно имеет поведение, аналогичное собственному GetOrAdd (исходный код). Я предполагаю, что типичная асинхронная работа имеет большую продолжительность, чем типичная синхронная работа, и это может сделать недостатки нативного поведения более заметными. В любом случае мой голос против был необоснованным, и я его отозвал. - person Theodor Zoulias; 13.01.2021

Возможно, используется выделенный кеш памяти (например, new или старый MemoryCache классы или эту стороннюю библиотеку) предпочтительнее к использованию простого ConcurrentDictionary. Если вам действительно не нужны часто используемые функции, такие как истечение срока действия, сжатие на основе размера, автоматическое исключение записей, которые зависят от других записей, срок действия которых истек, или зависят от изменяемых внешних ресурсов (например, файлов, баз данных и т. Д.). Однако следует отметить, что _3 _ все еще может потребоваться некоторая работа для правильной обработки асинхронных делегатов, поскольку его нестандартное поведение не идеально.

Ниже приведен собственный метод расширения GetOrAddAsync для ConcurrentDictionary, которые имеют Task<TValue> значения. Он принимает фабричный метод и гарантирует, что метод будет вызван не более одного раза. Это также гарантирует, что неудачные задачи будут удалены из словаря. Эта реализация оптимизирована для случая, когда получение существующей задачи происходит часто, а создание новой - редко.

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        var newTaskTask = new Task<Task<TValue>>(async () =>
        {
            try { return await valueFactory(key).ConfigureAwait(false); }
            catch
            {
                ((ICollection<KeyValuePair<TKey, Task<TValue>>>)source)
                    .Remove(new KeyValuePair<TKey, Task<TValue>>(key, newTask));
                //source.TryRemove(KeyValuePair.Create(key, newTask)); // .NET 5
                throw;
            }
        });
        newTask = newTaskTask.Unwrap();
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask)
            newTaskTask.RunSynchronously(TaskScheduler.Default);
    }
    return currentTask;
}

Пример использования:

var cache = new ConcurrentDictionary<string, Task<HttpResponseMessage>>();

var response = await cache.GetOrAddAsync("https://stackoverflow.com", async url =>
{
    return await _httpClient.GetAsync(url);
});

Для удаления проблемных задач эта реализация использует явно реализованный _ 9_. Более подробную информацию об этом API можно найти здесь. Начиная с .NET 5 и далее, новый person Theodor Zoulias    schedule 13.01.2021

comment
У меня несколько вопросов по поводу вашего решения. Unwrap то же самое, что await newTaskTask? Почему бы просто не использовать source.TryRemove(key, out _) вместо преобразования source в ICollection<>? Я немного запутался в newTaskTask.RunSynchronously(TaskScheduler.Default). Это выглядит немного странно. Нам это нужно? Вызывающий сделает await. Обеспечит ли это вызов задачи? - person Sebastian Schumann; 16.02.2021
comment
Возникает вопрос: эта реализация более или менее эквивалентна вашему примеру? Да, я знаю, что выполнение откладывается (не совсем) до тех пор, пока первый вызывающий не дождется возвращенной задачи. - person Sebastian Schumann; 16.02.2021
comment
@SebastianSchumann, конечно, и спасибо за вопрос. newTaskTask.Unwrap() действительно совпадает с await newTaskTask, при условии, что задача горячая (т.е. она уже запущена). Здесь дело обстоит не так, и если мы попытаемся await выполнить задачу, возникнет тупик. Задача намеренно холодная, потому что мы хотим запустить задачу только после того, как она будет успешно вставлена ​​в словарь. В противном случае, если гонка за обновлением словаря была проиграна, холодная задача будет просто отброшена. - person Theodor Zoulias; 16.02.2021
comment
Причина того, что source.TryRemove(key, out _) недостаточно, заключается в том, что GetOrAddAsync - это просто метод расширения, и он не полностью контролирует содержимое словаря. Таким образом, возможно, что во время выполнения задачи какой-то другой код может заменить ее другой задачей. В случае, если наша задача не удалась, мы хотим удалить ее из словаря, только если она все еще существует, а не удалять какую-то другую задачу, которая нам не известна. - person Theodor Zoulias; 16.02.2021
comment
newTaskTask.RunSynchronously(TaskScheduler.Default) запускает внешнюю задачу, которая вызывает делегата valueFactory. До достижения этой точки valueFactory не был вызван. Важно, чтобы valueFactory вызывался только один раз, в случае, если несколько потоков спешат вставить этот ключ в словарь. Аргумент TaskScheduler.Default гарантирует, что valueFactory будет вызываться синхронно хорошо известным TaskScheduler, и что мы не находимся во власти TaskScheduler.Current (значение параметра по умолчанию), каким бы он ни был. - person Theodor Zoulias; 16.02.2021
comment
Итак, нет, эта реализация не эквивалентна реализации этого ответа! ???? - person Theodor Zoulias; 16.02.2021
comment
Ах да, извините - я был полностью слеп. Я не видел холодную задачу. Это действительно правильно и необходимо. И да, я забыл о возможности изменить значение с помощью dict[key] = value, который изменяет значение. Простите за это. - person Sebastian Schumann; 16.02.2021
comment
@SebastianSchumann, не беспокойтесь. Я рад, что кто-то спросил, и я могу объяснить нюансы этого необычного метода. :-) - person Theodor Zoulias; 16.02.2021
comment
да. Я удалил свой вопрос после того, как прочитал документацию об этой функции. - person Sebastian Schumann; 16.02.2021
comment
Но остается одна путаница: зачем нам этот вызов RunSynchronously? Существует состояние гонки: предположим, что один вызов создает эту задачу, добавляет ее в dict, а переключение контекста происходит непосредственно перед вызовом Run... Другая задача получает уже добавленную задачу и ожидает ее. Это должно вызвать valueFactory. И даже если эта фабрика также будет вызвана Run.... Что мне здесь не хватает? - person Sebastian Schumann; 16.02.2021
comment
@SebastianSchumann, ожидающего выполнения задачи, недостаточно для вызова valueFactory. Только текущий поток знает о вложенном Task<Task<TValue>> и имеет средства для его запуска. Если он не запустит его, обе задачи, вложенная и развернутая, останутся холодными навсегда. Любой рабочий процесс, который пытается дождаться выполнения задачи, просто зайдет в тупик. - person Theodor Zoulias; 16.02.2021
comment
Ладно - думаю, я понял. На всякий случай: newTaskTask известен только текущему потоку - это очевидно. Только эта ветка умеет это называть - ясно. Параллельный вызов получает newTask - тоже неприятно. Ожидание этого newTask будет заблокировано, потому что newTaskTask не был запущен. Пух - хорошо. Если это предположение верно, реализация верна. - person Sebastian Schumann; 16.02.2021
comment
@SebastianSchumann да, в этом маленьком методе много чего происходит. :-) - person Theodor Zoulias; 16.02.2021
comment
@SebastianSchumann кстати, та же идея была использована здесь для реализации улучшенного класса AsyncLazy<T>. Создание холодных задач редко бывает хорошей идеей, но когда это так, это творит чудеса! ???? - person Theodor Zoulias; 16.02.2021