Потоковая повторная инициализация параллельного словаря

Я хочу знать, является ли следующий код потокобезопасным, хотя я предполагаю, что это не так. И как я могу сделать это потокобезопасным?

В основном у меня есть ConcurrentDictionary, который действует как кеш для таблицы базы данных. Я хочу запрашивать БД каждые 10 секунд и обновлять кеш БД. Другие потоки будут все время запрашивать этот словарь.

Я не могу просто использовать TryAdd, так как могут быть также удаленные элементы. Поэтому я решил вместо того, чтобы искать по всему словарю, возможно, обновить, добавить или удалить. Я бы просто переинициализировал словарь. Пожалуйста, скажите мне, глупая ли это идея.

Меня беспокоит то, что когда я повторно инициализирую словарь, потоки запросов больше не будут потокобезопасными для экземпляра, когда происходит инициализация. По этой причине я использовал блокировку словаря при его обновлении. Однако я не уверен, правильно ли это, поскольку объект изменяется в блокировке?

private static System.Timers.Timer updateTimer;
private static volatile Boolean _isBusyUpdating = false;
private static ConcurrentDictionary<int, string> _contactIdNames;

public Constructor()
{
    // Setup Timers for data updater         
    updateTimer = new System.Timers.Timer();
    updateTimer.Interval = new TimeSpan(0, 0, 10, 0).TotalMilliseconds;
    updateTimer.Elapsed += OnTimedEvent;
    // Start the timer
    updateTimer.Enabled = true;
}

private void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
{
    if (!_isBusyUpdating)
    {
        _isBusyUpdating = true;
        // Get new data values and update the list
        try
        {
            var tmp = new ConcurrentDictionary<int, string>();
            using (var db = new DBEntities())
            {
                foreach (var item in db.ContactIDs.Select(x => new { x.Qualifier, x.AlarmCode, x.Description }).AsEnumerable())
                {
                    int key = (item.Qualifier * 1000) + item.AlarmCode;
                    tmp.TryAdd(key, item.Description);
                }
            }
            if (_contactIdNames == null)
            {
                _contactIdNames = tmp;
            }
            else
            {
                lock (_contactIdNames)
                {
                    _contactIdNames = tmp;
                }
            }
        }
        catch (Exception e)
        {
            Debug.WriteLine("Error occurred in update ContactId db store", e);
        }
        _isBusyUpdating = false;
    }
}

    /// Use the dictionary from another Thread
    public int GetIdFromClientString(string Name)
    {
        try
        {
            int pk;
            if (_contactIdNames.TryGetValue(Name, out pk))
            {
                return pk;
            }
        }
        catch { }
        //If all else fails return -1
        return -1;
    }

person Zapnologica    schedule 12.10.2015    source источник
comment
Зачем нужно переинициализировать Словарь? Не могли бы вы вместо этого Clear()?   -  person Ric    schedule 12.10.2015
comment
Почему вы используете Concurrent Dictionary, когда к словарю никогда не обращаются одновременно? Вы пишете на него из другого места?   -  person usr    schedule 12.10.2015
comment
@usr Я обращаюсь к нему одновременно из нескольких разных потоков.   -  person Zapnologica    schedule 12.10.2015
comment
@Zapnologica - Вы читаете его из нескольких потоков? Не писать в него из нескольких потоков?   -  person Enigmativity    schedule 12.10.2015
comment
@Zapnologica - Почему вы смешиваете static и нестатический код?   -  person Enigmativity    schedule 12.10.2015
comment
@Zapnologica - А где определена переменная _clientIdDbStore и как она соотносится с _contactIdNames?   -  person Enigmativity    schedule 12.10.2015
comment
@Enigmativity да В настоящее время я только читаю его. Когда я начал его кодировать, я собирался обновить DS из базы данных, но теперь я рассматриваю подход повторной инициализации. Так что в данном случае в этом нет необходимости, но это не должно причинить никакого вреда, правильно?   -  person Zapnologica    schedule 12.10.2015
comment
Вы пишете на него из другого места? Пожалуйста, покажите этот код.   -  person usr    schedule 12.10.2015


Ответы (3)


Вы правы, ваш код не является потокобезопасным.

  1. Вам нужно заблокировать _isBusyUpdating переменную.
  2. Вам нужно блокировать _contactIdNames каждый раз, а не только когда это не null.

Также этот код похож на одноэлементный шаблон и имеет ту же проблему с инициализацией. Вы можете решить эту проблему с помощью блокировки с двойной проверкой. Однако вам также потребуется двойная проверка блокировки при доступе к записям.

В случае, когда вы обновляете сразу весь словарь, вам нужно каждый раз блокировать текущее значение при обращении. В противном случае вы можете получить к нему доступ, пока он все еще меняется, и получить ошибку. Поэтому вам либо нужно каждый раз блокировать переменную, либо использовать Interlocked.

Как сообщает MSDN, volatile должен помочь с _isBusyUpdating, он должен быть потокобезопасным.

Если вы не хотите отслеживать _contactIdNames потокобезопасность, попробуйте реализовать обновление каждой записи в том же словаре. Проблема будет в обнаружении разницы между БД и текущими значениями (какие записи были удалены или добавлены, другие можно просто переписать), но не в безопасности потоков, поскольку ConcurrentDictionary уже является потокобезопасным.

person NikolayK    schedule 12.10.2015
comment
Единственная проблема заключается в том, что идея использования Concurrent Dictionary заключается в том, что не нужно блокировать весь объект при каждом доступе, структура данных должна блокироваться атомарно. Кроме этого сценария? Что касается _isBusyUpdating, я хочу, чтобы перекрывающиеся таймеры просто перебегали и пропускали, а не накладывались друг на друга? Я объявил это как static volatile boolean. Я использую одноэлементный шаблон для класса, который содержит этот элемент кеша. Есть ли в этом смысл? - person Zapnologica; 12.10.2015

