Как получить доступ к запущенным потокам внутри ThreadPoolExecutor?

У меня есть очередь запущенных потоков, и я хотел бы выставить некоторые из ее данных во время ее выполнения, чтобы отслеживать процесс.

ThreadPoolExecutor предоставляет доступ к своей очереди, и я могу перебирать эти объекты, чтобы вызвать свой переопределенный метод toString(), но это только потоки, ожидающие выполнения.

Есть ли способ получить доступ к потокам, которые в настоящее время выполняются, для вызова моего метода? Или, может быть, есть лучший подход для этой задачи в целом?

Чтобы прояснить немного больше о цели, вот некоторый код общей идеи:

public class GetDataTask implements Runnable {
    private String pageNumber;
    private int dataBlocksParsed;
    private String source;
    private String dataType;


    public GetDataTask(String source, String dataType) {
        this.source = source;
        this.dataType = dataType;
    }

    @Override
    public void run() {
        //do stuff that affects pageNumber and dataBlocksParsed
    }

    @Override
    public String toString() {
        return "GetDataTask{" +
            "source=" + source +
            ", dataType=" + dataType +
            ", pageNumber=" + pageNumber +
            ", dataBlocksParsed=" + dataBlocksParsed +
            '}';
    }
}

и класс, содержащий исполнителя:

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new GetDataTask(source, dataType));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()
        for (Runnable r : executor.getActiveThreads()) {
            info.append(((GetDataTask) r).toString()).append('\n');
        }
        return info.append(executor.toString()).toString();
   }
}

person and_rew    schedule 23.02.2016    source источник
comment
Тема или задача? Есть разница! Из любой выполняющейся задачи вы можете использовать Thread.currentThread, чтобы перейти к потоку, который ее выполняет, и получить информацию. И вы, конечно, можете сохранить ссылки на все отправленные задачи, чтобы получить информацию из них.   -  person Fildor    schedule 23.02.2016
comment
Зачем вам очередь из запущенных потоков? Вы имеете в виду, что у вас есть пул потоков? Если вы хотите отслеживать, что делают ваши задачи, я предлагаю вам периодически обновлять некоторую информацию о том, что они делают, чтобы вы могли отслеживать это.   -  person Peter Lawrey    schedule 23.02.2016
comment
О, только что понял, я, вероятно, неправильно понял вопрос. Вы уже используете ExecutorService? То, что я написал выше, имеет смысл только в этом случае.   -  person Fildor    schedule 23.02.2016
comment
Да, ребята, извините. Я отредактировал исходное сообщение от ExecutorService до ThreadPoolExecutor, но правки вектора изменили его, и я этого не заметил.   -  person and_rew    schedule 23.02.2016
comment
Можете ли вы немного уточнить, что вы хотите показать и где? Может немного кода? Например, если вы хотите просто распечатать имена потоков, которые берут на себя задачу, вы можете сделать это внутри метода run, просто добавив с помощью Thread.currentThread и напечатав его имя и идентификатор задачи.   -  person Fildor    schedule 23.02.2016
comment
@Fildor Только что обновил вопрос. Данные, которые я хочу предоставить, обновляются во время выполнения потока, поэтому здесь мне нужна дополнительная функциональность.   -  person and_rew    schedule 23.02.2016
comment
Подход может состоять в том, чтобы активно обновлять общий объект статистики из метода run Задач и получать информацию из этого объекта, а не непосредственно из Задач.   -  person Fildor    schedule 23.02.2016
comment
@Fildor Одна из причин, почему я не использовал этот подход, заключалась в том, что я хотел бы иметь возможность получать данные, даже если в настоящее время работает метод (внутри run() есть другие объекты, которые могут зависнуть). Но хотелось бы попробовать все три подхода, так как предложение saka1029 теперь тоже выглядит более ясным, и каждый ответ имеет подсказку. Большое спасибо!   -  person and_rew    schedule 23.02.2016


Ответы (5)


Как насчет обернуть Runnable вот так.

static class MonitorRunnable implements Runnable {

    static final List<Runnable> activeTasks = Collections.synchronizedList(new ArrayList<>());

    private final Runnable runnable;

    public MonitorRunnable(Runnable runnable) {
        this.runnable = runnable;
    }

