Потоки Akka не запускаются, когда в Source большое количество записей

Я пытаюсь написать очень простой вводный пример использования Akka Streams. Я пытаюсь в основном создать поток, который берет диапазон целых чисел в качестве источника и отфильтровывает все целые числа, которые не являются простыми, создавая поток простых целых чисел в качестве своего вывода.

Класс, создающий поток, довольно прост; для этого у меня есть следующее.

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrimeStream {
    private final AverageRepository averageRepository = new AverageRepository();
    private final ActorSystem actorSystem;

    public PrimeStream(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    public Flow<Integer, Integer, NotUsed> filterPrimes() {
        return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
    }
}

Когда я запускаю следующий тест, он работает нормально.

private final ActorSystem actorSystem = ActorSystem.create("Sys");

@Test
public void testStreams() {
    Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
    Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
    flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}

Однако, когда я увеличиваю диапазон в х10 раз, изменяя строку в тесте на следующую, он больше не работает.

Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);

Теперь, когда тест запускается, ни исключений, ни предупреждений. Он просто запускается, а затем завершается, вообще не отображая никакого текста на консоли.

Просто чтобы убедиться, что проблема не в самом тесте на простоту, я провел тест в том же диапазоне без использования Akka Streams, и он работает нормально. Следующий код работает без проблем.

@Test
public void testPlain() {
    List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
    List<Integer> out = PrimeKernel.filterPrimes(in);
    System.out.println(out);
}

Просто для ясности сам тест на простоту берет список целых чисел и устанавливает любой элемент в списке в 0, если он не является простым.

Как предложил @RamonJRomeroyVigil, если я полностью удалю часть mapConcat, но оставлю все как есть, на самом деле, я распечатаю 10 000 целых чисел. Однако, если я оставлю все то же самое, но просто заменю filterPrimes методом, который просто возвращает параметр метода как есть, не касаясь его, тогда он вообще ничего не выводит на экран. Я также попытался добавить println к начальному filterPrime для его отладки. Всякий раз, когда он не печатает какие-либо выходные данные, содержащие оператор отладки. Поэтому даже не предпринимается никаких попыток вызвать filterPrimes.


person Jeffrey Phillips Freeman    schedule 10.05.2018    source источник
comment
Просто дикая догадка, можете ли вы попробовать изменить сгруппированное значение с 10000 на 1000?   -  person Ankur    schedule 10.05.2018
comment
@Ankur, если я сделаю это, он будет работать с лицом. Но я хочу, чтобы он запускался партиями по 10 000, потому что этот метод будет обрабатывать простые числа в среде с ускорением на GPU. поэтому будет работать лучше всего в больших партиях.   -  person Jeffrey Phillips Freeman    schedule 10.05.2018
comment
Подождите секунду, runForeach возвращает будущее, поэтому, если вы хотите, чтобы все числа были напечатаны, вам нужно подождать в будущем, иначе тестовая функция вернется, и программа завершится без завершения будущего.   -  person Ankur    schedule 10.05.2018
comment
@Ankur, возможно, учебник, который я извлек, не показал этого (или, может быть, я что-то неправильно понял).. позвольте мне попробовать это ...   -  person Jeffrey Phillips Freeman    schedule 10.05.2018
comment
@Ankur Итак, runForEach не возвращает для меня Future, но он возвращает что-то, что может вести себя примерно так же ... так что я думаю, что вы можете что-то понять ...   -  person Jeffrey Phillips Freeman    schedule 10.05.2018
comment
Да, он возвращает docs.oracle.com/javase /8/docs/api/java/util/concurrent/ .. которые представляют собой асинхронные вычисления, как и будущее.   -  person Ankur    schedule 10.05.2018
comment
@Ankur Я пытался вызвать .handle, пытаясь завершить его, но безуспешно. Не могли бы вы предложить ответ на этот вопрос с предлагаемым решением, чтобы я мог лучше понять, как, по вашему мнению, я мог бы решить эту проблему. Это также позволит мне проголосовать и принять ответ.   -  person Jeffrey Phillips Freeman    schedule 10.05.2018


Ответы (1)


runForeach возвращает CompletionStage, поэтому, если вы хотите увидеть все печатаемые числа, вам нужно подождать CompletionStage, в противном случае тестовая функция вернется, и программа завершится без завершения CompletionStage.

Пример:

flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();
person Ankur    schedule 10.05.2018
comment
Теперь это работает, спасибо. Как ни странно, метод .thenRun(...), предложенный в официальных документах (который я пробовал ранее), не помогает. Очень запутанно, но, вероятно, что-то очевидное мне не хватает. Несмотря на то, что я принимаю это как правильный ответ, спасибо. - person Jeffrey Phillips Freeman; 10.05.2018
comment
thenRun просто говорит, что когда этап завершается, запускается данная функция, но она не заставляет текущий поток ждать завершения асинхронного вычисления, вместо этого он возвращает новый CompletionStage, которого вам снова придется ждать. - person Ankur; 10.05.2018
comment
Спасибо, этот урок, похоже, не включает эту часть, как и некоторые другие. Очень неприятно: doc.akka.io/docs/akka/2.5 /stream/stream-quickstart.html - person Jeffrey Phillips Freeman; 10.05.2018