Мой старый код поддерживал использование одной очереди SQS с SqsStream. Мне нужно обновить его для поддержки нескольких очередей с учетом списка URL-адресов очереди.
Содержание метода:
for {
sqs <- Sqs.>.async // async client
urls <- Sqs.>.queueUrls // List[String] of multiple queues
_ <- {
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
.mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
.runDrain // ZIO[Env, Throwable, Unit]
.forever // ZIO[Env, Throwable, Nothing]
})
} yield ()
но компилятор жалуется, потому что ожидает (ZIO, ZIO, ZIO), тогда как я дал ему (ZIO, ZIO, List). Я предполагаю, что мне нужно свести все эффекты в этом списке к одному эффекту, который будет выполняться handleMessage
параллельно во всех очередях, но я не уверен в синтаксисе, так как у меня нет опыта работы с ZIO.
По сути, к этому моменту
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
мой URL стал ZStream. Я думаю, мне нужно вызвать ZStream.flatMapPar
, используя этот и следующий элемент, и так далее, пока все они не будут объединены. Как бы я это сделал?