Как использовать kqueue для мониторинга файлов в asyncio?

Я хочу использовать kqueue для отслеживания изменений в файлах. Я вижу, как использовать select.kqueue() в многопоточном режиме.

Я ищу способ использовать его с asyncio. Я, возможно, пропустил что-то действительно очевидное здесь. Я знаю, что python использует kqueue для asyncio на macos. Я рад, что любое решение работает только при использовании селектора kqueue.

Пока единственный способ, которым я вижу это, - создать поток для непрерывного kqueue.control() из другого потока, а затем вводить события с помощью asyncio.loop.call_soon_threadsafe(). Я чувствую, что должен быть лучший способ.


person Philip Couling    schedule 15.09.2020    source источник


Ответы (1)


Вы можете добавить FD из объекта kqueue в качестве средства чтения в цикл управления, используя loop.add_reader(). Затем цикл управления сообщит вам, что события готовы к сбору.

Для этого есть две особенности, которые могут показаться странными тем, кто знаком с kqueue:

  • select.kqueue.control — это одноразовый метод, сначала меняет монитор и ждет поступления новых событий. Поскольку мы не хотим, чтобы он когда-либо блокировался, два действия должны быть разделены на один неблокирующий вызов для изменения монитора и второй, более поздний, неблокирующий вызов для сбора результирующих событий.
  • Поскольку мы никогда не хотим блокировать, тайм-аут никогда не может быть использован. Это можно повторно реализовать с помощью asyncio.wait_for()

Есть более эффективные способы написать это, но вот пример того, как полностью заменить select.kqueue.control асинхронным методом (здесь он называется kqueue_control):

async def kqueue_control(kqueue: select.kqueue,
                         changes: Optional[Iterable[select.kevent]],
                         max_events: int,
                         timeout: Optional[int]):

    def receive_result():
        try:
            # Events are ready to collect; fetch them but do not block
            results = kqueue.control(None, max_events, 0)
        except Exception as ex:
            future.set_exception(ex)
        else:
            future.set_result(results)
        finally:
            loop.remove_reader(kqueue.fileno())
            
    # If this call is non-blocking then just execute it
    if timeout == 0 or max_events == 0:
        return kqueue.control(changes, max_events, 0)
    
    # Apply the changes, but DON'T wait for events
    kqueue.control(changes, 0)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.add_reader(kqueue.fileno(), receive_result)
    if timeout is None:
        return await future
    else:
        return await asyncio.wait_for(future, timeout)

person Philip Couling    schedule 16.09.2020