Проблема проектирования программы, диспетчер файловой системы, многопоточность c #

Я разрабатываю программу на C #, которая использует FilesystemWatcher для отслеживания файлов PDF, которые добавляются в monitor_directory. Каждый раз, когда файл добавляется в каталог, я добавляю его в BlockingQueue, который постоянно появляется в другом потоке бесконечного цикла while, он ждет там, пока будут добавлены пути к файлам, после чего я продолжаю обработку файлов, последнюю часть обработка файла PDF перемещает его в выходной каталог.

Поток отправки:

    private static void ThreadProc(object param)
    {

        FileMonitorManager _this = (FileMonitorManager)param;
        FileProcessingManager processingManager = new FileProcessingManager();
        processingManager.RegisterProcessor(new ExcelFileProcessor());
        processingManager.RegisterProcessor(new PdfFileProcessor());

        while (true)
        {
            try
            {
                var path = (string)_this.FileQueue.Dequeue();
                if (path == null)
                    break;
                bool b = processingManager.Process(path);
                if (!b)
                {
                    _this.FileQueue.Enqueue(path);
                    Console.WriteLine("\n\nError on file: " + path);
                }
                else
                    Console.WriteLine("\n\nSucces on file: " + path);

            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }

Функция «Обработка» проверяет, существует ли файл, выполняет некоторую обработку и перемещает файл PDF в выходной каталог.

Я столкнулся с двумя проблемами: 1. Обработчик событий On_Create из FileSystemWatcher запускается ДВАЖДЫ, поэтому BlockingQueue имеет одну и ту же запись дважды, в этом случае в процедуре обработки я проверяю, не был ли файл перемещен в выходной каталог (потому что то есть заключительная часть обработки заключается в перемещении файла туда), если это так, я продолжаю обработку, если нет, я выхожу. 2. Если по какой-то причине я получаю сообщение об ошибке при доступе к содержимому файла, говорящее: файл используется другим процессом, я возвращаю FALSE из функции Process и снова добавляю путь к файлу в очередь.

Теперь ... Это работает, но работает медленно ... Как я могу сделать это в многопоточном режиме, учитывая 2 проблемы, с которыми я столкнулся ... Изменить: Что, если я получу событие , добавьте его в очередь, он появляется, очередь пуста, а затем я снова получаю то же событие, очередь пуста, поэтому он добавляется, и в основном я получаю то же событие, обработанное ДВАЖДЫ?


person AlexandruC    schedule 19.06.2013    source источник


Ответы (2)


1) FileSystemWatcher уведомляет вас дважды, потому что файл обновляется в два этапа: сначала данные, затем метаданные. Таким образом, вы можете проверить, что последняя запись еще не была учтена, используя что-то вроде:

File.GetLastWriteTime(file);

Или вы можете проверить дубликаты.

2) Вы не используете многопоточность: вы обрабатываете один файл за раз, поэтому вы можете создать несколько потоков для выполнения метода Process, например использовать:

ThreadPool.QueueUserWorkItem
person Pragmateek    schedule 19.06.2013
comment
Вы можете сохранить карту, связывающую каждый файл с временем последнего редактирования. Затем, если вы видите, что время последней модификации, полученное с помощью GetLastWriteTime, такое же, как и при обработке файла, вы знаете, что делать нечего, и можете проигнорировать его. Вы даже можете отменить текущую обработку и перезапустить ее с текущим файлом, если считаете, что произошли изменения. - person Pragmateek; 19.06.2013
comment
Очень интересно, работает ли эта стратегия для того же файла, снова добавляемого в каталог монитора? Остается ли writeTime таким же, если я несу файл повсюду? Когда мне чистить карту? И в каком случае я получу только одно уведомление для каждого файла? потому что в настоящее время я получаю только один, я не знаю, что это происходит - person AlexandruC; 19.06.2013
comment
Если файл будет добавлен снова, дата его изменения изменится, и ваш код будет знать, что он должен его обработать. Что вы имеете в виду под повсюду: в одной и той же файловой системе включенная сеть должна быть в порядке (но не уверена). Карту чистить не нужно. FileSystemWatcher может вести себя случайным образом (даже при отсутствии уведомления при большой нагрузке, не уверен, что это его ошибка или базовый API ОС), поэтому да, вы можете иметь одно или два уведомления о том, что вы определяете как одно обновление (я наблюдал это тоже в сетевой файловой системе). - person Pragmateek; 19.06.2013
comment
Дело в том, что я получаю одно уведомление только для одного файла, неоднократно. Указывается ли где-нибудь о количестве уведомлений, которые я должен получать? Повсюду: в одной файловой системе, сети и т. Д. (Да) - person AlexandruC; 20.06.2013
comment
Нет, я не думаю, что это четко определено и может зависеть от текущей загрузки системы; вот почему вы должны сами создать какие-то меры безопасности :(: подождите немного, прежде чем начинать обработку, проверьте, запущена ли обработка, отмените обработку и замените ее новой ... - person Pragmateek; 20.06.2013

FileSystemWatcher печально известен своей болтливостью.

Думаю, я бы так и поступил ...

  1. Убедитесь, что в BlockingQueue уже есть запись для рассматриваемого файла, прежде чем добавлять ее во второй раз из вызова On_Create.
  2. Ожидаете ли вы, что в вашей очереди будет много нулевых путей? Надеюсь, проверка нуля - это всего лишь мера предосторожности. Но не ставьте пустые пути в очередь, если можете.
  3. В ваших рабочих потоках просто Dequeue и обработайте
  4. Если ваш рабочий поток получает ошибку при обработке, вы можете снова поставить его в очередь или вы можете отложить его как исключительный случай, поскольку, если у вас будет достаточно необработанных файлов, они могут завалить вашу очередь и замедлить вас.

Простой способ сделать это многопоточным - просто запускать новую задачу каждый раз, когда вы удаляете путь из очереди ...

    Task.Factory.StartNew(() =>
        {
            try
            {
                var path = (string) _this.FileQueue.Dequeue();
                if (path == null)
                    break;
                bool b = processingManager.Process(path);
                if (!b)
                {
                    _this.FileQueue.Enqueue(path);
                    Console.WriteLine("\n\nError on file: " + path);
                }
                else
                    Console.WriteLine("\n\nSucces on file: " + path);

            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        });

Для производственного кода вы также захотите передать токен отмены в задачу и иметь механизм для остановки цикла и задач.

person Kevin    schedule 19.06.2013
comment
Я намеренно ставлю в очередь нулевое значение, когда хочу, чтобы бесконечный цикл остановился. Другой вопрос, будет ли многопоточность этих операций ввода-вывода делать программу быстрее? - person AlexandruC; 19.06.2013
comment
Если предположить, что processingManager.Process выполняет интенсивную работу с ЦП, то почти наверняка да. Это предполагает, что вы привязаны к процессору ... Если вы привязаны к вводу-выводу, то нет. - person Kevin; 19.06.2013
comment
@Pragmateek: да, похоже, есть новый способ управления / использования потоков с каждым выпуском .NET. - person Kevin; 19.06.2013
comment
@ A.K вместо добавления нуля в очередь можно использовать свойство IsAddingCompleted в коллекции BlockingCollection. - person John Atwood; 19.06.2013
comment
.NET 3.5 здесь, нет BlockingCollection - person AlexandruC; 19.06.2013