Я пытаюсь прервать multiprocessing.connection.Listener.accept()
, но пока безуспешно. Поскольку он не предоставляет параметр timeout
, я подумал, что, возможно, я мог бы использовать socket.setdefaulttimeout()
, чтобы прервать его, как было предложено в сообщении, которое я больше не могу найти здесь, на SO.
Это не сработало. Затем я попытался вызвать close()
для объекта Listener()
. согласно этому сообщению, это должно было сработать.
Однако похоже, что эти объекты не должны подыгрывать обычным решениям, связанным с socket
.
Я могу подтвердить, что Listener
закрыт объектом Timer
, как и ожидалось, но вызов accept()
не прерывается.
Код:
import logging
import socket
import os
from multiprocessing.connection import Listener
from queue import Queue, Empty
from threading import Thread, Event, Timer
class Node(Thread):
"""Base Class providing a AF_INET, AF_UNIX or AF_PIPE connection to its
data queue. It offers put() and get() method wrappers, and therefore
behaves like a Queue as well as a Thread.
Data from the internal queue is automatically fed to any connecting client.
"""
def __init__(self, sock_name, max_q_size=None, timeout=None,
*thread_args, **thread_kwargs):
"""Initialize class.
:param sock_name: UDS, TCP socket or pipe name
:param max_q_size: maximum queue size for self.q, default infinite
"""
self._sock_name = sock_name
self.connector = Listener(sock_name)
max_q_size = max_q_size if max_q_size else 0
self.q = Queue(maxsize=max_q_size)
self._running = Event()
self.connection_timer = Timer(timeout, self.connection_timed_out)
super(Node, self).__init__(*thread_args, **thread_kwargs)
def connection_timed_out(self):
"""Closes the Listener and shuts down Node if no Client connected.
:return:
"""
self.connector.close()
self.join()
def _start_connection_timer(self):
self.connection_timer.start()
def start(self):
self._running.set()
super(Node, self).start()
def join(self, timeout=None):
print("clearing..")
self._running.clear()
print("internal join")
super(Node, self).join(timeout=timeout)
print("Done")
def run(self):
while self._running.is_set():
print("Accepting connections..")
self._start_connection_timer()
try:
client = self.connector.accept()
self.connection_timer.cancel()
self.feed_data(client)
except (TimeoutError, socket.timeout):
continue
except Exception as e:
raise
print("Run() Terminated!")
def feed_data(self, client):
try:
while self._running.is_set():
try:
client.send(self.q.get())
except Empty:
continue
except EOFError:
return
if __name__ == '__main__':
import time
n = Node('/home/nils/git/spab2/test.uds', timeout=10)
n.start()
print("Sleeping")
time.sleep(15)
print("Manual join")
n.join()
Я понимаю, что мой вопрос дублирует этот вопрос, однако это почти год. старый и даже не получил комментариев. Кроме того, я использую Unix Domain Socket
s, а не соединение TCP
связанного сообщения.