Я пытаюсь написать очень простой вводный пример использования Akka Streams. Я пытаюсь в основном создать поток, который берет диапазон целых чисел в качестве источника и отфильтровывает все целые числа, которые не являются простыми, создавая поток простых целых чисел в качестве своего вывода.
Класс, создающий поток, довольно прост; для этого у меня есть следующее.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PrimeStream {
private final AverageRepository averageRepository = new AverageRepository();
private final ActorSystem actorSystem;
public PrimeStream(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
public Flow<Integer, Integer, NotUsed> filterPrimes() {
return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
}
}
Когда я запускаю следующий тест, он работает нормально.
private final ActorSystem actorSystem = ActorSystem.create("Sys");
@Test
public void testStreams() {
Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}
Однако, когда я увеличиваю диапазон в х10 раз, изменяя строку в тесте на следующую, он больше не работает.
Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);
Теперь, когда тест запускается, ни исключений, ни предупреждений. Он просто запускается, а затем завершается, вообще не отображая никакого текста на консоли.
Просто чтобы убедиться, что проблема не в самом тесте на простоту, я провел тест в том же диапазоне без использования Akka Streams, и он работает нормально. Следующий код работает без проблем.
@Test
public void testPlain() {
List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
List<Integer> out = PrimeKernel.filterPrimes(in);
System.out.println(out);
}
Просто для ясности сам тест на простоту берет список целых чисел и устанавливает любой элемент в списке в 0, если он не является простым.
Как предложил @RamonJRomeroyVigil, если я полностью удалю часть mapConcat, но оставлю все как есть, на самом деле, я распечатаю 10 000 целых чисел. Однако, если я оставлю все то же самое, но просто заменю filterPrimes методом, который просто возвращает параметр метода как есть, не касаясь его, тогда он вообще ничего не выводит на экран. Я также попытался добавить println к начальному filterPrime для его отладки. Всякий раз, когда он не печатает какие-либо выходные данные, содержащие оператор отладки. Поэтому даже не предпринимается никаких попыток вызвать filterPrimes.
runForeach
возвращает будущее, поэтому, если вы хотите, чтобы все числа были напечатаны, вам нужно подождать в будущем, иначе тестовая функция вернется, и программа завершится без завершения будущего. - person Ankur   schedule 10.05.2018