Я разрабатываю сканер с использованием многопроцессорной модели.
которые используют multiprocessing.Queue для хранения URL-информации, которую необходимо сканировать, содержимое страницы, которое необходимо проанализировать, и многое другое; используйте multiprocessing.Event для управления подпроцессами; используйте multiprocessing.Manager.dict для хранения хэша просканированного URL-адреса; каждая многопроцессорность. Экземпляр Manager.dict использует multiprocessing.Lock для управления доступом.
Все параметры трех типов являются общими для всех подпроцессов и родительского процесса, и все параметры организованы в класс. Я использую экземпляр класса для передачи общих параметров из родительского процесса в подпроцесс. Так же, как:
MGR = SyncManager()
class Global_Params():
Queue_URL = multiprocessing.Queue()
URL_RESULY = MGR.dict()
URL_RESULY_Mutex = multiprocessing.Lock()
STOP_EVENT = multiprocessing.Event()
global_params = Global_Params()
В моем собственном механизме тайм-аута я использую process.terminate, чтобы остановить процесс, который не может остановиться сам по себе в течение длительного времени!
В моем тестовом примере было более 2500 целевых сайтов (некоторые из них не обслуживаются, некоторые огромны). сканировать сайт за сайтом, который находится в файле целевых сайтов.
В начале краулер мог работать хорошо, но через долгое время (где-то 8 часов, иногда 2 часа, иногда больше 15 часов) краулер просканировал более 100 (что неопределенно) сайтов, я получу информацию об ошибке :"Errno 32 сломанная труба"
Я пробовал следующие методы для определения местоположения и решения проблем:
Найдите сайт A, на котором краулер сломался, затем используйте краулер для обхода сайта отдельно, краулер работал хорошо. Даже если я получаю фрагмент (например, 20 сайтов) из всех файлов целевых сайтов, содержащих сайт А, сканер работает хорошо!
добавьте "-X /tmp/pymp-* 240 /tmp" в /etc/cron.daily/tmpwatch
когда произошел сбой, файл /tmp/pymp-* все еще существует
используйте multiprocessing.managers.SyncManager, замените multiprocessing.Manager и игнорируйте большинство сигналов, кроме SIGKILL и SIGTERM
для каждого целевого сайта я очищаю большинство общих параметров (очереди, словари и события), если произошла ошибка, создаю новый экземпляр:
while global_params.Queue_url.qsize()>0:
try:
global_params.Queue_url.get(block=False)
except Exception,e:
print_info(str(e))
print_info("Clear Queue_url error!")
time.sleep(1)
global_params.Queue_url = Queue()
pass
ниже приведена информация Traceback, функция print_info предназначена для печати и хранения отладочной информации:
[Errno 32] Broken pipe
Traceback (most recent call last):
File "Spider.py", line 613, in <module>
main(args)
File "Spider.py", line 565, in main
spider.start()
File "Spider.py", line 367, in start
print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT))
File "<string>", line 2, in __len__
File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod
kind, result = conn.recv()
EOFError
Я не могу понять, почему, кто-нибудь знает причину?