Как правильно закрыть переменное количество потоков?

Я создаю несколько потоков, к которым у меня есть доступ параллельно (или, возможно, параллельно). Я знаю, как сделать попытку с ресурсами, когда количество ресурсов фиксировано во время компиляции, но что, если количество ресурсов определяется параметром?

У меня есть что-то вроде этого:

private static void foo(String path, String... files) throws IOException {
    @SuppressWarnings("unchecked")
    Stream<String>[] streams = new Stream[files.length];

    try {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams[i] = Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file));
        }

        // do something with streams
        Stream.of(streams)
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
    finally {
        for (Stream<String> s : streams) {
            if (s != null) {
                s.close();
            }
        }
    }
}

person Mark Jeronimus    schedule 07.05.2015    source источник
comment
Вы спрашиваете, есть ли ресурсы попытки с помощью, которые могли бы справиться с вашей ситуацией? Ответ отрицательный, но то, что у вас есть, просто прекрасно.   -  person Kayaman    schedule 07.05.2015
comment
В качестве альтернативы можно было бы перенести открытие потоков в параллельные операции, чтобы каждая из них имела дело только с одним потоком.   -  person biziclop    schedule 07.05.2015
comment
Да, хотя есть одна загвоздка: close() теоретически (хотя на практике маловероятно) может выдать UncheckedIOException, поэтому вам, вероятно, следует заключить s.close() в try { s.close(); } catch (Exception ex) { //quash or log }.   -  person Andrew Janke    schedule 07.05.2015
comment
Остерегаться! Ваша комбинация sorted()/distinct() и forEach (а не forEachOrdered) - рецепт проблем. См. stackoverflow.com/q/28259636/2711488.   -  person Holger    schedule 07.05.2015


Ответы (2)


Вы можете написать составной AutoCloseable для управления динамическим количеством AutoCloseable:

import java.util.ArrayList;
import java.util.List;

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable {
    private final List<T> components= new ArrayList<>();

    public void addComponent(T component) { components.add(component); }

    public List<T> getComponents() { return components; }

    @Override
    public void close() throws Exception {
        Exception e = null;
        for (T component : components) {
            try { component.close(); }
            catch (Exception closeException) {
                if (e == null) { e = closeException; }
                else { e.addSuppressed(closeException); }
            }
        }
        if (e != null) { throw e; }
    }
}

и вы можете использовать его в своем методе:

private static void foo(String path, String... files) throws Exception {
    try (CompositeAutoclosable<Stream<String>> streams 
            = new CompositeAutoclosable<Stream<String>>()) {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams.addComponent(Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file)));
        }
        streams.getComponents().stream()
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
}
person gontard    schedule 07.05.2015
comment
Хотя мне не нравится создавать собственные служебные классы, это, по крайней мере, работает. Причина, по которой мне это не нравится? Когда у других разработчиков, работающих над (разными частями) проекта, возникает аналогичная потребность, они могут не знать о вспомогательном классе и разрабатывать собственное решение. - person Mark Jeronimus; 07.05.2015
comment
Это может быть решено путем общения между разработчиками. Вчера у меня была эта проблема, поэтому я написал утилиту... - person gontard; 07.05.2015
comment
Если вы поймали closeException и уже есть предыдущее исключение, вы должны использовать addSuppressed вместо того, чтобы перезаписывать предыдущее исключение… - person Holger; 07.05.2015
comment
Разве не плохо просто поймать/бросить Exception ? Не могли бы вы вместо этого использовать IOExcetion ? - person JonasCz; 07.05.2015
comment
Нет, к сожалению, AutoCloseable объявляет close() throws Exception. @Holger приятно знать. - person Mark Jeronimus; 07.05.2015
comment
@JonasCz: к сожалению, Stream реализует AutoCloseable, а не Closeable. Хотя Stream.close не объявляет проверенные исключения, interface делает. Таким образом, попытка написать код без объявления throws Exception потребует особого случая Stream вместо того, чтобы обрабатывать его в общем случае через interface. - person Holger; 07.05.2015
comment
@Holger я обновил код для управления несколькими исключениями. - person gontard; 11.05.2015

документация Stream.flatMap гласит:

Каждый отображаемый поток закрывается после помещения его содержимого в этот поток.

Другими словами, для обычного закрытия потоков не требуется никаких дополнительных действий. Однако, поскольку закрыты только обработанные потоки, не следует жадно создавать потоки, не зная, будут ли они впоследствии обработаны потоком:

private static void foo(String path, String... files) throws IOException {
    Arrays.stream(files).flatMap(file-> {
              try { return Files.lines(Paths.get(path, file))
                    .onClose(() -> System.out.println("Closed " + file)); }
              catch(IOException ex) { throw new UncheckedIOException(ex); } })
          .parallel()
          .distinct()
          .sorted()
          .limit(10)
          .forEachOrdered(System.out::println);
}

Создавая подпотоки внутри flatMap, гарантируется, что каждый из них будет создан только в том случае, если поток будет его обрабатывать. Таким образом, это решение закроет все подпотоки даже без внешнего Stream внутри оператора try-with-resource.

person Holger    schedule 07.05.2015
comment
Хотя я уверен, что ваше решение работает, очень трудно читать и видеть, что происходит в каждом утверждении. Каждый разработчик должен предпочесть легко читаемый код оптимизированному и/или умному коду, иначе это вызовет проблемы с обслуживанием или ошибки. - person Mark Jeronimus; 07.05.2015
comment
Вы уверены, что ваше первое решение читает файлы параллельно, как исходный код в вопросе? Я думаю, что parallel следует применять перед flatMap. - person Przemyslaw Zych; 09.05.2015
comment
@Przemyslaw Zych: размещение parallel() не имеет значения, оно превратит всю обработку в параллель, поскольку потоковый конвейер в целом является либо параллельным, либо последовательным. - person Holger; 11.05.2015