Выполнение следующего не будет работать одновременно, вместо этого он сначала выполнит Run1 и заблокируется до завершения, прежде чем он выполнит Run2.
@ray.remote
class Test:
def __init__(self):
pass
def Run1(self):
print('Run1 Start')
sleep(5)
print('Run1 End')
def Run2(self):
print('Run2')
ray.init()
test = Test.remote()
test.Run1.remote()
test.Run2.remote()
sleep(10)
Выход:
(pid=8109) Run1 Start
(pid=8109) Run1 End
(pid=8109) Run2
Это немного неожиданно. Как я могу добиться, чтобы методы выполнялись одновременно?
ИЗМЕНИТЬ, ЧТОБЫ УСТРАНИТЬ ПОСЛЕДУЮЩИЕ КОММЕНТАРИИ:
Двухпотоковый подход, похоже, не работает. Приведенный ниже код постоянно приводит к поломке каналов из PyArrow. Я хотел бы последовательно запускать оба метода - метод self.PreloadSamples и метод self.Optimize - параллельно. Класс BufferActor собирает и предоставляет пакетные образцы с помощью декорированного метода GetSamples () @ ray.remote. Поскольку данные на графическом процессоре не сериализуемы, это необходимо делать на стороне объекта оптимизатора, и я хочу убедиться, что это выполняется параллельно, а не последовательно по отношению к оптимизации.
См. Ниже полностью изолированную версию проблемы, которая воспроизводит проблемы примерно через 1 минуту работы:
import torch
import ray
import threading
from time import sleep
def Threaded(fn):
def wrapper(*args, **kwargs):
thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapper
@ray.remote
class BufferActor():
def __init__(self):
pass
def GetSamples(self):
return torch.randn(32, 100)
@ray.remote(num_gpus=1)
class OptimizerActor():
def __init__(self, bufferActor):
self.bufferActor = bufferActor
self.samplesOnGPU = list()
self.PreloadSamples()
self.Optimize()
@Threaded
def PreloadSamples(self):
#this retrieves a batch of samples (in numpy/torch format on CPU)
if (len(self.samplesOnGPU) < 5):
samples = ray.get(self.bufferActor.GetSamples.remote())
self.samplesOnGPU.append(samples.to('cuda'))
print('Samples Buffer: %s' % len(self.samplesOnGPU))
else:
sleep(0.01)
self.PreloadSamples()
@Threaded
def Optimize(self):
if (len(self.samplesOnGPU) > 0):
samples = self.samplesOnGPU.pop(0)
print('Optimizing')
#next we perform loss calc + backprop + optimizer step (not shown)
sleep(0.01)
self.Optimize()
ray.init()
bufferActor = BufferActor.remote()
optimizerActor = OptimizerActor.remote(bufferActor)
sleep(60*60)