Поймать прерывание клавиатуры, чтобы остановить работу многопроцессорного рабочего Python в очереди

Из нескольких сообщений, найденных в stackoverflow, я создал этот код.

Сценарий

Я хочу иметь multiprocessing.queue, в котором несколько рабочих "слушают"

В случае прерывания клавиатуры основной процесс больше не должен помещать новые элементы в очередь, и с помощью объектов-стражей рабочий процесс должен быть изящно остановлен.

Проблема

Моя проблема с текущей версией, где я использую

signal.signal(signal.SIGINT, signal.SIG_IGN) 

Игнорировать Ctrl + C означает, что он также игнорируется основным процессом.

Любые идеи ? Нужно ли использовать многопроцессорный рабочий пул? Некоторые примеры указывают на то, что мне, возможно, придется. Могу ли я тогда по-прежнему использовать очередь?

from multiprocessing import Pool, Process,Queue
import time
import signal
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process


class Worker(Process):
    def __init__(self, queue,ident):
        super(Worker, self).__init__()
        # Ignore Signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.queue= queue
        self.idstr= str(ident)
        print "Ident" + self.idstr

    def run(self):
        print 'Worker started'
        # do some initialization here

        print 'Computing things!'
        for data in iter( self.queue.get, None ):
            print "#" + self.idstr + " : " + str(data)
            time.sleep(5)
            print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())

        print "Worker Done"

#### Main ####
request_queue = Queue(10)

for i in range(4):
    Worker( request_queue,i ).start()

try:
    for data in range(1000000):
        request_queue.put( data )
        #print "Queue Size: " + str(request_queue.qsize())
        # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(4):
        request_queue.put( None ) 

except KeyboardInterrupt:
    print "Caught KeyboardInterrupt, terminating workers"
    while  request_queue.empty()==False:
         request_queue.get()
    request_queue.put( None )    

person Dukeatcoding    schedule 26.06.2013    source источник
comment
Взгляните на этот stackoverflow. ком/вопросы/4205317/   -  person Paco    schedule 26.06.2013
comment
эй, Пако, да, я нашел его и думал о том, как его интегрировать. так что очередь перестанет заполняться и рабочие будут остановлены. При первой мысли я пришел к выводу, что я все еще хочу, чтобы исключение было в основном потоке.   -  person Dukeatcoding    schedule 26.06.2013


Ответы (2)


Основываясь на вашем решении (что хорошо), я добавил дополнительный уровень защиты, если основной код не отвечает и пользователь дважды отменяет:

global STOP

import os, signal
def signal_handler(sig, frame):
    global STOP
    if STOP:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.kill(os.getpid(), signal.SIGTERM)
    STOP = True
signal.signal(signal.SIGINT, signal_handler)
person codepoet    schedule 29.12.2013

Я думаю, что нашел решение. Еще мне не нравится, что я получаю SIGINT 1 раз от main и 4 раза от Worker, но, возможно, мне придется с этим смириться.

  1. Я указал обработчик сигнала прерывания.
  2. После получения первого Sig INT я игнорирую дальнейший сигнал SIG Int
  3. Я переключаю флаг остановки на TRUE
  4. Я разрываю цикл вставки очереди
  5. Я вызываю функцию остановки, которая очищает очередь и вставляет стоп-сигналы.

    from multiprocessing import Pool, Process,Queue
    import time
    import signal
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process
    
    # Stop Flag for loop
    stop = False
    
    # Define SIGINT
    def signal_handler(sig, frame):
        print 'You pressed Ctrl+C!'
        global stop
        stop = True
        # Ignore more Ctrl+C
        signal.signal(signal.SIGINT, signal.SIG_IGN) 
    
    signal.signal(signal.SIGINT, signal_handler)
    
    def stopSentinel(request_queue):
        print "CTRL Stop Queue and insert None"
    
    # Empty Existing Queue
    while  request_queue.empty()==False:
         request_queue.get()
    
    # Put One None for each Worker
    for i in range(4):
        request_queue.put( None ) 
    
    
    class Worker(Process):
        def __init__(self, queue,ident):
            super(Worker, self).__init__()
    
            self.queue= queue
            self.idstr= str(ident)
            print "Ident" + self.idstr
    
        def run(self):
            print 'Worker started'
            # do some initialization here
    
            print 'Computing things!'
            for data in iter( self.queue.get, None ):
                print "#" + self.idstr + " : " + str(data)
                time.sleep(5)
                print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())
    
            print "Worker Done"
    
    
    
    #### Main #####
    request_queue = Queue(10)
    
    for i in range(4):
        Worker( request_queue,i ).start()
    
    #### Fill Queue with Data ####
    for data in range(1000000):
        request_queue.put( data )
        #print "Queue Size: " + str(request_queue.qsize())
        # Sentinel objects to allow clean shutdown: 1 per worker.
    
        # Check for Stop
        print "Check Breakout"
        if stop == True:
            print "Stop Break"
            break
    
    if stop == True:
        stopSentinel(request_queue)
    else:       
        print "Normal Stop" 
        for i in range(4):
            request_queue.put( None ) 
    
person Dukeatcoding    schedule 26.06.2013