Как запретить параллельным заданиям Sidekiq выполнять код в Rails

У меня есть около 10 рабочих, которые выполняют работу, которая включает в себя следующее:

user = User.find_or_initialize_by(email: '[email protected]')

if user.new_record?
# ... some code here that does something taking around 5 seconds or so
elsif user.persisted?
# ... some code here that does something taking around 5 seconds or so
end

user.save

Проблема в том, что в определенное время два или более воркера запускают этот код в точное время, и, таким образом, позже я узнал, что у двух или более Пользователей одинаковые email, в которых я всегда должен заканчивать только уникальные электронные письма.

В моей ситуации невозможно создать уникальные индексы БД для email, так как уникальные адреса электронной почты являются условными — некоторые пользователи должны иметь уникальный адрес электронной почты, а некоторые — нет.

Стоит отметить, что моя модель User имеет проверки уникальности, но это все равно не помогает мне, потому что между .find_or_initialize_by и .save есть код, который зависит от того, создан ли уже пользовательский объект или нет.

Я пробовал пессимистическую и оптимистическую блокировку, но это мне не помогло, или, может быть, я просто не реализовал ее должным образом... если у вас есть предложения по этому поводу.

Решение, о котором я могу думать, состоит в том, чтобы блокировать другие потоки (задания Sidekiq) всякий раз, когда эти строки кода выполняются, но я не слишком уверен, как это реализовать, и я не знаю, является ли это даже рекомендуемым подходом.

Буду признателен за любую помощь.

РЕДАКТИРОВАТЬ

В моем конкретном случае будет сложно указать параметр электронной почты в задании, так как это задание немного сложнее, чем то, что было только что сказано выше. На самом деле задание представляет собой сценарий экспорта, в котором часть задания представляет собой код, приведенный выше. Я не думаю, что также возможно разделить вышеуказанную функциональность на другого отдельного работника... поскольку весь поток заданий должен быть последовательным и никакие части не должны обрабатываться параллельно/асинхронно. Это задание является лишь одним из заданий, управляемых другим заданием, в котором в конечном счете управляется главным заданием.


person Jay-Ar Polidario    schedule 12.02.2015    source источник


Ответы (3)


Пессимистическая блокировка - это то, что вам нужно, но она работает только с существующей записью - вы не можете использовать ее с new_record?, потому что в БД пока нечего блокировать.

person Mike Perham    schedule 12.02.2015
comment
Да, это была моя проблема и раньше, когда я пробовал пессимистическую блокировку - записи все еще не было. Ну, это все еще довольно незнакомо мне, поэтому я просто подумал, что, возможно, я сделал что-то не так в том, как я к этому подошел. Но я думаю, я снова попробую пессимистическую блокировку и, возможно, найду хак с блокировкой потока. Спасибо - person Jay-Ar Polidario; 12.02.2015

Я бы предложил другую архитектуру, чтобы обойти проблему.

Как насчет модели производитель-воркер, в которой один главный процесс Sidekiq получает список адресов электронной почты, а затем порождает рабочий процесс Sidekiq для каждого письма? Sidekiq упрощает эту задачу благодаря выделенной очереди для общения мастеров и рабочих.

При этом адрес электронной почты становится входным параметром работников, поэтому мы знаем по построению, что работники не будут путать данные друг друга.

person Eric Platon    schedule 16.02.2015
comment
Здесь был другой ответ, но он удалил его. Я упомянул там, что в моем конкретном случае будет сложно указать параметр электронной почты в задании, так как это задание немного сложнее, чем то, что было только что сказано выше. На самом деле задание представляет собой сценарий экспорта, в котором часть задания представляет собой код, приведенный выше. Я не думаю, что также возможно разделить вышеуказанную функциональность на другого отдельного работника... поскольку весь поток заданий должен быть последовательным и никакие части не должны обрабатываться параллельно/асинхронно. Хотя я ценю твой жест. Спасибо. - person Jay-Ar Polidario; 16.02.2015
comment
Понимаю. Тогда как вы думаете сделать свой вопрос немного более узким? Добавление деталей вашего комментария сделало бы всю ветку более полезной для людей, которые сталкиваются с похожими проблемами. Я не знаю точно о вашей ситуации, но (в целом) может быть стоит попытаться разбить задачу на короткие части (рекомендовано Майком Перхамом — создателем Sidekiq). В целом кажется, что вам нужна обработка транзакций. Это немного больше работы, но главный процесс может также управлять транзакцией для своих рабочих... - person Eric Platon; 16.02.2015
comment
Сначала я думал упростить свой вопрос, но, думаю, вы правы, я обновлю свой вопрос, включив в него подробности. Да, в настоящее время мы реализуем транзакционную обработку, и на самом деле у нас много независимых заданий (одно из которых включает это). Тем не менее, я думаю, что теперь я начинаю понимать, что вы имели в виду под мастер-процессом, когда упомянули «управление транзакцией для его рабочих». Обсужу перспективу с коллегами. Спасибо еще раз. - person Jay-Ar Polidario; 16.02.2015

Мне удалось решить мою проблему следующим образом:

Я обнаружил, что на самом деле могу добавить предложение where в Частичный индекс уникальности БД Rails , и, таким образом, теперь я могу настроить условия уникальности для разных типов пользователей на уровне базы данных, в которых другие параллельные задания теперь будут вызывать ошибку ActiveRecord::RecordNotUnique, если они уже созданы.

Единственная проблема теперь - это код между .find_or_initialize_by и .save, поскольку они зависят от времени от объектов пользователя, в которых всегда только одно параллельное задание всегда должно получать .new_record? == true, а другие параллельные задания должны запускать .persisted? == true, как одно задание. всегда будьте первым, чтобы создать его, но... все это еще не работает, потому что это только в строке .save, где вызывается проверка индекса уникальности db. Поэтому мне удалось решить эту проблему, поместив .save перед этими условиями, и в то же время я добавил блок восстановления для .save, который затем добавляет еще одно задание в очередь самого себя, если оно вызывает ошибку ActiveRecord::RecordNotUnique, чтобы убедиться, что асинхронные задания не будет конфликтов. Код теперь выглядит так, как показано ниже.

user = User.find_or_initialize_by(email: '[email protected]')

begin
  user.save
  is_new_record = user.new_record?
  is_persisted = user.persisted?

rescue ActiveRecord::RecordNotUnique => exception
  MyJob.perform_later(params_hash)
end

if is_new_record
  # do something if not yet created
elsif is_persisted
  # do something if already created
end
person Jay-Ar Polidario    schedule 16.02.2015