Удаленные вызовы блокируются при использовании методов в объекте-актере?

Выполнение следующего не будет работать одновременно, вместо этого он сначала выполнит Run1 и заблокируется до завершения, прежде чем он выполнит Run2.

@ray.remote
class Test:
    def __init__(self):
        pass

    def Run1(self):
        print('Run1 Start')
        sleep(5)
        print('Run1 End')

    def Run2(self):
        print('Run2')

ray.init()
test = Test.remote()
test.Run1.remote()
test.Run2.remote()

sleep(10)

Выход:

(pid=8109) Run1 Start
(pid=8109) Run1 End
(pid=8109) Run2

Это немного неожиданно. Как я могу добиться, чтобы методы выполнялись одновременно?

ИЗМЕНИТЬ, ЧТОБЫ УСТРАНИТЬ ПОСЛЕДУЮЩИЕ КОММЕНТАРИИ:

Двухпотоковый подход, похоже, не работает. Приведенный ниже код постоянно приводит к поломке каналов из PyArrow. Я хотел бы последовательно запускать оба метода - метод self.PreloadSamples и метод self.Optimize - параллельно. Класс BufferActor собирает и предоставляет пакетные образцы с помощью декорированного метода GetSamples () @ ray.remote. Поскольку данные на графическом процессоре не сериализуемы, это необходимо делать на стороне объекта оптимизатора, и я хочу убедиться, что это выполняется параллельно, а не последовательно по отношению к оптимизации.

См. Ниже полностью изолированную версию проблемы, которая воспроизводит проблемы примерно через 1 минуту работы:

import torch
import ray
import threading
from time import sleep


def Threaded(fn):
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return wrapper

@ray.remote
class BufferActor():
    def __init__(self):
        pass

    def GetSamples(self):
        return torch.randn(32, 100)


@ray.remote(num_gpus=1)
class OptimizerActor():
    def __init__(self, bufferActor):
        self.bufferActor = bufferActor
        self.samplesOnGPU = list()

        self.PreloadSamples()
        self.Optimize()

    @Threaded
    def PreloadSamples(self):
        #this retrieves a batch of samples (in numpy/torch format on CPU)
        if (len(self.samplesOnGPU) < 5):
            samples = ray.get(self.bufferActor.GetSamples.remote())

            self.samplesOnGPU.append(samples.to('cuda'))

            print('Samples Buffer: %s' % len(self.samplesOnGPU))
        else:
            sleep(0.01)

        self.PreloadSamples()

    @Threaded
    def Optimize(self):
        if (len(self.samplesOnGPU) > 0):
            samples = self.samplesOnGPU.pop(0)
            print('Optimizing')

            #next we perform loss calc + backprop + optimizer step (not shown)

        sleep(0.01)
        self.Optimize()



ray.init()

bufferActor = BufferActor.remote()
optimizerActor = OptimizerActor.remote(bufferActor)

sleep(60*60)


ray
person Muppet    schedule 12.06.2019    source источник


Ответы (1)


Акторы будут выполнять один метод за раз, чтобы избежать проблем с параллелизмом. Если вам нужен параллелизм с акторами (что вы обычно делаете), лучший способ - запустить двух (или более) акторов и передать задачи им обоим.

person Robert Nishihara    schedule 13.06.2019
comment
Проблема в том, что мне нужны методы для передачи несериализуемых данных, поэтому я пытаюсь выполнить их в том же классе. В этом конкретном случае я хочу предварительно загрузить данные из BufferActor, чтобы их можно было поместить на графический процессор и подготовить для обработки Backpropagation / Optimizer. 70% времени уходит на передачу данных на графический процессор, поэтому мне бы очень хотелось, чтобы это выполнялось в фоновом режиме непрерывно, а не последовательно. Я пробовал использовать потоки (с ray.get() внутри), но это часто приводило к поломке труб. - person Muppet; 13.06.2019
comment
В этом случае, я думаю, что-то вроде того, что метод актора запускает фоновый поток, который выполняет некоторую работу, имеет смысл. Вам нужно вызвать ray.get в фоновом потоке? - person Robert Nishihara; 13.06.2019
comment
Да. Ему необходимо получить образцы для BufferActor. Я адаптировал приведенный выше код, чтобы вы могли видеть, как я сейчас с этим справляюсь. К сожалению, код всегда прерывается, когда в какой-то (случайной) точке происходит разрыв канала, даже если он работает нормально в течение нескольких минут. - person Muppet; 13.06.2019