    @Override
    public void run() {
        activeTasks.add(runnable);
        runnable.run();
        activeTasks.remove(runnable);
    }
}

а также

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new MonitorRunnable(new GetDataTask(source, dataType)));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()
        synchronized (MonitorRunnable.activeTasks) {
            for (Runnable r : MonitorRunnable.activeTasks) {
                info.append(((GetDataTask) r).toString()).append('\n');
            }
        }
        return info.append(executor.toString()).toString();
   }
}
person saka1029    schedule 23.02.2016
comment
Сначала не понял, а теперь вижу. Интересный подход, попробую. Благодарю вас! - person and_rew; 23.02.2016
comment
Итак, я остановился на этом варианте, потому что он выглядел более быстрым для применения к моему существующему коду и казался немного более элегантным. Пока работает хорошо. Также добавлено toString() в MonitorRunnable для просмотра данных задачи из очереди пула. Спасибо всем! И особая благодарность @Fildor за дополнительные усилия. :) - person and_rew; 29.02.2016

Всякий раз, когда вы добавляете поток в очередь, также добавляйте его во вторую структуру данных, скажем, в HashSet. Затем, если вам нужно получить доступ к работающему потоку, вы можете проверить очередь ExecutorService, чтобы найти потоки, которые все еще ожидают выполнения: каждый поток в вашем HashSet, который еще не находится в очереди ExecutorService, в настоящее время выполняется.

person Thomas    schedule 23.02.2016
comment
Томас, спасибо за предложение. Я отредактировал исходный вопрос, но ваш подход все равно должен работать. За исключением того, что я не уверен, как правильно управлять готовыми потоками. И мне нужно было бы удалить его вручную из набора. Но если нет более элегантного способа... - person and_rew; 23.02.2016
comment
@and_rew Потоки в ThreadPoolExecutor не заканчиваются, как когда вы создаете поток и запускаете его. Они будут использованы повторно. То, что завершится, - это исполняемый или вызываемый объект, который вы отправляете в очередь для выполнения. - person Fildor; 23.02.2016
comment
Вам не обязательно слушать, пока потоки закончатся. Вместо этого всякий раз, когда вы хотите получить доступ к запущенным потокам, перебирайте HashSet, игнорируйте те, которые находятся в очереди ExecutorService, а также пропускайте те (и удаляйте их), которые завершены. - person Thomas; 23.02.2016

Как я написал в комментарии. Я бы сделал активное обновление для общего подхода к объекту статистики:

Я бы изменил задачу следующим образом:

public class GetDataTask implements Runnable {
    private String pageNumber;
    private int dataBlocksParsed;
    private String source;
    private String dataType;
    HashMap<GetDataTask,String> statistics


    public GetDataTask(String source, String dataType, HashMap<GetDataTask,String> statistics) {
        this.source = source;
        this.dataType = dataType;
        this.statistics = statistics;
    }

    @Override
    public void run() {
        // you'll probably want to immediately have stats available:
        statistics.put(this, this.toString());

        //do stuff that affects pageNumber and dataBlocksParsed
        // vv this will probably be inside your "do stuff" loop
        statistics.put(this, this.toString());
        // loop end

        // if you do not want stats of finished tasks, remove "this" here.
    }

    @Override
    public String toString() {
        return "GetDataTask{" +
            "source=" + source +
            ", dataType=" + dataType +
            ", pageNumber=" + pageNumber +
            ", dataBlocksParsed=" + dataBlocksParsed +
            '}';
    }
}

и менеджер:

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    private HashMap<GetDataTask,String> stats = new ConcurrentHashMap<GetDataTask,String>();       

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new GetDataTask(source, dataType, stats));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()

        // >>> iterate "stats"'s values to build the info string ...            

        return info.append(executor.toString()).toString();
   }
}

ОБНОВИТЬ

Вы можете легко изменить этот подход к извлечению информации, повторяя ключи (которые являются исполняемыми задачами) и вызовите для них toString. Однако это очень похоже на подход саки. Может быть, вы чувствуете себя более комфортно с ним.

person Fildor    schedule 23.02.2016

Поскольку у вас есть контроль над используемым исполнителем, я бы использовал методы beforeExecute и afterExecute ThreadPoolExecutor для отслеживания запущенных задач и использовал их для создания метода getActiveTasks.

