многопроцессорная сломанная труба после долгого времени

Я разрабатываю сканер с использованием многопроцессорной модели.

которые используют multiprocessing.Queue для хранения URL-информации, которую необходимо сканировать, содержимое страницы, которое необходимо проанализировать, и многое другое; используйте multiprocessing.Event для управления подпроцессами; используйте multiprocessing.Manager.dict для хранения хэша просканированного URL-адреса; каждая многопроцессорность. Экземпляр Manager.dict использует multiprocessing.Lock для управления доступом.

Все параметры трех типов являются общими для всех подпроцессов и родительского процесса, и все параметры организованы в класс. Я использую экземпляр класса для передачи общих параметров из родительского процесса в подпроцесс. Так же, как: MGR = SyncManager() class Global_Params(): Queue_URL = multiprocessing.Queue() URL_RESULY = MGR.dict() URL_RESULY_Mutex = multiprocessing.Lock() STOP_EVENT = multiprocessing.Event() global_params = Global_Params()

В моем собственном механизме тайм-аута я использую process.terminate, чтобы остановить процесс, который не может остановиться сам по себе в течение длительного времени!

В моем тестовом примере было более 2500 целевых сайтов (некоторые из них не обслуживаются, некоторые огромны). сканировать сайт за сайтом, который находится в файле целевых сайтов.

В начале краулер мог работать хорошо, но через долгое время (где-то 8 часов, иногда 2 часа, иногда больше 15 часов) краулер просканировал более 100 (что неопределенно) сайтов, я получу информацию об ошибке :"Errno 32 сломанная труба"

Я пробовал следующие методы для определения местоположения и решения проблем:

  1. Найдите сайт A, на котором краулер сломался, затем используйте краулер для обхода сайта отдельно, краулер работал хорошо. Даже если я получаю фрагмент (например, 20 сайтов) из всех файлов целевых сайтов, содержащих сайт А, сканер работает хорошо!

  2. добавьте "-X /tmp/pymp-* 240 /tmp" в /etc/cron.daily/tmpwatch

  3. когда произошел сбой, файл /tmp/pymp-* все еще существует

  4. используйте multiprocessing.managers.SyncManager, замените multiprocessing.Manager и игнорируйте большинство сигналов, кроме SIGKILL и SIGTERM

  5. для каждого целевого сайта я очищаю большинство общих параметров (очереди, словари и события), если произошла ошибка, создаю новый экземпляр:

while global_params.Queue_url.qsize()>0: try: global_params.Queue_url.get(block=False) except Exception,e: print_info(str(e)) print_info("Clear Queue_url error!") time.sleep(1) global_params.Queue_url = Queue() pass ниже приведена информация Traceback, функция print_info предназначена для печати и хранения отладочной информации: [Errno 32] Broken pipe Traceback (most recent call last): File "Spider.py", line 613, in <module> main(args) File "Spider.py", line 565, in main spider.start() File "Spider.py", line 367, in start print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT)) File "<string>", line 2, in __len__ File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod kind, result = conn.recv() EOFError Я не могу понять, почему, кто-нибудь знает причину?


person aassembly    schedule 09.12.2014    source источник
comment
Кажется, у вас недостаточно памяти для того, что вы пытаетесь обработать. Вам нужно будет найти способ освободить то, что вам больше не нужно использовать.   -  person eliasah    schedule 09.12.2014
comment
Когда краулер зависал, я проверял память, там еще свободно 10G памяти(всего 16G), а в кроулер очищал каждую Queue и dict в каждом цикле.   -  person aassembly    schedule 09.12.2014
comment
Это могут быть системы, которые убивают ваш процесс, поскольку он работает так долго. Возможно, вам потребуется настроить для него дополнительные привилегии в вашей системе.   -  person eliasah    schedule 09.12.2014
comment
Да, мои привилегии - обычный пользователь, а не root, но я могу использовать sudo для запуска python. Я не знаю, как настроить дополнительную привилегию, не могли бы вы привести пример?   -  person aassembly    schedule 09.12.2014
comment
Дело не в привилегиях sudo. Речь идет о вашем обработчике процессов. Я не уверен, как это настроить, но я почти уверен, что именно он вызывает все проблемы.   -  person eliasah    schedule 09.12.2014
comment
Хорошо спасибо. Я попробую ваше предложение.   -  person aassembly    schedule 09.12.2014
comment
Excute ulimit -a, я обнаружил, что максимальное количество пользовательских процессов составляет 1024. Но я думаю, что есть еще одна причина для сломанной трубы. Я пробую метод: stackoverflow.com/questions/3649458/ . Надеюсь, это полезно.   -  person aassembly    schedule 10.12.2014


Ответы (1)


Я не знаю, решает ли это вашу проблему, но есть один момент, о котором стоит упомянуть:

global_params.Queue_url.get(block=False)

... выдает исключение Queue.Empty, если очередь пуста. Не стоит пересоздавать очередь для пустого исключения.

Пересоздание очереди может привести к условиям гонки.

С моей точки зрения, у вас есть возможности:

  1. избавиться от блока кода "восстановления очереди"
  2. переключиться на другую реализацию очереди

использовать:

from Queue import Queue

вместо:

from multiprocessing import Queue
person st0ne    schedule 09.12.2014
comment
Да. Поскольку код находится в стадии тестирования, я сейчас не обращаю внимания на детали. Спасибо. - person aassembly; 09.12.2014
comment
о да, я понял. но это может привести к условиям гонки. я отредактировал свой ответ. - person st0ne; 09.12.2014
comment
да, но Queue.Queue нельзя разделить между процессами, насколько я помню, я проверял. - person aassembly; 10.12.2014
comment
затем постарайтесь избежать воссоздания очереди, если она пуста. - person st0ne; 10.12.2014
comment
да, сделали. stackoverflow .com/questions/3649458/ Я думаю, что @pankaj может быть прав, но я не уверен, куда следует вставить код. Я не мог его подключить, моя репутация слишком низка. - person aassembly; 10.12.2014