Как произвести параллельную работу с потенциальными взаимодействиями с БД по списку

Я стараюсь, чтобы мой код работал более эффективно, поэтому я пытаюсь понять, как заставить его работать с Futures и ForkJoinPool.

На данный момент у меня есть код, который работает так:

@RestController
@RequestMapping(SEND)
@Slf4j
public class InputMessageController {
private HandlerService service;
    @ApiResponses(value = {
            @ApiResponse(code = 200, message = "message sent"),
            @ApiResponse(code = 404, message = "channel not found or inactive"),
    })
    @RequestMapping(value = "/{channel}", method = RequestMethod.POST)
    @ResponseStatus(HttpStatus.OK)
    public List<ResponseDto> sendNotification(
            @Valid @RequestBody @NotNull List<PayloadInfoDto> requestDtoList,
            @PathVariable("channel") String url) throws InputChannelInactiveException {

        InputChannel channel = mapService.getChannelByUrl(url);

        if (channel != null){
            return service.processing(requestDtoList, channel);
        } else {
            loggerService.logChannelNotFound()
            throw new InputChannelInactiveException(url);
        }
    }
}
public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {

        List<ResponseDto> responseDtoList = new ArrayList<>();

//this one takes quite long. would be better if it was done in multiple threads
        requestDtoList.forEach(inputRequestDto -> responseDtoList.add(processNorm(inputRequestDto, channel)));

        return responseDtoList;
    }
}

processNorm(PayloadInfoDto inputRequestDto, InputChannel channel) {
        RequestMessage msg;

        //call to multiple services which can throw an exception. Each exception is processed.
        //call to logger service which logs info to database

}

Мои вопросы:

  1. Для службы логгера, которая работает с базой данных, будет ли лучше писать вместо обычного сохранения в БД, особенно если мы не хотим ждать записи БД и хотим двигаться дальше?
private ExecutorService executor = Executors.newWorkStealingPool();
///....some code
public void exampleAsyncLog(){
//I don't wan't to wait while logger service writes to DB. It can take as long as it wants to write, 
//while I'll move on
executor.submit(() -> saveSomethingToDb()); //saveSomethingToDb() is trivial logRepository.save(newEntity)
}
  1. Я не понял, как запустить CompletableFuture / ForkJoinPool в List. По крайней мере, таким образом, чтобы а) принимать список в качестве аргумента б) обрабатывать каждую единицу в списке параллельно в) конечный результат также является списком. Все примеры, которые я встречал, работают с составлением списка CompletableFuture и последующим выполнением allOf () как в этом блоге. Является ли это правильным способом создания списка результатов из списка с использованием фьючерсов? Или есть подход получше?

person Sheyko Dmitriy    schedule 29.06.2020    source источник


Ответы (1)


Я посоветую вам использовать CompletableFuture для этой задачи, потому что вы хотите продолжить, только когда все ваши задачи сохранены в БД, что является наиболее безопасным для вашей задачи. Так что кодируйте что-нибудь вроде:

public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {

        List<ResponseDto> responseDtoList = new ArrayList<>();

//this one takes quite long. would be better if it was done in multiple threads
CompletableFuture<List<ResponseDto>> future
  = CompletableFuture.supplyAsync(() -> requestDtoList.stream().map(inputRequestDto -> processNorm(inputRequestDto, channel)).collect(Collectors.toList())); //This code is not tested. It's the logic that I am showing you to implement that will work for you.

        return future.get();//wait till all the operation is completed.
    }
}
person Ashish Karn    schedule 29.06.2020