Передача итератора списка нескольким потокам в Java

У меня есть список, содержащий примерно 200 тыс. элементов.

Могу ли я передать итератор для этого списка нескольким потокам и заставить их перебирать всю партию без доступа к одним и тем же элементам?

Это то, о чем я думаю в данный момент.

Главный:

public static void main(String[] args)
{
    // Imagine this list has the 200,000 elements.
    ArrayList<Integer> list = new ArrayList<Integer>();

    // Get the iterator for the list.
    Iterator<Integer> i = list.iterator();

    // Create MyThread, passing in the iterator for the list.
    MyThread threadOne = new MyThread(i);
    MyThread threadTwo = new MyThread(i);
    MyThread threadThree = new MyThread(i);

    // Start the threads.
    threadOne.start();
    threadTwo.start();
    threadThree.start();
}

Моя тема:

public class MyThread extends Thread
{

    Iterator<Integer> i;

    public MyThread(Iterator<Integer> i)
    {
        this.i = i;
    }

    public void run()
    {
        while (this.i.hasNext()) {
            Integer num = this.i.next();
            // Do something with num here.
        }
    }
}

Мой желаемый результат здесь состоит в том, чтобы каждый поток обрабатывал примерно 66 000 элементов каждый, не слишком сильно блокируя итератор, а также чтобы ни один из потоков не обращался к одному и тому же элементу.

Это звучит выполнимо?


person Tom Wright    schedule 05.02.2016    source источник
comment
Использование Java 8 Streams и parallel() кажется здесь подходящим вариантом использования.   -  person Arnaud Denoyelle    schedule 05.02.2016
comment
Нет, вы не можете (безопасно, с этим кодом), потому что вызовы hasNext и next не являются атомарными.   -  person Andy Turner    schedule 05.02.2016
comment
@AndyTurner С потоками OP не будет обрабатывать итераторы вручную.   -  person Arnaud Denoyelle    schedule 05.02.2016
comment
очень сложно сделать это безопасно. но это можно сделать практически без времени ожидания, если вы синхронизируете только индекс   -  person nafas    schedule 05.02.2016
comment
@AndyTurner да, возможно с Java 8,   -  person nafas    schedule 05.02.2016


Ответы (5)


Вам действительно нужно управлять потоками и итераторами вручную? Вы можете использовать Java 8 Streams и позволить parallel() выполнять эту работу.

По умолчанию он будет использовать на один поток меньше, поскольку у вас есть процессоры.

Пример :

list.stream()
    .parallel()
    .forEach(this::doSomething)
;

//For example, display the current integer and the current thread number.
public void doSomething(Integer i) {
  System.out.println(String.format("%d, %d", i, Thread.currentThread().getId()));
}

Результат :

49748, 13
49749, 13
49750, 13
192710, 14
105734, 17
105735, 17
105736, 17
[...]

Изменить: если вы используете maven, вам нужно будет добавить эту часть конфигурации в pom.xml, чтобы использовать Java 8:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.3</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>
person Arnaud Denoyelle    schedule 05.02.2016
comment
Похоже, отличное решение, но я получаю, что ссылки на методы не поддерживаются на этом уровне языка, когда я пытаюсь сделать это выше. Любые идеи? - person Tom Wright; 05.02.2016
comment
Ваш проект не настроен для использования Java 8. Подсказка: если вы используете maven, вам нужно добавить часть конфигурации. - person Arnaud Denoyelle; 05.02.2016
comment
@TomWright Я добавил часть maven conf для Java 8 - person Arnaud Denoyelle; 05.02.2016
comment
Спасибо, Арно, все заработало. Сейчас я тестирую код в вашем ответе. - person Tom Wright; 05.02.2016
comment
Обратите внимание: если вам нужно контролировать количество запущенных потоков, чтобы не разрушить внешние системы (в случаях, когда вы выполняете внешние вызовы из своего потока), вы должны использовать подсписок, упомянутый ниже, в сочетании с упомянутым ниже ExecutorService, где fixedThreadPool имеет количество потоков, которые вы хотите использовать - person user2051552; 16.07.2019

Вы не можете сделать это потокобезопасным способом с помощью одного итератора. Я предлагаю использовать подсписки:

List sub1 = list.subList(0, 100);
List sub2 = list.subList(100, 200);

ArrayList#subList() просто обернет данный список без копирования элементов. Затем вы можете повторять каждый подсписок в другом потоке.

person AdamSkywalker    schedule 05.02.2016

Поскольку метод next() класса, реализующего интерфейс Iterator, выполняет манипуляции с данными, параллельное использование метода next() требует синхронизации. Синхронизация может быть выполнена с использованием блока synchronized в объекте итератора следующим образом:

synchronized(i)
{
    i.next();
}

Тем не менее, я рекомендую использовать Stream API, как в ответе выше, если вам нужна только параллельная обработка списка.

person oak    schedule 05.02.2016
comment
Без синхронизации какой здесь возможный исход? Может ли i.next() возвращать одно и то же значение для нескольких вызывающих потоков? - person Rory; 19.11.2020

Привет, чтобы предотвратить ваши потоки от дредов или голодания, вы можете использовать ExecutorService из класса пула потоков. Для меня это лучше, чем использование синхронизированных, блокировок или повторных блокировок. Вы также можете попробовать использовать Fork/join, но я не использовал его раньше. Это пример кода, но я надеюсь, что вы поняли идею

public static void main(String[] args){
   ExecutorService executor = Executors.newFixedThreadPool(200000);
   List<Future<Integer>> futureList = new ArrayList<>();
   //iteration code goes here
  executor.shutdown();
}

Public class MyThread implements Callable<ArrayList<Integer>>{

@Override
        public Iterator<Integer> call() throws Exception {
            //code goes here!
        }  

}
person Arthur Decker    schedule 05.02.2016

Если вы используете параллельный поток, вы будете выполнять свой код во многих потоках с элементами, равномерно распределенными между потоками:

list.parallelStream().forEach(this::processInteger);

Такой подход делает программирование действительно простым; вся тяжелая работа выполняется JRE.

Кроме того, что касается вашего кода, расширение Thread является плохим стилем. Вместо этого реализуйте Runnable и передайте экземпляр конструктору Thread — смотрите вживую

person Bohemian♦    schedule 05.02.2016