Как завершить поток в службе-исполнителе, если поток занимает слишком много времени?

Пример службы исполнителя

 static class MyRunnable implements Runnable {

    private String serverName;

    public MyRunnable(String serverName) {
        super();
        this.serverName = serverName;
    }

    @Override
    public void run() {
        ...
        conn = new ch.ethz.ssh2.Connection(serverName);
        conn.connect();

        boolean isAuthenticated = conn.authenticateWithPassword(user, pass);
        logger.info("Connecting to " + server);

        if (isAuthenticated == false) {
            logger.info(server + " Please check credentials");
        }

        sess = conn.openSession();
        ...

    }

}

public static void main(String[] args) {
    List<String> serverList = ...;
    ExecutorService executor = Executors.newFixedThreadPool(20);

    for (String serverName : serverList) {
        MyRunnable r = new MyRunnable(serverName);
        executor.execute(r);
    }

    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.HOURS);
}

Вот пример кода моей службы-исполнителя. Но с этой логикой, когда я встречаю сервер, который не может подключиться или слишком долго подключается, это создает время зависания в моем приложении. Я хочу завершить/убить поток, если для подключения требуется больше x времени. Как я могу завершить задачу потока, если он не подключается к серверу в течение 2 секунд.

Попытка

       ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10, 25, 500, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1));

Я добавил следующий код, но, по-видимому, он не завершает поток, если он занимает более 2000 миллисекунд.

Попытка 2

Future<?> future = executor.submit( new task));
            try {
                future.get(2000, TimeUnit.MILLISECONDS); // This waits timeout seconds; returns null
            }

            catch(TimeoutException e) {
                future.cancel(true);
               // System.out.println(server + "name");
            } 

person Jesse    schedule 15.06.2017    source источник
comment
Закройте соединение из другого потока, чтобы вызвать исключение в Runnable.   -  person JimmyB    schedule 15.06.2017
comment
Существует еще один метод подключения, который использует тайм-аут.   -  person assylias    schedule 15.06.2017


Ответы (4)


Как я могу завершить задачу потока, если он не подключается к серверу в течение 2 секунд.

Обычно это сложно сделать, потому что даже если вы прервете поток (как упоминается в других ответах), нет гарантии, что поток остановится. Прерывание просто устанавливает флаг в потоке, и код должен определить состояние и остановить. Это означает, что тонны потоков могут находиться в фоновом режиме в ожидании соединений.

Однако в вашем случае вы используете метод ch.ethz.ssh2.Connection.connect(). Оказывается, есть метод подключения, требующий таймаута. Я думаю, вы хотите следующее:

// try to connect for 2 seconds
conn.connect(null, 2000, 0);

Цитата из подключить метод javadocs:

В случае тайм-аута (либо connectTimeout, либо kexTimeout) генерируется исключение SocketTimeoutException.

person Gray    schedule 16.06.2017
comment
Я предполагаю, что null должно быть моим именем сервера/хоста? API показывает, что это верифер, я должен оставить его нулевым? - person Jesse; 19.06.2017
comment
Все, что я видел @Jesse, это то, что вызов connect() такой же, как connect(null, 0,0). Я думаю, что это верификатор. - person Gray; 19.06.2017
comment
Большое спасибо, сэр. Я просматривал API, и этот метод ускользнул из моего поля зрения. Он работает по назначению. - person Jesse; 19.06.2017

Вы должны сначала выполнить awaitTermination(), затем проверить возвращаемое значение, а затем выполнить shutdownNow(). shutdown() не гарантирует мгновенную остановку службы, он просто прекращает принимать новые задания и ожидает завершения всех заданий по порядку. shutdownNow(), с другой стороны, прекращает принимать новые задания, активно пытается остановить все запущенные задачи и не запускает новые, возвращая список всех заданий, ожидающих выполнения.

Из JavaDocs:

Следующий метод завершает работу ExecutorService в два этапа: сначала вызывается shutdown, чтобы отклонить входящие задачи, а затем, если необходимо, вызывается shutdownNow, чтобы отменить любые зависшие задачи:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }
person Subhranil    schedule 15.06.2017
comment
Почему -1 любой? Ответ кажется законным - person Andrey Cheboksarov; 15.06.2017

Вы всегда можете вызвать future.get(timeout...) Он вернет исключение тайм-аута, если он еще не закончился... тогда вы можете вызвать future.cancel().

person Akash    schedule 15.06.2017

Пока вы имеете дело с потоками в Java, единственный безопасный способ остановить поток — прервать его. Вы можете сначала позвонить shutdown(), а затем подождать. Этот метод не прерывает потоки.

Если это не помогает, вы вызываете shutdownNow(), который пытается отменить задачи, устанавливая флаг прерывания каждого потока в значение true. В этом случае, если потоки заблокированы/ожидают, будет выдано InterruptedException. Если вы проверите флаг прерывания где-то внутри своих задач, то у вас тоже все хорошо.

Но если у вас нет другого выбора, кроме как остановить потоки, вы все равно можете это сделать. Одним из возможных решений получения доступа к воркерам является трассировка всех созданных потоков внутри ThreadPoolExecutor с помощью собственной фабрики потоков.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class TestThreadPoolEx {

    static class CustomThreadFactory implements ThreadFactory {
        private List<Thread> threads = new ArrayList<>();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            threads.add(t);
            return t;
        }

        public List<Thread> getThreads() {
            return threads;
        }

        public void stopThreads() {
            for(Thread t : threads) {
                if(t.isAlive()) {
                    try {
                        t.stop();
                    } catch (Exception e) {
                        //NOP
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CustomThreadFactory factory = new CustomThreadFactory();
        ExecutorService ex = Executors.newFixedThreadPool(1, factory);
        ex.submit(() -> {
            while(true);
        });
        ex.shutdown();
        ex.awaitTermination(5, TimeUnit.SECONDS);
        ex.shutdownNow();
        ex.awaitTermination(5, TimeUnit.SECONDS);
        factory.stopThreads();
    }
}

Это, безусловно, небезопасно, но должно соответствовать вашим требованиям. В этом случае он может остановить цикл while(true). Отмена задач не сможет этого сделать.

person Andrey Cheboksarov    schedule 15.06.2017