Как читать все элементы из канала, когда вы не знаете их фактическое количество

Я пытаюсь реализовать сканер, который посещает какой-то URL-адрес, собирает из него новые относительные URL-адреса и создает отчет. Я пытаюсь сделать это одновременно, используя волокна и каналы Crystal, например:

urls = [...] # of String
visited_urls = []

pool_size.times do
  spawn do
    loop do
      url = urls.shift?
      break if url.nil?

      channel.send(url) if some_condition
    end
  end
end

# TODO: here the problem!
loop do
  url = channel.receive?
  break if url.nil? || channel.closed?

  visited_urls << url
end

puts visited_urls.inspect

Но здесь у меня проблема - бесконечная секунда loop (она вызывает channel.receive? до последнего элемента в канале, а затем ждет нового сообщения, которое никогда не приходит). Проблема существует, потому что я никогда не знаю, сколько элементов на самом деле в канале, поэтому я не могу сделать то, что предлагается в Concurency Руководства по языку Crystal.

Так что, может быть, есть хорошие практики работы с каналом, когда мы не знаем, сколько предметов он будет хранить, а нам нужно получить? Спасибо!


person Jack Owels    schedule 10.09.2019    source источник


Ответы (1)


Распространенное решение - иметь значение kill. Либо как часть основного потока данных, например:

results = Channel(String|Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end

    results.send :done
  end
end

done_workers = 0

loop do
  message = results.receive
  if message == :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  elsif message.is_a? String
    puts "Got: #{message}"
  end
end

Или через дополнительный канал, чтобы сигнализировать о событии:

results = Channel(String).new(POOL_SIZE * 2)
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end

    done.send nil
  end
end

done_workers = 0
loop do
  select
  when message = results.receive
    puts "Got: #{message}"
  when done.receive
    done_workers += 1
    break if done_workers == POOL_SIZE
  end
end
person Jonne Haß    schedule 10.09.2019