Сохранить очередь Python в файл

Я использую cclass Python Queue для управления списком задач, которые совместно используются несколькими рабочими потоками. Фактический код огромен, и я все еще нахожусь в процессе того, чтобы полностью избавиться от ошибок. Время от времени рабочие потоки падают, и мне приходится перезапускать всю процедуру. В процессе я теряю все поставленные в очередь задачи. Есть ли способ сохранить очередь в файл, чтобы всякий раз, когда я перезапускаю процесс, список задач предварительно загружался из этого файла?

На первый взгляд кажется, что когда я получаю или ставлю задачи в очередь, я должен одновременно читать и писать в файл. Однако это не дает мне функциональности queue.task_done() и может быть не самым оптимизированным решением. Любые идеи очень приветствуются.


person Shah W    schedule 04.08.2011    source источник
comment
Прежде чем тратить слишком много времени на написание собственной среды параллельных вычислений, проверьте, есть ли существующая среда, которую вы можете использовать. Например. Параллельная среда IPython, или Kamaelia.   -  person Thomas K    schedule 04.08.2011


Ответы (5)


Рассматривали ли вы возможность просто процедить свою очередь?

person kojiro    schedule 04.08.2011
comment
Вы предполагаете, что каждый раз, когда в очередь вносятся изменения, их следует сериализовать и сохранять в файл? При большом размере очереди это может быть не оптимальным решением. - person Shah W; 05.08.2011
comment
Быть уверенным. Но вы уже по колено в неоптимальной ситуации. Держите рабочие потоки от сбоя, и вопрос становится спорным. - person kojiro; 05.08.2011

Есть несколько подходов к этому, включая модуль pickle...

Но, на мой взгляд, было бы проще просто записать в файл, строку за строкой, каждый элемент очереди в столбцах, содержащих другие свойства, которые вы, возможно, захотите сохранить, например task_done.

пример:

element1, True
element2, False
...

В python очень легко читать файл, отформатированный следующим образом:

for line in file('path/file.ext'):
    name, state = line.split(sep_char)
    #and them insert into the queue...
person BrainStorm    schedule 04.08.2011
comment
Это было действительно первое решение, которое пришло мне в голову. Однако я надеялся найти быструю реализацию, построенную поверх класса очереди. Внутреннее решение потребовало бы от меня умения работать с многопоточностью и управлять общими ресурсами данных. - person Shah W; 05.08.2011

Самый простой способ сделать это — использовать AMQP для очередей сообщений и позволить брокеру сообщений позаботиться о сообщениях за вас. Я реализовал аналогичную систему, используя RabbitMQ в качестве брокера сообщений с надежными постоянными очередями. Сообщения даже пережили сбой серверного программного обеспечения RabbitMQ, когда я использовал устаревшую версию сервера 1.72 на виртуальном сервере Linux с 512 МБ ОЗУ и примерно миллионом сообщений в игре.

Как я это делаю, каждый тип работника потребляет сообщения из другой очереди. Если мне нужно более одного работника этого типа, то очередь сообщений автоматически циклически обрабатывается, и если работник не может завершить обработку сообщения, он просто не подтверждает его, и он возвращается в очередь.

Я написал небольшой модуль-прокладку примерно из 80 строк кода, чтобы разместить его перед kombu, а позже переписал его, чтобы использовать py-amqplib. Если бы я знал о haigha раньше, я бы использовал его, так как он очень близко соответствует документу со спецификациями AMQP.

Я не рекомендую комбу, потому что он очень сложен для отладки и странным образом расходится со стандартом AMQP. Взгляните на haigha, потому что, хотя документация представляет собой не более чем один пример фрагмента кода на PyPi, она лучше документирована, чем kombu или amqplib, потому что вы можете использовать спецификации AMQP в качестве документов haigha.

person Michael Dillon    schedule 10.08.2011

Я могу предложить простой вариант — обернуть таблицу базы данных в класс и использовать ее в качестве очереди. Столбец с автоинкрементом будет творить чудеса для этого (следующий элемент, который нужно удалить, — это элемент с наименьшим идентификатором).

class dbQueue:
  init():
    # Pick some random id for this run (or set it to some thing you know).
  put():
    # Insert entry into table
  get():
    # The update .. select combo removes the need for a database that has transactions.
    # If no entries bear your ID:
      # Update the next entry that is not already marked with your ID.
    # Select the entry that matches your ID and return it.
  task_done():
    # Delete the entry with your ID.

Это не будет иметь наилучшей производительности в зависимости от того, как часто обновляется очередь, даже база данных sqlite в памяти не будет такой быстрой, как структура связанного списка. Обратная сторона заключается в том, что вы можете просматривать базу данных с помощью любого инструмента, который имеет доступ к базе данных, поэтому вы можете видеть, какой из них выполняется.

person David    schedule 06.09.2011

Реализуйте механизм рукопожатия между работником и мастером.

У мастера есть список задач, прежде чем помещать их в Очередь, соберите список в файл. Затем вставьте задачу в очередь. Когда рабочий завершает работу, он отправляет обратно сообщение ACK. Только в этот момент разберите список задач и удалите соответствующий идентификатор.

person fabrizioM    schedule 06.09.2011