Запросы параллельного ввода-вывода с Kotlin Flow, Coroutines и функцией NOT suspend

Я запускаю свое приложение Kotlin на основе Netty с помощью Spring Boot и WebFlux. Подробности таковы:

  • Java 11;
  • Kotlin 1.3.61;
  • Spring Boot 2.2.5.RELEASE;
  • Spring Vault Core 2.2.2.РЕЛИЗ.

Я получаю файл на веб-слое. WebFlux создает из него Part (org.springframework.http.codec.multipart). Данные хранятся в Flux Project Reactor в Part в виде потока DataBuffer блоков размером 4Kb:

Flux<DataBuffer> content();

Из-за соблюдения согласованности фреймворков я преобразовываю Flux в Flow Kotlin.

Затем я использую encrypt(...) синхронного клиента Vault, отправляющего фрагменты асинхронно (насколько я понимаю) в методе flatMapMerge (примечание, что encrypt(...) не suspend, и это оболочка поверх HTTP-клиента для удаленного поставщика шифрования):

public String encrypt(String keyName, String plaintext);

Я проверил этот ответ https://stackoverflow.com/a/58659423/6612401 и обнаружил, что на основе потока Подход следует использовать с flow { emit(...)}.

У меня вопрос: могу ли я использовать этот поточно-ориентированный подход без suspend функций? Или есть лучший подход, учитывая, что я использую runBlocking(Dispatchers.IO) и suspend fold(...) функцию.

Код выглядит следующим образом:

@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
    val pair = part.content().asFlow()
            .flatMapMerge { dataBuffer ->
                val openByteArray = dataBuffer.asInputStream().readBytes()
                val opentextBase64 = Base64Utils.encodeToString(openByteArray)
                flow { emit(Pair(openByteArray,  vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
            }.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
                result.first.writeBytes(curPair.first)
                result.second.append(curPair.second)
                result
            }
    Pair(pair.first.toByteArray(), pair.second.toString())
}

P.S. Функция fold(...) собирает открытые фрагменты в ByteArrayOutputStream для последующего вычисления хэша и собирает зашифрованные фрагменты в StringBuilder в результате шифрования файла.

P.P.S. Я попробовал свой подход. Этот метод отправляет в среднем 5-7 параллельных запросов на моем компьютере с 4 физическими ядрами Core i5 8gen. Он выполняет свою работу, но не так быстро. Если Vault развернут не локально, я получаю примерно 1 секунду на 1 Мб шифрования. Я так понимаю, это зависит от латентности сети. Я даже не рассматриваю скорость шифрования на стороне Vault, это молниеносно из-за размера фрагментов, который составляет всего 4 КБ. Есть ли способы увеличить скорость параллелизма?

P.P.P.S Я пробовал играть с concurrency = MAX_CONCURRENT_REQUESTS в flatMapMerge{...}. Пока ничего существенного в результатах. Еще лучше оставить значение по умолчанию.


person Eugene Mamaev    schedule 20.05.2020    source источник
comment
Я не вижу в этом коде операций ввода-вывода, он привязан к процессору. Если вам нужна максимальная производительность для параллелизируемой работы с привязкой к ЦП, используйте Java Streams. Kotlin Flows предназначены для поддержки приостанавливаемых преобразований потоков, но вы их не используете.   -  person Marko Topolnik    schedule 20.05.2020
comment
@MarkoTopolnik Vault не является подключаемой библиотекой. Шифрование происходит на удаленном сервере, Vault здесь только HTTP-клиент для Vault. vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64) - это вызов клиента HTTP.   -  person Eugene Mamaev    schedule 20.05.2020
comment
Итак, ваша цель - увеличить количество одновременных запросов? Вы можете попробовать поиграть с concurrency = MAX_CONCURRENT_REQUESTS на flatMapMerge, но если Vault выполняет шифрование так быстро, как вы говорите, возможно, ограничивающий фактор находится в другом месте.   -  person Marko Topolnik    schedule 20.05.2020
comment
Да, именно то, что я пытаюсь сделать, - это увеличить количество одновременных запросов. Я пробовал играть с concurrency = MAX_CONCURRENT_REQUESTS. Пока ничего существенного в результатах. Еще лучше оставить значение по умолчанию.   -  person Eugene Mamaev    schedule 20.05.2020
comment
Кстати, идиоматично ли делать suspend функцию из не suspend? Например, с ключевым словом suspend {...}.   -  person Eugene Mamaev    schedule 20.05.2020
comment
В этом нет никакого смысла. Вы просто добавляете накладные расходы, ничего не получая.   -  person Marko Topolnik    schedule 20.05.2020