Поток для обработки нескольких остальных вызовов

Я пытаюсь обработать около 1000 файлов, используя следующий код:

ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
Runnable worker = null;
for (File file : files) {
    if (file.isFile()) {
        worker = new FileProcessThread(file, connectionVo);
        executor.execute(worker);
        file.deleteOnExit();
    }
}
while (!executor.isTerminated()) {
    System.out.println("Still running");
}
executor.shutdown();
System.out.println("Finished all threads");

Этот код создает несколько потоков. Каждый поток имеет несколько вызовов rest внутри. Эти остальные API предназначены для обработки входного файла. Каждый поток также регистрирует каждое событие транзакции, которое происходит во время обработки. Но результат выполнения этих потоков несовместим.

  1. Для нескольких потоков он работает отлично. Выбирает файл. Регистрирует правильные
    транзакции и перемещает обработанный файл в нужный каталог.
  2. Но для некоторых потоков он показывает некоторое непредсказуемое поведение, например, регистрирует событие обработки файла одного потока в другом.

Шаги в каждом потоке:

  1. Создать транзакцию - оставшийся вызов
  2. Регистрировать событие в транзакции для запуска процесса — остаточный вызов
  3. Передает файл другому модулю для преобразования файла - остаточный вызов, который внутренне
    создает еще один поток, который синхронизируется
  4. После обработки файл перемещается в другой - в тот же каталог кода.

Мне нужна стабильная производительность этих потоков. Любая помощь будет оценена.

Код внутри запуска:

 long transactionID = 0l;
    long connectionId = connectionVo.getConnectionId();
    try {
        transactionID = beginTransaction.getTransactionId();
        FileInputStream processedFileData;
        processedFileData = new FileInputStream(file);

        Response response = Service.postMessage(stream2file,              
        connectionId, 0, transactionID);

        if (response.getStatus() != 200) {
              writToDirectory(stream2file, userError, file.getName(), transactionID);
            }

        } else {
            String userArchive = getUserArchive();
            if (checkDirectory(userArchive, transactionID)) {
                writToDirectory(stream2file, userArchive, file.getName(), transactionID);
            }

        }
        file.delete();

    } catch (FileNotFoundException e) {
    } 

person ncp    schedule 12.01.2016    source источник
comment
Я думаю, что ваш threadPoolSize должен быть меньше 1000, что является количеством обрабатываемых файлов. Таким образом, поток будет повторно использоваться для работы с разными файлами.   -  person tianwei    schedule 12.01.2016
comment
В настоящее время я сохранил threadPoolSize только как 10   -  person ncp    schedule 12.01.2016
comment
Кроме того, если я это понимаю - это только о журнале, который запутался. Если это так, то вы можете заглянуть в Future. Отправьте свою задачу службе-исполнителю, чтобы получить будущую ссылку и, используя ее, запросить результат. Вы можете попробовать вернуть строку завершения, которая в вашем случае может быть rest call name - log event - tran id, и записать ее.   -  person Tirath    schedule 12.01.2016
comment
вы также можете добавить идентификатор потока и имя обрабатываемого файла в зарегистрированное сообщение, чтобы было понятнее, какое сообщение приходит из какого потока   -  person Gavriel    schedule 12.01.2016
comment
Вы также можете опубликовать код для FileProcessThread, так как это, вероятно, вызывает проблему. Вы, вероятно, используете некоторые небезопасные вызовы там.   -  person Gavriel    schedule 12.01.2016
comment
Показанный здесь код ничего не говорит о проблеме. Это связано с тем, как реализован FileProcessThread. Вы передаете connectionVo, что звучит подозрительно (обычно вы не разделяете соединения между потоками)   -  person Enno Shioji    schedule 12.01.2016
comment
Вы удаляете файлы при выходе, а не при успешной обработке, поэтому, если ваше приложение умирает до обработки каждого файла, они все равно могут быть удалены. Я предлагаю вам удалять только те файлы, которые были успешно обработаны, и перемещать файлы, которые не удалось выполнить, в другой каталог.   -  person Peter Lawrey    schedule 12.01.2016


Ответы (1)


Я предлагаю вам использовать Java 8 для многопоточности, поскольку она намного чище.

files.parallelStream()
     .filter(File::isFile)
     .forEach(f -> new FileProcessThread(file, connectionVo).run());

Ваша задача удаляет файл после успешного завершения.

Это будет передавать каждый файл только одной задаче.

Кстати, не называйте свои задачи xxxThread, если они на самом деле не являются потоками, и избегайте подклассов потоков.

person Peter Lawrey    schedule 12.01.2016