Как запустить несколько ExecutorServices, чтобы делать запросы и направлять ответ через единый интерфейс?

Этот вопрос является архитектурной проблемой, которую я не смог понять.

У меня есть TaskScheduler, у которого есть такие операции, как start () и stop (). TaskScheduler предназначен быть агностиком, я хочу иметь возможность передавать в него любые «Runnable», «UID» и «Interval», в течение которых должна работать служба. Все это добавляется в хэш-карту, так что если вы попытаетесь передать существующий исполняемый файл с тем же UID, он заменит предыдущий исполняемый файл новой информацией.

Расширением TaskScheduler является MyScheduler, который относится к запросу, который я хочу сделать. В этом примере я делаю несколько запросов профиля каждые 60 секунд. Чтобы отслеживать, какой из запросов профиля является каким, я использую UID в качестве ключа.

Затем я хочу отобразить ответы на уровне приложения от MyScheduler. Вот где у меня проблемы. Я могу только получить ответ от последнего планировщика. Таким образом, если я создам планировщик A и планировщик B, я буду получать обновления только от планировщика B. Аналогично, если я создам планировщик A-C, то я буду получать обновления только от планировщика C.

Я знаю, почему это так, MyScheduler использует последний запрос, который был ему передан. Однако я не знаю хорошей схемы (методологии) для решения этой проблемы.

TaskScheduler класс

public class TaskScheduler {

    private static Map<String, SchedulerModel> schedulerModels = new HashMap<>();

    TaskScheduler() {}

    private ScheduledFuture<?> start(@NotNull final SchedulerModel schedulerModel) {
        return schedulerModel.executorService.scheduleWithFixedDelay(schedulerModel.runnable, 0, schedulerModel.interval, TimeUnit.SECONDS);
    }

    /**
     * Method is used to onSchedulerStop executing tasks
     */
    private void shutdown(@NotNull SchedulerModel schedulerModel) {
        if (schedulerModel.executorService != null) {
            schedulerModel.executorService.shutdownNow();
            schedulerModel.executorService = null;
        }
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        SchedulerModel schedulerModel = new SchedulerModel();
        schedulerModel.executorService = Executors.newSingleThreadScheduledExecutor();
        schedulerModel.runnable = runnable;
        schedulerModel.interval = interval;
        schedulerModels.put(uid, schedulerModel);
    }

    public void stop(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            shutdown(schedulerModels.get(uid));
            schedulerModels.remove(uid);
        } else {
            // scheduler id not found
        }
    }


    public void start(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            start(schedulerModels.get(uid));
        } else {
            // scheduler id not found
        }
    }

}

MyScheduler (это имя временно)

