Как обернуть ConcurrentDictionary в BlockingCollection?

Я пытаюсь реализовать ConcurrentDictionary, завернув его в BlockingCollection, но, похоже, это не удалось.

Я понимаю, что объявления одной переменной работают с BlockingCollection, например ConcurrentBag<T>, ConcurrentQueue<T> и т. д.

Итак, чтобы создать ConcurrentBag, обернутый в BlockingCollection, я бы объявил и создал экземпляр следующим образом:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());

но как это сделать для ConcurrentDictionary? Мне нужна функция блокировки BlockingCollection как на стороне производителя, так и на стороне потребителя.


person Matt    schedule 24.05.2012    source источник
comment
Словарь (и ConcurrentDictionary тоже) не сохраняет порядок элементов. Можете ли вы описать свой сценарий производитель-потребитель?   -  person Dennis    schedule 24.05.2012
comment
@Денис, я знаю об этом. Производитель сохраняет KeyValuePairs в concurrentDictionary, а задача-потребитель увеличивает целое число и удаляет KeyValuePair, если целое совпадает с соответствующим ключом. Я делаю это, потому что рабочие задачи заполняют concurrentDictionary значениями, но в произвольном порядке, задача-потребитель гарантирует, что полученные значения передаются/обрабатываются в правильном порядке. Можно ли обернуть ConcurrentDictionary в BlockingCollection?   -  person Matt    schedule 24.05.2012
comment
Какое решение вы придумали? Я пытаюсь найти хорошее решение аналогичной проблемы, когда производитель не производит товары в порядке, необходимом потребителю. (старый пост я знаю, но стоит попробовать)   -  person Kim    schedule 04.11.2015


Ответы (2)


Возможно, вам нужен параллельный словарь blockingCollection

        ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        int maxBoxes = 5;

        CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
        CancellationToken cancelationToken = cancelationTokenSource.Token;

        Random rnd = new Random();
        // Producer
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = rnd.Next(0, maxBoxes);
                // put the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                box.Add("some message " + index, cancelationToken);
                Console.WriteLine("Produced a letter to put in box " + index);

                // Wait simulating a heavy production item.
                Thread.Sleep(1000);
            }
        });

        // Consumer 1
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = rnd.Next(0, maxBoxes);
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 1: " + message);

                // consume a item cost less than produce it:
                Thread.Sleep(50);
            }
        });

        // Consumer 2
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = rnd.Next(0, maxBoxes);
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 2: " + message);

                // consume a item cost less than produce it:
                Thread.Sleep(50);
            }
        });

        Console.ReadLine();
        cancelationTokenSource.Cancel();

Таким образом, потребитель, ожидающий чего-то в почтовом ящике 5, будет ждать, пока производитель положит письмо в почтовый ящик 5.

person cpsaez    schedule 30.01.2013

Вам нужно будет написать свой собственный класс адаптера - что-то вроде:

    public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>>
{
    private ConcurrentDictionary<TKey, TValue> dictionary;

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return dictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        throw new NotImplementedException();
    }

    public int Count
    {
        get { return dictionary.Count; }
    }

    public object SyncRoot
    {
        get { return this; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        throw new NotImplementedException();
    }

    public bool TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return dictionary.TryAdd(item.Key, item.Value);
    }

    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        item = dictionary.FirstOrDefault();
        TValue value;
        return dictionary.TryRemove(item.Key, out value);
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        throw new NotImplementedException();
    }
}
person Nick Jones    schedule 24.05.2012
comment
Спасибо за предложение кода. Но моей основной целью использования BlockingCollection была возможность пометить коллекцию как «Добавление завершено» и проверить ее статус, а также то, добавлена ​​ли она как завершенная и пустая, аналогично тому, что предоставляет BlockingCollection. Я знаю, что могу легко добавить такую ​​​​функциональность, но я ищу предложение, как это сделать напрямую через BlockingCollection. Пока я не вижу причины, по которой он не может работать через коллекцию Blocking напрямую. Может быть, требуется только IProducerConsumerCollection‹T›? - person Matt; 24.05.2012