У меня проблемы с использованием twisted вместе с doctest
. Я пытаюсь открыть файл вот так:
from __future__ import print_function
from twisted.internet.defer import Deferred
from twisted.internet.fdesc import readFromFD, setNonBlocking
from twisted.internet.task import react
def asyncFunc(filename):
"""Description.
Examples:
>>> def run(reactor, filename):
... d = asyncFunc(filename)
... return d.addCallback(print)
>>>
>>> try:
... react(run, ['hello.txt'])
... except SystemExit:
... pass
hello world
<BLANKLINE>
"""
with open(filename) as f:
d = Deferred()
fd = f.fileno()
setNonBlocking(fd)
readFromFD(fd, d.callback)
return d
Он отлично работает только с одним тестом, но при нескольких я получаю ReactorNotRestartable
ошибку.
def anotherAsyncFunc(filename):
"""Description.
Examples:
>>> def run(reactor, filename):
... d = anotherAsyncFunc(filename)
... return d.addCallback(print)
>>>
>>> try:
... react(run, ['hello.txt'])
... except SystemExit:
... pass
hello world
<BLANKLINE>
"""
with open(filename) as f:
d = Deferred()
fd = f.fileno()
setNonBlocking(fd)
readFromFD(fd, d.callback)
return d
Затем я прочитал о MemoryReactor
и попробовал следующее:
def anotherAsyncFunc(filename):
"""Description.
Examples:
>>> from twisted.test.proto_helpers import MemoryReactor
>>> reactor = MemoryReactor()
>>>
>>> def run(reactor, filename):
... d = anotherAsyncFunc(filename)
... return d.addCallback(print)
>>>
>>> try:
... react(run, ['hello.txt'], reactor)
... except SystemExit:
... pass
hello world
<BLANKLINE>
"""
with open(filename) as f:
d = Deferred()
fd = f.fileno()
setNonBlocking(fd)
readFromFD(fd, d.callback)
return d
Но это дает мне 'MemoryReactor' object has no attribute 'addSystemEventTrigger'
AttributeError
. Есть идеи, как это сделать?