import java.util.Set;
import java.util.concurrent.*;

public class ActiveTasksThreadPool extends ThreadPoolExecutor {

    private final ConcurrentHashMap<Runnable, Boolean> activeTasks = new ConcurrentHashMap<>();

    public ActiveTasksThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        activeTasks.put(r, Boolean.TRUE);
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        activeTasks.remove(r);
    }

    public Set<Runnable> getActiveTasks() {
        // the returned set will not throw a ConcurrentModificationException.
        return activeTasks.keySet();
    }

    public static void main(String[] args) {

        final int maxTasks = 5;
        ActiveTasksThreadPool tp = new ActiveTasksThreadPool(maxTasks, maxTasks, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        try {
            System.out.println("Active tasks: " + tp.getActiveTasks());
            final CountDownLatch latch = new CountDownLatch(1); 
            for (int i = 0; i < maxTasks; i ++) {
                final int rnumber = i;
                tp.execute(new Runnable() {
                    @Override
                    public void run() {
                        try { latch.await(); } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public String toString() {
                        return "Runnable " + rnumber;
                    }
                });
            }
            Thread.sleep(100L); // give threads a chance to start
            System.out.println("Active tasks: " + tp.getActiveTasks());
            latch.countDown();
            Thread.sleep(100L); // give threads a chance to finish
            System.out.println("Active tasks: " + tp.getActiveTasks());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            tp.shutdownNow();
        }
    }

}
person vanOekel    schedule 23.02.2016

Вам просто нужно где-то хранить ссылки на запущенные потоки, которые будут запускаться в ThreadPoolExecutor, добавляя поверх других ответов, это пример небольшого приложения, которое считывает состояния потоков, выполняемых внутри ThreadPoolExecutor, каждую 1 секунду до выключения:

package sample;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

        for (int i = 1; i <= 10; i++)
        {
            Task task = new Task("Task " + i);
            executor.execute(task);
        }

        executor.shutdown();

        try {
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                System.out.println("Awaiting completion of threads, threads states: " + Task.getThreadsStateCount());
            }

        } catch (InterruptedException e) {
        }

        System.out.println("Executor shutdown -> " + executor.isShutdown());
    }
}

class Task implements Runnable {

    static final List<Thread> activeTasks = Collections.synchronizedList(new ArrayList<>());
    static final Random r = new Random();

    private String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("current thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
        activeTasks.add(t);

        try {
            int tries = 0;

            while (tries < 10) {
                int randomNum = r.nextInt(10000);
                // do some expensive computation
                for(int i = 0; i < 4; i++) {
                    isPrime(r.nextLong());
                }

                // now sleep
                Thread.sleep(randomNum);
                tries++;
            }

        } catch (InterruptedException e) {
        }

        System.out.println("completed task for thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
    }

    static boolean isPrime(long n)
    {
        if (n <= 1)
            return false;
        if (n <= 3)
            return true;

        if (n % 2 == 0 || n % 3 == 0)
            return false;

        for (int i = 5; i * i <= n; i = i + 6)
            if (n % i == 0 || n % (i + 2) == 0)
                return false;

        return true;
    }

    public static String getThreadsStateCount() {
        return "NEW: " + getCountThreadsState(Thread.State.NEW) +
                " ,RUNNABLE: " + getCountThreadsState(Thread.State.RUNNABLE) +
                " ,WAITING: " + getCountThreadsState(Thread.State.WAITING) +
                " ,TIMED_WAITING: " + getCountThreadsState(Thread.State.TIMED_WAITING) +
                " ,BLOCKED: " + getCountThreadsState(Thread.State.BLOCKED) +
                " ,TERMINATED: " + getCountThreadsState(Thread.State.TERMINATED);
    }

    public static long getCountThreadsState(Thread.State state) {
        return activeTasks.stream().filter(x -> x.getState() == state).count();
    }
}

// печатает что-то вроде:

Ожидание завершения потоков, состояния потоков: NEW: 0, RUNNABLE: 1, WAITING: 0, TIMED_WAITING: 9, BLOCKED: 0, TERMINATED: 0

person guilhebl    schedule 18.10.2019