ConcurrentBag‹T› получает дубликаты (кажется, не является потокобезопасным)

Должно быть, я где-то делаю что-то не так, потому что я получаю дубликаты элементов в своем concurrentbag, вот цепочка событий

  var listings = new ConcurrentBag<JSonListing>();
  Parallel.ForEach(Types, ParallelOptions, (type, state) =>
  {
      ...
      status = ProcessType(listings, status, type, state);
      ....
   });

  private GeocodeResults ProcessType(ConcurrentBag<JSonListing> listings, GeocodeResults status, XElement type, ParallelLoopState state)
  {
      ....
      AddListingsToList(results, type, listings);
      .... 
  }

private void AddListingsToList(dynamic results, XElement type, ConcurrentBag<JSonListing> listings)
    {

        var typeMustMatch = type.Attribute("TypeMustMatch").Value;
        var typeID = Convert.ToInt32(type.Attribute("ID").Value);

        foreach (var result in results.results)
        {


            var foundListing = listings.SingleOrDefault(x => x.ID == result.id);
            if (foundListing != null)
            {
                var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries).ToList();
                if (!typeIDs.Contains(typeID.ToString()))
                {
                    foundListing.TypeIDs += "|" + typeID;
                }
            }
            else
            {
                var listing = new JSonListing { ID = result.id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch };
                listings.Add(listing);
            }




        }


    }

Список добавления должен гарантировать, что, если элемент уже существует, не будет добавлен другой элемент с тем же идентификатором, а вместо этого будет обновлено какое-то свойство. Теперь ошибка, которую я получаю,

System.InvalidOperationException: последовательность содержит более одного соответствующего элемента
в System.Linq.Enumerable.SingleOrDefault[TSource](источник IEnumerable`1, предикат Func`2)
в LocalSearch.Processor.CityProcessor.AddListingsToList(результаты объекта , тип XElement, списки ConcurrentBag`1) в d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:строка 310
в CallSite.Target(Closure, CallSite, CityProcessor, Object, XElement, ConcurrentBag`1 )
в LocalSearch.Processor.CityProcessor.ProcessType(списки ConcurrentBag`1, статус GeocodeResults, тип XElement, состояние ParallelLoopState) в d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:строка 249
в LocalSearch.Processor.CityProcessor.‹>c__DisplayClass4.b__0(тип XElement, состояние ParallelLoopState) в d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:строка 137


person Zoinky    schedule 07.02.2014    source источник
comment
Это было бы решено более чисто с помощью PLINQ.   -  person spender    schedule 07.02.2014
comment
@spender: Похоже, что ProcessType будет иметь значительные побочные эффекты, которые были исключены из примера.   -  person Jon    schedule 07.02.2014
comment
@spender Простое свойство ProcessType — это вызов (или несколько вызовов) API-интерфейса Google Places, который возвращает динамический список доступных мест, соответствующих поисковому запросу. PLINQ для этого не подходит, так как они не находятся в памяти.   -  person Zoinky    schedule 07.02.2014


Ответы (2)


ConcurrentBag гарантирует, что каждая операция над ним является потокобезопасной, если рассматривать ее отдельно. Это не гарантирует, что несколько последовательных операций будут рассматриваться как атомарная группа.

В результате ваш код имеет состояние гонки: вы проверяете, есть ли в сумке какой-то предмет X, но два потока могут запустить тест одновременно, решить, что предмета там нет, и приступить к его добавлению. Конечный результат: две копии предмета оказываются в сумке.

Похоже, что ваш вариант использования лучше реализовать, используя вместо него ConcurrentDictionary и метод TryAdd. , который является атомарным. В качестве альтернативы вы можете поместить lock() вокруг сумки, чтобы все внутри блока работало атомарно, но тогда вам действительно не нужна параллельная коллекция, и вместо этого можно использовать прямое List.

person Jon    schedule 07.02.2014
comment
Как я могу добиться того, чтобы дубликаты не попали в сумку? - person Zoinky; 07.02.2014
comment
Я думаю, что лучше всего было бы использовать List‹T›, пусть у него есть дубликаты, после того, как вы пройдете по этому списку, большинство из этих списков будет иметь примерно 4 000 элементов, поэтому это не будет большим значительным ударом по производительности (по крайней мере, я так не думаю, но буду пробовать) - person Zoinky; 07.02.2014
comment
@Zoinky: Вы можете попробовать многое, лучшее зависит от ваших конкретных требований (мы можем только догадываться). Вместо того, чтобы разрешать дубликаты, почему бы просто не использовать Set<T> с соответствующим компаратором равенства? Еще лучше, поскольку здесь можно использовать естественный ключ, почему бы не использовать ConcurrentDictionary? - person Jon; 07.02.2014
comment
Я считаю, что ConcurrentDictionary будет тем, что я ищу, мои требования довольно просты. Основной Parrallel.ForEach имеет около 20 элементов, каждый из которых будет отдельным вызовом API Google Places, теперь каждый из этих типов будет возвращать до 200 результатов, тип 1, тип 2 потенциально могут возвращать один и тот же элемент, но вместо того, чтобы отбрасывать один из них я просто хочу отслеживать его, добавляя идентификатор типа к элементу, уже находящемуся в коллекции, поэтому, если тип1 и тип2 возвращают один и тот же элемент, должен быть только один элемент со строкой 1|2, обозначающей оба этих нашел предмет. - person Zoinky; 07.02.2014

Это не означает, что Concurrent Bag не является потокобезопасным, это просто означает, что ваш код не является потокобезопасным.

Вы проверяете значение в параллельном пакете, а затем добавляете новый элемент, если предыдущая проверка не удалась.

Однако, поскольку в вашем коде нет блокировок, два задания могут одновременно выполнять следующее:

THREAD 1        THREAD 2
=-=-=-=-=-=-=-=-=-=-=-=-
Check Exists
                Check Exists
Add New
                Add New

Вам нужно заблокировать свой чек и добавить подпрограммы.

person Bob Vale    schedule 07.02.2014