TPL Dataflow - как вызвать несколько элементов элемента действия

Я новичок в TPL Dataflow. У меня есть список номеров проектов, которые мне нужно обработать. В проекте может быть около 8000 элементов, и мне нужно получить данные для каждого элемента в проекте, а затем отправить эти данные на 5 отдельных серверов.

Вот что я закодировал до сих пор. Я застрял на этапе загрузки этих данных на 5 серверов. Я не уверен, правильно ли это закодировано. Любой совет очень ценится.

public  static bool PushData(string projectId)
{
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
        dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    var itemData = new ActionBlock<ProjectDTO>(
         dto =>  PostEachServerAsync(dto, "server1", "setmemcache"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });


    projectItems.LinkTo(projectRules, linkCompletion);

    IList<ProjectDTO> dtoList = new List<ProjectDTO>();
    dtoList = MemcachedDTO.GetDataByProject(projectId);

    foreach (ProjectDTOd in dtoList)
    {
        projectItems.Post(d);
    }

    projectItems.Complete();
    projectItems.Completion.Wait();
    return false;
}

Вот мой код - но он не завершается должным образом - может ли кто-нибудь сказать мне, что я делаю не так?

             [HttpGet]
    public HttpResponseMessage ReloadItem(string projectQuery)
    {
        try
        {

            var linkCompletion = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };

            IList<string> projectIds = projectQuery.Split(',').ToList();
            IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();

            var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
                dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

            var pR = serverList.Select(
                    i => new { Id = i, Action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) });

            List<MemcachedDTO> dtoList = new List<MemcachedDTO>();

            foreach (string pid in projectIds)
            {
                IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
                dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
                dtoList.AddRange(dtoTemp);
            }


            foreach (var action in pR)
            {
                iR.LinkTo(action.Action, linkCompletion);
            }

            foreach (MemcachedDTO d in dtoList)
            {
                iR.Post(d);
            }
            iR.Complete();
            foreach (var action in pR)
            {
                action.Action.Completion.Wait();
            }


            return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
        }
        catch (Exception ex)
        {
            return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
        }
    }

person klkj898    schedule 02.06.2017    source источник
comment
Если из 1000 ваших сообщений вы получили только 998, проверьте ошибки, могут быть исключения, о которых вы не знаете.   -  person VMAtm    schedule 04.06.2017


Ответы (1)


Ваш код вообще не компилируется, как вы его запускаете?

Прежде всего, не блокируйте свой поток с помощью .Wait(), используйте шаблон async/await вот. Во-вторых, вам понадобится BroadcastBlock, чтобы уведомить более 1 блока с помощью вашего данные. В-третьих, вам нужно 5 разных ActionBlock, а не 1 со степенью параллелизм 5. В-четвертых, вы ожидаете неправильного Completion задача - дождаться завершения последнего блока, а не первого, поэтому в вашем случае вам нужно дождаться завершения 5 блоков с _ 6_ метод.

Итак, ваш код может быть таким (я предполагаю, что projectRules и itemsData - это один и тот же блок):

public static async Task<bool> PushData(string projectId)
{
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
        dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    var broadcast = new BroadcastBlock<ProjectDTO>();
    projectItems.LinkTo(broadcast, linkCompletion);

    var pR = serverList.Select(
            i => new { Id = i, Action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) });

    foreach (var action in pR)
    {
        broadcast.LinkTo(action.Action, linkCompletion);
    }

    var dtoList = MemcachedDTO.GetDataByProject(projectId);

    foreach (var d in dtoList)
    {
        projectItems.Post(d);
    }
    projectItems.Complete();

    // wait all the action blocks to finish
    await Task.WhenAll(projectRules1.Completion, projectRules2.Completion, projectRules3.Completion, projectRules4.Completion, projectRules5.Completion);
    return false;
}
person VMAtm    schedule 03.06.2017
comment
спасибо за ваш ответ - элементы проекта не находятся в том же блоке, что и данные элемента - это потому, что 1) я сначала получу элементы списка, которые соответствуют каждому из номеров проекта 2) Получите данные элемента на основе идентификаторов элементов из шаг 1) . После этого мне нужно будет отправить эти данные на 5 серверов. Я немного потерялся, как я могу перебирать идентификаторы проекта, чтобы получить идентификаторы элементов, а затем данные, соответствующие каждому идентификатору элемента - person klkj898; 03.06.2017
comment
Вот мой код, но он не завершается должным образом - что я делаю не так? - person klkj898; 03.06.2017
comment
1. Вы не использовали BroadcastBlock, поэтому сообщение получит только первый блок. 2. Вы блокируете свой поток с помощью Wait, что плохо, используйте asyncawait`. 3. SO - не место для компиляции кода за вас. Вы поняли идею, вы заставили ее работать. - person VMAtm; 03.06.2017
comment
Приносим извинения за неудобства. Это моя первая публикация в SO. Я отредактировал свой код, чтобы теперь его можно было скомпилировать. Я не уверен, что получаю BroadcastBlock. Не могли бы вы привести мне пример? Поэтому я в основном хотел бы отправить данные на все серверы, например, здесь var pR = serverList.Select (i = ›new {Id = i, Action = new ActionBlock ‹MemcachedDTO› (dto =› PostEachServerAsync (dto, i, set ), новый ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 3})}); - person klkj898; 03.06.2017
comment
Я уже приводил вам пример для трансляции. Сообщение распространяется на все связанные блоки. В вашем примере только первый получит сообщение - person VMAtm; 03.06.2017