Как изящно завершить весеннюю задачу @Schedule?

Я пытаюсь сделать так, чтобы служба весенней загрузки закончилась изящно. У нее есть метод с аннотацией @Scheduled. Сервис использует Spring-data для БД и spring-cloud-stream для RabbitMQ. Жизненно важно, чтобы DB и RabbitMQ были доступны до завершения запланированного метода. Имеется автоматическое масштабирование, которое часто запускает / останавливает экземпляры службы, сбой при остановке не является вариантом.

Из этого сообщения Spring - Scheduled Task - Graceful Shutdown я полагаю, что этого должно быть достаточно, чтобы добавлять

    @Bean
    TaskSchedulerCustomizer taskSchedulerCustomizer() {
        return taskScheduler -> {
            taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
            taskScheduler.setAwaitTerminationSeconds(30);
        };
    }

и Spring должен дождаться завершения работы приложения, пока не завершится запланированный метод или не истечет 30 секунд.

Когда служба останавливается во время выполнения запланированного метода, я могу видеть следующие вещи из журнала

  1. spring-cloud-stream закрывает соединения, не дожидаясь завершения метода.
  2. spring-data также сразу закрывает db-соединение.
  3. Метод не останавливается и пытается завершить, но терпит неудачу, поскольку он больше не может получить доступ к базе данных.

Есть идеи, как получить запланированный метод, включая доступ к db-connection и rabbitMq, чтобы закончить изящно?

Это мой класс приложения:

@SpringBootApplication(scanBasePackages = {
    "xx.yyy.infop.dao",
    "xx.yyy.infop.compress"})
@EntityScan("ch.sbb.infop.common.entity")
@EnableJpaRepositories({"xx.yyy.infop.dao", "xx.yyy.infop.compress.repository"})
@EnableBinding(CompressSink.class)
@EnableScheduling
public class ApplicationCompress {

    @Value("${max.commpress.timout.seconds:300}")
    private int maxCompressTimeoutSeconds;

    public static void main(String[] args) {
        SpringApplication.run(ApplicationCompress.class, args);
    }

    @Bean
    TaskSchedulerCustomizer taskSchedulerCustomizer() {
        return taskScheduler -> {
            taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
            taskScheduler.setAwaitTerminationSeconds(maxCompressTimeoutSeconds);
        };
    }

}

А этот фасоль:

@Component
@Profile("!integration-test")
public class CommandReader {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommandReader.class);

    private final CompressSink compressSink;
    private final CommandExecutor commandExecutor;

    CommandReader(CompressSink compressSink, CommandExecutor commandExecutor) {
        this.compressSink = compressSink;
        this.commandExecutor = commandExecutor;
    }

    @PreDestroy
    private void preDestory() {
        LOGGER.info("preDestory");
    }

    @Scheduled(fixedDelay = 1000)
    public void poll() {
        LOGGER.debug("Start polling.");
        ParameterizedTypeReference<CompressCommand> parameterizedTypeReference = new ParameterizedTypeReference<>() {
        };
        if (!compressSink.inputSync().poll(this::execute, parameterizedTypeReference)) {
            compressSink.inputAsync().poll(this::execute, parameterizedTypeReference);
        }
        LOGGER.debug("Finished polling.");
    }

    private void execute(Message<?> message) {
        CompressCommand compressCommand = (CompressCommand) message.getPayload();

        // uses spring-data to write to DB
        CompressResponse compressResponse = commandExecutor.execute(compressCommand);

        // Schreibt die Anwort in Rensponse-Queue
        compressSink.outputResponse().send(MessageBuilder.withPayload(compressResponse).build());
    }

}

А вот несколько строк из журнала (см. https://pastebin.com/raw/PmmqhH1P для полный журнал):

2020-05-15 11:59:35,640 [restartedMain] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.initialize - traceid= - Initializing ExecutorService 'taskScheduler'

2020-05-15 11:59:44,976 [restartedMain] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.initialize - traceid= - Initializing ExecutorService 'applicationTaskExecutor'

Disconnected from the target VM, address: '127.0.0.1:52748', transport: 'socket'
2020-05-15 12:00:01,228 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressSync.komprimierungSyncProcessingGroup.errors' has 1 subscriber(s).
2020-05-15 12:00:01,229 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressSync.komprimierungSyncProcessingGroup.errors' has 0 subscriber(s).
2020-05-15 12:00:01,232 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressAsync.komprimierungAsyncProcessingGroup.errors' has 1 subscriber(s).
2020-05-15 12:00:01,232 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressAsync.komprimierungAsyncProcessingGroup.errors' has 0 subscriber(s).
2020-05-15 12:00:01,237 [SpringContextShutdownHook] - INFO org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent - traceid= - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-05-15 12:00:01,237 [SpringContextShutdownHook] - INFO org.springframework.integration.channel.PublishSubscribeChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.errorChannel' has 0 subscriber(s).
2020-05-15 12:00:01,237 [SpringContextShutdownHook] - INFO org.springframework.integration.endpoint.EventDrivenConsumer.stop - traceid= - stopped bean '_org.springframework.integration.errorLogger'
2020-05-15 12:00:01,244 [SpringContextShutdownHook] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.shutdown - traceid= - Shutting down ExecutorService 'applicationTaskExecutor'
2020-05-15 12:00:01,245 [SpringContextShutdownHook] - INFO yy.xxx.infop.compress.CommandReader.preDestory - traceid= - preDestory
2020-05-15 12:00:01,251 [SpringContextShutdownHook] - INFO org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean.destroy - traceid= - Closing JPA EntityManagerFactory for persistence unit 'default'
2020-05-15 12:00:01,256 [SpringContextShutdownHook] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown - traceid= - Shutting down ExecutorService 'taskScheduler'
2020-05-15 12:00:01,256 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 4
2020-05-15 12:00:02,257 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 5
2020-05-15 12:00:03,258 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 6
2020-05-15 12:00:04,260 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 7
2020-05-15 12:00:05,260 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 8
2020-05-15 12:00:06,261 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 9
2020-05-15 12:00:07,262 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - end
2020-05-15 12:00:07,263 [scheduling-1] - INFO yy.xxx.infop.compress.condense.VmLaufVerdichter.verdichte - traceid=d22b696edc90e123 - VarianteTyp=G, vmId=482392382, vnNr=8416
2020-05-15 12:00:07,326 [scheduling-1] -ERROR yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - 

org.springframework.beans.factory.BeanCreationNotAllowedException: Error creating bean with name 'inMemoryDatabaseShutdownExecutor': Singleton bean creation not allowed while singletons of this factory are in destruction (Do not request a bean from a BeanFactory in a destroy method implementation!)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:208) ~[spring-beans-5.2.4.RELEASE.jar:5.2.4.RELEASE]

