Я запускаю свое приложение Kotlin на основе Netty с помощью Spring Boot и WebFlux. Подробности таковы:
- Java 11;
- Kotlin 1.3.61;
- Spring Boot 2.2.5.RELEASE;
- Spring Vault Core 2.2.2.РЕЛИЗ.
Я получаю файл на веб-слое. WebFlux создает из него Part
(org.springframework.http.codec.multipart
). Данные хранятся в Flux
Project Reactor в Part
в виде потока DataBuffer
блоков размером 4Kb:
Flux<DataBuffer> content();
Из-за соблюдения согласованности фреймворков я преобразовываю Flux
в Flow
Kotlin.
Затем я использую encrypt(...)
синхронного клиента Vault, отправляющего фрагменты асинхронно (насколько я понимаю) в методе flatMapMerge
(примечание, что encrypt(...)
не suspend
, и это оболочка поверх HTTP-клиента для удаленного поставщика шифрования):
public String encrypt(String keyName, String plaintext);
Я проверил этот ответ https://stackoverflow.com/a/58659423/6612401 и обнаружил, что на основе потока Подход следует использовать с flow { emit(...)}
.
У меня вопрос: могу ли я использовать этот поточно-ориентированный подход без suspend
функций? Или есть лучший подход, учитывая, что я использую runBlocking(Dispatchers.IO)
и suspend
fold(...)
функцию.
Код выглядит следующим образом:
@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
val pair = part.content().asFlow()
.flatMapMerge { dataBuffer ->
val openByteArray = dataBuffer.asInputStream().readBytes()
val opentextBase64 = Base64Utils.encodeToString(openByteArray)
flow { emit(Pair(openByteArray, vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
}.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
result.first.writeBytes(curPair.first)
result.second.append(curPair.second)
result
}
Pair(pair.first.toByteArray(), pair.second.toString())
}
P.S. Функция fold(...)
собирает открытые фрагменты в ByteArrayOutputStream
для последующего вычисления хэша и собирает зашифрованные фрагменты в StringBuilder
в результате шифрования файла.
P.P.S. Я попробовал свой подход. Этот метод отправляет в среднем 5-7 параллельных запросов на моем компьютере с 4 физическими ядрами Core i5 8gen. Он выполняет свою работу, но не так быстро. Если Vault развернут не локально, я получаю примерно 1 секунду на 1 Мб шифрования. Я так понимаю, это зависит от латентности сети. Я даже не рассматриваю скорость шифрования на стороне Vault, это молниеносно из-за размера фрагментов, который составляет всего 4 КБ. Есть ли способы увеличить скорость параллелизма?
P.P.P.S Я пробовал играть с concurrency = MAX_CONCURRENT_REQUESTS
в flatMapMerge{...}
. Пока ничего существенного в результатах. Еще лучше оставить значение по умолчанию.
vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64)
- это вызов клиента HTTP. - person Eugene Mamaev   schedule 20.05.2020concurrency = MAX_CONCURRENT_REQUESTS
наflatMapMerge
, но если Vault выполняет шифрование так быстро, как вы говорите, возможно, ограничивающий фактор находится в другом месте. - person Marko Topolnik   schedule 20.05.2020concurrency = MAX_CONCURRENT_REQUESTS
. Пока ничего существенного в результатах. Еще лучше оставить значение по умолчанию. - person Eugene Mamaev   schedule 20.05.2020suspend
функцию из неsuspend
? Например, с ключевым словомsuspend {...}
. - person Eugene Mamaev   schedule 20.05.2020