Кажется, вы много работаете на себя. Вот как я бы справился с этой задачей:

public class Constructor
{
    private volatile Dictionary<int, string> _contactIdNames;

    public Constructor()
    {
        Observable
            .Interval(TimeSpan.FromSeconds(10.0))
            .StartWith(-1)
            .Select(n =>
            {
                using (var db = new DBEntities())
                {
                    return db.ContactIDs.ToDictionary(
                        x => x.Qualifier * 1000 + x.AlarmCode,
                        x => x.Description);
                }
            })
            .Subscribe(x => _contactIdNames = x);
    }

    public string TryGetValue(int key)
    {
        string value = null;
        _contactIdNames.TryGetValue(key, out value);
        return value;
    }
}

Я использую Microsoft Reactive Extensions (Rx) Framework - NuGet «Rx-Main» - чтобы таймер обновлял словарь.

Rx должен быть довольно простым. Если вы еще не видели это очень простым языком, это похоже на то, что LINQ встречает события.

Если вам не нравится Rx, просто используйте вашу текущую модель таймера.

Весь этот код создает новый словарь каждые 10 секунд из БД. Я просто использую простой словарь, поскольку он создается только из одного потока. Поскольку присвоение ссылок является атомарным, вы можете просто повторно назначить словарь, когда захотите, с полной безопасностью потоков.

Несколько потоков могут безопасно читать из словаря, пока элементы не меняются.

person Enigmativity    schedule 12.10.2015
comment
Очень интересный подход. Я никогда не слышал об этом, но похоже, что это очень чистое и простое решение. - person Zapnologica; 13.10.2015
comment
@Zapnologica - мне нравится просто и понятно. Он не сильно отличается от вашего подхода, но просто удаляет всю ненужную защиту от блокировок и потоков. Если у вас один писатель и несколько читателей, это все, что вам нужно. - person Enigmativity; 13.10.2015
comment
+1 _contactIdNames должен быть изменчивым, и я бы заменил материал Rx таймером или циклом задержки, потому что это проще. - person usr; 14.10.2015
comment
@usr - Как вы думаете, почему _contactIdNames должно быть volatile? Есть только один поток писателя. - person Enigmativity; 15.10.2015
comment
@Enigmativity и читатели тоже. Эти двое должны синхронизироваться. Это гонка за данными. - person usr; 15.10.2015
comment
@usr - Но поскольку это единственное присвоение ссылки (которое является атомарным), это не должно быть проблемой. - person Enigmativity; 15.10.2015
comment
@Enigmativity, ни ECMA, ни CLR ничего не гарантируют в этой ситуации. Запись не обязательно должна быть видимой. Например, считыватели могут навсегда кэшировать старое значение. Атомарность не проблема. - person usr; 15.10.2015
comment
@usr - Достаточно честно. Я добавил volatile. - person Enigmativity; 15.10.2015
comment
@usr и Enigmativity, вы, ребята, самые безумные разработчики в области переполнения стека. Спасибо за ответ! Вы оба напоминаете мне сцену с Биллом Мясником - person Firegarden; 02.07.2017
comment
@Firegarden - Если вы пытаетесь @ уведомить двух людей, вам нужно сделать два комментария с уведомлением для обоих пользователей. - person Enigmativity; 02.07.2017

Я хочу знать, является ли следующий код потокобезопасным, хотя я предполагаю, что это не так. И как я могу сделать это потокобезопасным?

Я верю, что это не так. Прежде всего, я бы создал свойство для ConcurrentDictionary и проверил, идет ли обновление внутри метода get, и если да, я бы вернул предыдущую версию объекта:

    private object obj = new object();
    private ConcurrentDictionary<int, string> _contactIdNames;
    private ConcurrentDictionary<int, string> _contactIdNamesOld;
    private volatile bool _isBusyUpdating = false;

    public ConcurrentDictionary<int, string> ContactIdNames
    {
        get
        {
            if (!_isBusyUpdating) return _contactIdNames;

            return _contactIdNamesOld;
        }
        private set 
        {
            if(_isBusyUpdating) _contactIdNamesOld = 
                new ConcurrentDictionary<int, string>(_contactIdNames);

            _contactIdNames = value; 
        }
    }

И ваш метод может быть:

    private static void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
    {
        if (_isBusyUpdating) return;

        lock (obj)
        {

            _isBusyUpdating = true;
            // Get new data values and update the list

            try
            {
                ContactIdNames = new ConcurrentDictionary<int, string>();
                using (var db = new DBEntities())
                {
                    foreach (var item in db.ContactIDs.Select(x => new { x.Qualifier, x.AlarmCode, x.Description }).AsEnumerable())
                    {
                        int key = (item.Qualifier * 1000) + item.AlarmCode;
                        _contactIdNames.TryAdd(key, item.Description);
                    }
                }                    
            }
            catch (Exception e)
            {
                Debug.WriteLine("Error occurred in update ContactId db store", e);        
                _contactIdNames = _contactIdNamesOld;            
            }
            finally 
            {                   
                _isBusyUpdating = false;
            }
        }
    }

P.S.

Меня беспокоит то, что когда я повторно инициализирую словарь, потоки запросов больше не будут потокобезопасными для экземпляра, когда происходит инициализация. По этой причине я использовал блокировку словаря при его обновлении. Однако я не уверен, правильно ли это, поскольку объект изменяется в блокировке?

Этот ConcurrentDictionary<T> тип является потокобезопасным, а не его экземпляром, поэтому даже если вы создадите новый экземпляр и измените ссылку на него - это не повод для беспокойства.

person Fabjan    schedule 12.10.2015