2020-05-15 12:00:08,332 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - Compress started. compressCommand=yy.xxx.infop.compress.client.CompressCommand@247ec0d[hostName=K57176,jobId=b1211ee8-4a54-47f2-a58b-92b3560bbddd,cmdId=1,userId=goofy2,commandTyp=verdichtet G, T und komprimiert G, T,vmId=482392382,started=1589536752609]
2020-05-15 12:00:08,337 [scheduling-1] -ERROR yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - 

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: EntityManagerFactory is closed
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:448) ~[spring-orm-5.2.4.RELEASE.jar:5.2.4.RELEASE]

2020-05-15 12:00:10,339 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - Compress started. compressCommand=yy.xxx.infop.compress.client.CompressCommand@247ec0d[hostName=K57176,jobId=b1211ee8-4a54-47f2-a58b-92b3560bbddd,cmdId=1,userId=goofy2,commandTyp=verdichtet G, T und komprimiert G, T,vmId=482392382,started=1589536752609]
2020-05-15 12:00:10,343 [scheduling-1] -ERROR yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - 

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: EntityManagerFactory is closed

2020-05-15 12:00:10,351 [scheduling-1] -DEBUG yy.xxx.infop.compress.CommandReader.poll - traceid=d22b696edc90e123 - Finished polling.
2020-05-15 12:00:10,372 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: bean 'response'
2020-05-15 12:00:10,372 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: nullChannel
2020-05-15 12:00:10,373 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: bean 'errorChannel'
2020-05-15 12:00:10,373 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger'
2020-05-15 12:00:10,374 [SpringContextShutdownHook] - INFO com.zaxxer.hikari.HikariDataSource.close - traceid= - HikariPool-1 - Shutdown initiated...
2020-05-15 12:00:10,405 [SpringContextShutdownHook] - INFO com.zaxxer.hikari.HikariDataSource.close - traceid= - HikariPool-1 - Shutdown completed.

Process finished with exit code 130

person BetaRide    schedule 15.05.2020    source источник
comment
сбой при остановке невозможен Вы не можете принудительно это сделать. Может случиться любое количество вещей. Что, если, например, отключится электричество?   -  person Michael    schedule 15.05.2020
comment
Да, конечно. Я говорю о нормальном отключении, когда крючки выключения пружины должны обеспечивать дружественное завершение работы.   -  person BetaRide    schedule 15.05.2020
comment
Если вы используете автомасштабирование AWS, он отправит SIGKILL на вашу виртуальную машину, прежде чем масштабирует приложение и завершит работу JVM. Может быть, добавить в программу ловушку отключения?   -  person Brad    schedule 26.12.2020


Ответы (1)


Я протестировал эту конфигурацию, которая должна работать так же, как ваш TaskSchedulerCustomizer:

spring.task.scheduling.shutdown.await-termination=true
spring.task.scheduling.shutdown.await-termination-period=30s

Spring ожидает 30 секунд со всеми доступными службами, прежде чем что-либо закрыть, если есть активные задачи. Если нет активных задач, выключение происходит немедленно.

Стоит упомянуть, что то, что привело меня к этому вопросу, было изящным отключением @Async методов, которое настроено очень похожим образом:

spring.task.execution.shutdown.await-termination=true
spring.task.execution.shutdown.await-termination-period=1s

или в коде:

@Bean
public TaskExecutorCustomizer taskExecutorCustomizer() {
    // Applies to @Async tasks, not @Scheduled as in the question
    return (customizer) -> {
        customizer.setWaitForTasksToCompleteOnShutdown(true);
        customizer.setAwaitTerminationSeconds(10);
    };
}

Возвращаясь к вашему случаю, я предполагаю, что TaskSchedulerCustomizer на самом деле не выполняется или заменяется чем-то еще после его выполнения.

Для первого варианта подтвердите, добавив операторы регистрации или установив точку останова в taskSchedulerCustomizer().

Для второго варианта я предлагаю установить точку останова в TaskSchedulerBuilder::configure(), чтобы посмотреть, что произойдет. Как только отладчик прерывает работу этого метода, добавьте точку останова по данным в свойстве ExecutorConfigurationSupport::awaitTerminationMillis taskScheduler, чтобы увидеть, не изменилось ли это свойство где-то еще.

Вы можете увидеть окончательный период завершения, использованный в процессе выключения, в методе ExecutorConfigurationSupport::awaitTerminationIfNecessary.

person bernie    schedule 26.12.2020