Объедините список ZIO ZStreams в один

Мой старый код поддерживал использование одной очереди SQS с SqsStream. Мне нужно обновить его для поддержки нескольких очередей с учетом списка URL-адресов очереди.

Содержание метода:

for {
  sqs <- Sqs.>.async // async client
  urls <- Sqs.>.queueUrls // List[String] of multiple queues
  _ <- {
    urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          .mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
          .runDrain // ZIO[Env, Throwable, Unit]
          .forever // ZIO[Env, Throwable, Nothing]
      })
} yield ()

но компилятор жалуется, потому что ожидает (ZIO, ZIO, ZIO), тогда как я дал ему (ZIO, ZIO, List). Я предполагаю, что мне нужно свести все эффекты в этом списке к одному эффекту, который будет выполняться handleMessage параллельно во всех очередях, но я не уверен в синтаксисе, так как у меня нет опыта работы с ZIO.

По сути, к этому моменту

urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))

мой URL стал ZStream. Я думаю, мне нужно вызвать ZStream.flatMapPar, используя этот и следующий элемент, и так далее, пока все они не будут объединены. Как бы я это сделал?


person user3613290    schedule 10.02.2020    source источник


Ответы (1)


runDrain вернет ZIO, который вы можете запустить и забыть с помощью foreachPar_.

for {
  sqs <- Sqs.>.async
  urls <- Sqs.>.queueUrls
  // Returns ZIO[R, E, Unit] and executes each effect in parallel while discarding the results
  _ <- ZIO.foreachPar_(urls) { url =>
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          // Handles up to 10 messages at a time in parallel.
          .mapMParUnordered(10)(handleMessage)
          // The stream is already unbounded so no need to have `.forever`
          .runDrain
      }
} yield ()

Я бы также пояснил, что SqsStream уже должен быть неограниченным, поэтому вам не нужно использовать forever, а параметр mapMParUnordered относится к максимальному параллелизму, а не к общему количеству обработанных событий.

person paulpdaniels    schedule 11.02.2020