public class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 60; // seconds

    private ProfileRequest request;
    private ApiInterface apiInterface;
    private SchedulerInterface schedulerInterface;


    public MyScheduler() {}

    public void createScheduler(@NotNull ApiInterface apiInterface,
                               @NotNull ProfileRequest request,
                               @NotNull SchedulerInterface schedulerInterface) {

        this.apiInterface = apiInterface;
        this.request = request;
        this.schedulerInterface = schedulerInterface;
        super.setTask(new SchedulerRunnable(), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    public void start(@NotNull String uid) {
        start(uid); // start scheduler
        schedulerInterface.onSchedulerStart(uid); // send feedback to callback
    }

    public void stop(@NotNull String uid) {
        stop(uid); // stop scheduler
        schedulerInterface.onSchedulerStop(uid); // send feedback to callback
    }

    private class SchedulerRunnable implements Runnable {

        @Override
        public void run() {

            ApiClient.createBookmark(request, new Callback<Response>() {
                @Override
                public void onSuccess(@NotNull Response response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}

Попытка достичь этого на уровне приложения

    mProfileScheduler.createScheduler(apiInterface, request, new SchedulerInterface {

    Override
    public void onSuccess(Response response) {
       // problem so far is that I only get response from latest scheduler
    }

    Override
    public void onFailure(Exception exception) {}

    Override
    public void onSchedulerStop(String uid) {
       // pass back uid so that I know which profile scheduler was stopped
    }

    Override
    public void onSchedulerStart(String uid) {}
       // pass back uid so that I know which profile scheduler was started
    }
});

person portfoliobuilder    schedule 04.09.2019    source источник


Ответы (1)


У вас есть эта проблема, потому что schedulerInterface является участником MyScheduler. Таким образом, он используется для всех задач и перезаписывается после отправки каждой новой задачи.

Решение здесь - сделать schedulerInterface членом SchedulerRunnable:


private class SchedulerRunnable implements Runnable {
  private SchedulerInterface schedulerInterface;

  SchedulerRunnable(SchedulerInterface schedulerInterface) {
    this.schedulerInterface = schedulerInterface;
  }
}

Чтобы вызвать onSchedulerStop() и onSchedulerStart(), вы можете сделать start() и stop в TaskScheduler возвратом Runnable. Затем в MyTaskScheduler вы должны преобразовать его в SchedulerRunnable, чтобы получить ссылку на schedulerInterface.

Если вы не хотите, чтобы Runnable возвращался как часть общедоступного интерфейса, вы можете создать защищенные методы, например. Runnable doStart() и Runnable doStop, которые могут быть отменены и вызываются void start() и void stop().


Другие вопросы

Параллелизм

Вы используете HashMap вместо TaskScheduler schedulerModels. Это не поточно-ориентированное. Это нормально, если вы не собираетесь обращаться к нему более чем из одного потока. В противном случае вы можете столкнуться с проблемами с условиями гонки и видимостью памяти.

Вы должны использовать ConcurrentHashMap и его атомарные операции, такие как computeIfPresent() или computeIfAbsent() вместо put.

Управление ресурсами

  1. Когда вы заменяете существующую задачу новой с тем же UID, вы не останавливаете ее службу-исполнитель и не отменяете текущую запущенную задачу. Таким образом, вы собираетесь утечь потоки, и предыдущий запуск продолжится.

  2. Вы создаете новый SingleThreadExecutorService для каждой задачи. Это делает количество используемых потоков потенциально неограниченным и затрудняет какие-либо гарантии относительно потребления ресурсов приложением. Обычно вы должны использовать пул потоков с фиксированным количеством потоков, которые повторно используются между задачами.

Опять же, я предлагаю прочитать книгу «Java Concurrency in Practice», чтобы узнать об этих проблемах и шаблонах для их решения.


Полное решение

После разговора в чате это мое предложенное решение. Я издевался над всеми неуказанными классами и интерфейсами.

import org.jetbrains.annotations.NotNull;

import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    private static class MySchedulerInterface implements SchedulerInterface {
        private final ProfileRequest request;

        public MySchedulerInterface(ProfileRequest req1) {
            this.request = req1;
        }

        @Override
        public void onSuccess(String response) {
            System.out.println("onSuccess:[" + request + "]" + response);
        }

        @Override
        public void onFailure(Exception exception) {
            System.out.println("onFailure:[" + request + "]" + exception);
        }

        @Override
        public void onSchedulerStop(String uid) {
            System.out.println("onSchedulerStop:[" + request + "] - " + uid);
        }

        @Override
        public void onSchedulerStart(String uid) {
            System.out.println("onSchedulerStart:[" + request + "] - " + uid);
        }
    }

    public static void main(String[] args) throws InterruptedException {

        ApiInterface api = new ApiInterface();

        ProfileRequest req1 = new ProfileRequest("1", "apple");
        ProfileRequest req2 = new ProfileRequest("2", "orange");
        ProfileRequest req3 = new ProfileRequest("3", "peach");
        ProfileRequest req11 = new ProfileRequest("1", "pineapple");

        MyScheduler scheduler = new MyScheduler();
        scheduler.createScheduler(api, req1, new MySchedulerInterface(req1));
        scheduler.createScheduler(api, req2, new MySchedulerInterface(req2));
        scheduler.createScheduler(api, req3, new MySchedulerInterface(req3));

        System.out.println("Created 3 tasks");
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Starting 3 tasks");
        scheduler.start("1");
        scheduler.start("2");
        scheduler.start("3");
        System.out.println("Started 3 tasks");

        TimeUnit.SECONDS.sleep(10);

        System.out.println("Replacing task 1...");
        scheduler.createScheduler(api, req11, new MySchedulerInterface(req11));
        System.out.println("Replaced task 1.");

        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping 3 tasks...");
        scheduler.stop("1");
        scheduler.stop("2");
        scheduler.stop("3");
        System.out.println("The end.");
    }
}

class ProfileRequest {
    private final String uid;
    private final String value;

    public ProfileRequest(String uid, String value) {
        this.uid = uid;
        this.value = value;
    }

    public String getUid() {
        return uid;
    }

    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", ProfileRequest.class.getSimpleName() + "[", "]")
                .add("uid='" + uid + "'")
                .add("value='" + value + "'")
                .toString();
    }
}

class ApiInterface {
    public void createBookmark(ProfileRequest request, Callback<String> stringCallback) {
        stringCallback.onSuccess("SUCCESS: I'm done with " + request);
    }
}

interface SchedulerInterface {

    void onSuccess(String response);

    void onFailure(Exception exception);

    void onSchedulerStop(String uid);

    void onSchedulerStart(String uid);
}


interface Callback<T> {
    void onSuccess(@NotNull T response);

    void onFailure(@NotNull Exception exception);
}

class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 2; // seconds

    public MyScheduler() {
    }

    public void createScheduler(@NotNull ApiInterface apiInterface,
                                @NotNull ProfileRequest request,
                                @NotNull SchedulerInterface schedulerInterface) {
        super.setTask(new SchedulerRunnable(apiInterface, request, schedulerInterface), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    @Override
    public ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask task = super.doStart(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStart(uid);
        }
        return task;
    }

    @Override
    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = super.doStop(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStop(uid);
        }
        return task;
    }

    private class SchedulerRunnable implements Runnable {

        private final ApiInterface apiInterface;
        private final ProfileRequest request;
        private final SchedulerInterface schedulerInterface;

        SchedulerRunnable(ApiInterface apiInterface, ProfileRequest request, SchedulerInterface schedulerInterface) {
            this.apiInterface = apiInterface;
            this.request = request;
            this.schedulerInterface = schedulerInterface;
        }

        @Override
        public void run() {
            apiInterface.createBookmark(request, new Callback<String>() {
                @Override
                public void onSuccess(@NotNull String response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}


class SchedulerModel {
    ScheduledExecutorService executorService;
    Runnable runnable;
    int interval;
}


class TaskScheduler {

    static class ScheduledTask {
        String uid;
        Runnable runnable;
        int interval;
        ScheduledFuture<?> future;

        ScheduledTask(String uid, Runnable runnable, int interval, ScheduledFuture<?> future) {
            this.uid = uid;
            this.runnable = runnable;
            this.interval = interval;
            this.future = future;
        }

        void dispose() {
            if (future != null) {
                future.cancel(true);
            }
        }

        boolean isScheduled() {
            return future != null;
        }
    }

    private ConcurrentMap<String, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>();
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    TaskScheduler() {
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        AtomicBoolean requiresRestart = new AtomicBoolean(false);
        final ScheduledTask task = scheduledTasks.compute(uid, (id, oldTask) -> {
            ScheduledTask newTask = new ScheduledTask(uid, runnable, interval, null);
            if (oldTask != null) {
                oldTask.dispose();
                requiresRestart.set(oldTask.isScheduled());
            }
            return newTask;
        });

        if (requiresRestart.get()) {
            start(uid);
        }
    }

    public void start(@NotNull String uid) {
        doStart(uid);
    }

    public void stop(@NotNull String uid) {
        doStop(uid);
    }

    protected ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask scheduledTask = scheduledTasks.computeIfPresent(uid, (id, oldTask) -> {
            ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
                    oldTask.runnable, 0, oldTask.interval, TimeUnit.SECONDS);
            ScheduledTask newTask = new ScheduledTask(oldTask.uid, oldTask.runnable, oldTask.interval, future);
            return newTask;
        });
        return scheduledTask;
    }

    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = scheduledTasks.remove(uid);
        task.dispose();
        return task;
    }


}

person Devstr    schedule 04.09.2019
comment
В вашем коде также есть множество проблем с параллелизмом и управлением ресурсами. Я могу расширить это, если хотите. Я бы посоветовал прочитать книгу Java Concurrency in Practice для получения общей информации о параллелизме: jcip.net - person Devstr; 04.09.2019
comment
спасибо за пример. И да, пожалуйста, подробно расскажите о проблемах, которые вы видите. - person portfoliobuilder; 04.09.2019
comment
№1 - хороший аргумент, спасибо за это. Но №2 целенаправлен. Существует x-количество потоков, которые можно создать. Не обращайте внимания на кошмар, который творится. Но способ сделать это, согласно моим исследованиям, - создать новый отдельный ThreadExecutorService для каждой задачи. Затем я могу отменить (завершить) каждую задачу, используя UID. Что работает отлично. - person portfoliobuilder; 04.09.2019
comment
Добро пожаловать! Не могли бы вы уточнить, в чем причина № 2? ИМО, лучший способ - сохранить ссылку на Future, который вы получаете от executor.scheduleWithFixedDelay() при отправке задачи, и вызвать cancel, если вы хотите остановить ее. Ваш runnable, конечно же, должен правильно обрабатывать ThreadInterruptedException. - person Devstr; 04.09.2019
comment
Причина №2 - глупые менеджеры по продукту. У меня вопрос по поводу отсылки к Будущему. Хотя это помогает мне отслеживать цепочки, как я могу отслеживать возвращаемые ответы? Я также веду список ScheduledFuture ‹?› - person portfoliobuilder; 04.09.2019
comment
Позвольте нам продолжить это обсуждение в чате. - person portfoliobuilder; 04.09.2019