Пакетный процессор Spring не запускает ItemProcessorListener

Итак, у меня проблема в Spring Batch 3.0.7.RELEASE и Spring 4.3.2.RELEASE, где слушатели не работают в моем ItemProcessor классе. Регулярная инъекция на уровне @StepScope работает для @Value("#{jobExecutionContext['" + Constants.SECURITY_TOKEN + "']}"), как показано ниже. Но это не работает для beforeProcess или beforeStep, я пробовал как версию аннотации, так и версию интерфейса. Я почти на 100% уверен, что это работало в какой-то момент, но не могу понять, почему это остановилось.

Любые идеи? Похоже, я неправильно настроил?

AppBatchConfiguration.java

@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {

    private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);

    private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
    private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean(name = "cimAppXmlReader")
    @StepScope
    public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
    String inputXmlFilePath) {
        LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
        StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
        reader.setResource(new FileSystemResource(inputXmlFilePath));
        reader.setUnmarshaller(mecaUnMarshaller());
        reader.setFragmentRootElementNames(getAppRootElementNames());
        reader.setSaveState(false);

        // Make the StaxEventItemReader thread-safe
        SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
        synchronizedItemStreamReader.setDelegate(reader);

        return synchronizedItemStreamReader;
    }

    @Bean
    @StepScope
    public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
    String inputXmlFilePath) {
        LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
        StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
        reader.setResource(new FileSystemResource(inputXmlFilePath));
        reader.setUnmarshaller(mecaUnMarshaller());

        String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
        reader.setFragmentRootElementNames(fragmentRootElementNames);
        reader.setSaveState(false);

        return reader;
    }

    @Bean
    public Unmarshaller mecaUnMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
        return marshaller;
    }

    @Bean
    public Marshaller uberMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(ServiceRequestType.class);
        marshaller.setSupportJaxbElementClass(true);
        return marshaller;
    }

    @Bean(destroyMethod="") // To stop multiple close calls, see: http://stackoverflow.com/a/23089536
    @StepScope
    public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
    String outputXmlFilePath) {
        SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();

        writer.setResource(new FileSystemResource(outputXmlFilePath));
        writer.setMarshaller(uberMarshaller());
        writer.setSaveState(false);
        HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
        rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
        writer.setRootElementAttributes(rootElementAttribs);
        writer.setRootTagName("ns1:SetOfServiceRequests");

        return writer;
    }

    @Bean
    @StepScope
    public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
        return new AppBatchNotificationItemProcessor<T>();
    }

    @Bean
    public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
        return new AppBatchCreationItemProcessor();
    }


    public String[] getAppRootElementNames() {        
        //get list of App Transaction Element Names        
        return AppProcessorEnum.getValues();         
    }

    @Bean
    public Step AppStep() {
        // INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden 
        // by injected jobParameters using late binding (StepScope)
        return stepBuilderFactory.get("AppStep")
                .<Object, JAXBElement<ServiceRequestType>> chunk(10)
                .reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
                .processor(appNotificationProcessor())
                .writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
                .taskExecutor(concurrentTaskExecutor())
                .throttleLimit(1)
                .build();

    }

    @Bean
    public Step BatchCreationStep() {
        return stepBuilderFactory.get("BatchCreationStep")
                .<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
                .reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
                .processor(appBatchCreationProcessor())
                .taskExecutor(concurrentTaskExecutor())
                .throttleLimit(1)
                .build();
    }

    @Bean
    public Job AppJob() {
        return jobBuilderFactory.get("AppJob")
                .incrementer(new RunIdIncrementer())
                .listener(AppJobCompletionNotificationListener())
                .flow(AppStep())
                .next(BatchCreationStep())
                .end()
                .build();
    }

    @Bean
    public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
        return new JobCompletionNotificationListener();
    }

    @Bean
    public TaskExecutor concurrentTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(1);
        return taskExecutor;
    }
}

AppBatchNotificationItemProcessor.java

@StepScope
public class AppBatchNotificationItemProcessor<E> extends AppAbstractItemProcessor<E, JAXBElement<ServiceRequestType>> implements ItemProcessor<E, JAXBElement<ServiceRequestType>>, StepExecutionListener {

    // This is populated correctly
    @Value("#{jobExecutionContext['" + Constants.SECURITY_TOKEN + "']}")
    private SecurityToken securityToken;

    @Autowired
    private AppProcessorService processor;

    @Override
    public JAXBElement<ServiceRequestType> process(E item) throws BPException {
        // Do Stuff
        return srRequest;
    }

    @BeforeProcess
    public void beforeProcess(E item) {
        System.out.println("Doesn't execute");
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Doesn't execute
        System.out.println("Doesn't execute");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Doesn't execute
        System.out.println("Doesn't execute");
    }

}

person Joel Pearson    schedule 15.08.2016    source источник


Ответы (2)


Это связано с тем, что вы возвращаете интерфейсы вместо реализации в ваших @Bean методах. ИМХО, при использовании конфигурации java в Spring вы должны возвращать наиболее конкретный тип. Вот почему:

При настройке через XML вы предоставляете класс в конфигурации XML. Это открывает доступ к реализации для Spring, так что любые интерфейсы, реализуемые классом, могут быть обнаружены и обработаны соответствующим образом. При использовании конфигурации java тип возвращаемого значения метода @Bean служит заменой этой информации. И вот в чем проблема. Если ваш возвращаемый тип - интерфейс, Spring знает только об этом конкретном интерфейсе, а не обо всех интерфейсах, которые реализация может реализовать. Возвращая конкретный тип там, где это возможно, вы даете Spring понимание того, что вы на самом деле возвращаете, чтобы он мог лучше справиться с различными вариантами использования регистрации и подключения.

В вашем конкретном примере, поскольку вы возвращаете ItemProcessor и его область действия (следовательно, проксируется), Spring знает только о методах / поведении, ожидаемых от интерфейса ItemProcessor. Если вы вернете реализацию (AppBatchNotificationItemProcessor), другие варианты поведения можно настроить автоматически.

person Michael Minella    schedule 15.08.2016
comment
Спасибо, Майкл, это сработало. - person Joel Pearson; 16.08.2016

Насколько я помню, вы должны зарегистрировать читатель, писатель, процессор непосредственно в качестве слушателя на шаге, если вы используете StepScope.

StepScope не позволяет фреймворку определять, какие интерфейсы, соответственно. @annotations (например, @ BeforeProcess) прокси фактически реализует / определяет и поэтому не может зарегистрировать его в качестве слушателя.

Итак, я предполагаю, что если добавить

    return stepBuilderFactory.get("AppStep")
            .<Object, JAXBElement<ServiceRequestType>> chunk(10)
            .reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
            .processor(appNotificationProcessor())
            .writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))

        .listener(appNotificationProcessor())

            .taskExecutor(concurrentTaskExecutor())
            .throttleLimit(1)
            .build();

это будет работать.

person Hansjoerg Wingeier    schedule 15.08.2016