Как сделать пулинг HTTP-соединения с скрученным?

Я пишу очень простую программу-паук для получения веб-страниц с одного сайта.

Вот минимизированная версия.

from twisted.internet import epollreactor  
epollreactor.install()
from twisted.internet import reactor
from twisted.web.client import Agent, HTTPConnectionPool, readBody

baseUrl = 'http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode='

start = 1001
end = 3500

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
agent = Agent(reactor, pool=pool)

def onHeader(response, i):
    deferred = readBody(response)
    deferred.addCallback(onBody, i)
    deferred.addErrback(errorHandler)
    return response

def onBody(body, i):
    print('Received %s, Length %s' % (i, len(body)))

def errorHandler(err):
    print('%s : %s' % (reactor.seconds() - startTimeStamp, err))

def requestFactory():
    for i in range (start, end):
        deferred = agent.request('GET', baseUrl + str(i))
        deferred.addCallback(onHeader, i)
        deferred.addErrback(errorHandler)
        print('Generated %s' % i)
        reactor.iterate(1)

    print('All requests has generated, elpased %s' % (reactor.seconds() - startTimeStamp))

startTimeStamp = reactor.seconds()
reactor.callWhenRunning(requestFactory)
reactor.run()

Для нескольких запросов, например 100, он работает нормально. Но для массовых запросов это не удастся.

Я ожидаю, что все запросы (около 3000) должны быть автоматически объединены в пул, запланированы и конвейеризированы, поскольку я использую HTTPConnectionPool, устанавливаю maxPersistentPerHost, создаю с ним экземпляр Agent и постепенно создаю соединения.

Но это не так, соединения не поддерживаются и не объединяются.

В этой программе соединения устанавливаются постепенно, но соединения не объединяются в пул, каждое соединение закрывается после получения тела, а более поздние запросы никогда не ждут в пуле доступного соединения.

Таким образом, потребуются тысячи сокетов, и, в конце концов, произойдет сбой из-за тайм-аута, потому что на удаленном сервере тайм-аут соединения установлен на 30 с. Тысячи запросов не могут быть выполнены в течение 30 секунд.

Не могли бы вы помочь мне в этом?

Я старался изо всех сил, вот мои находки.

  • Ошибка произошла ровно через 30 секунд после запуска реактора, на нее не будут влиять другие факторы.
  • Let the spider fetch my server, I find something interesting.
    1. The HTTP protocol version is 1.1 (I check the twisted document, the default HTTPClient is 1.0 rather than 1.1)
    2. Если бы я не добавлял какой-либо явный заголовок (как и в минимизированной версии), заголовок запроса не содержал бы Connection: Keep-Alive, либо заголовок ответа.
    3. Если я добавлю явный заголовок, чтобы убедиться, что это постоянное соединение, заголовок запроса будет содержать Connection: Keep-Alive, но заголовок ответа по-прежнему не будет. (Я уверен, что мой сервер ведет себя правильно, другие вещи, такие как Chrome, wget действительно получили заголовок Connection: Keep-Alive.)
  • Я проверяю /proc/net/sockstat во время бега, сначала он быстро увеличивается, а потом быстро уменьшается. (я увеличил ulimit для поддержки большого количества сокетов)
  • I write a similar program with treq, a twisted based request library). The code is almost the same, so not paste here.
    • Link: https://gist.github.com/Preffer/dad9b1228fcd75cebd75
    • Его поведение почти такое же. Не объединение. Ожидается, что он будет объединяться, как описано в списке функций treq.
    • Если я добавлю к нему явный заголовок, Connection: Keep-Alive никогда не появится в заголовке ответа.

Основываясь на всем вышесказанном, я весьма подозрительно отношусь к заголовку Connection: Keep-Alive, который портит программу. Но этот заголовок является частью стандарта HTTP 1.1 и сообщается как HTTP 1.1. Я совершенно озадачен этим.


person Eugene    schedule 28.08.2014    source источник
comment
Объединение в пул, объединение в пул и опрос — это разные вещи (ну, насколько я знаю, первое — это вообще не вещь).   -  person Jean-Paul Calderone    schedule 28.08.2014
comment
Прошу прощения за свою ошибку, все это должно быть pooling. Любые советы для моей проблемы?   -  person Eugene    schedule 29.08.2014
comment
Я нахожу аналогичный вопрос, он решил мою проблему. stackoverflow .com/questions/2861858/   -  person Eugene    schedule 29.08.2014
comment
Всякий раз, когда у вас есть фрагменты кода с переполнением стека, помещайте их в переполнение стека, а не размещайте на внешних сайтах.   -  person Glyph    schedule 29.08.2014
comment
@Glyph Извините, я вставляю код сейчас.   -  person Eugene    schedule 29.08.2014


Ответы (2)


Я решил проблему сам, с помощью IRC и другого вопроса в stackoverflow, Поставить в очередь удаленные вызовы перспективному брокеру Python Twisted?

Таким образом, поведение агента сильно отличается от поведения в Nodejs (у меня есть некоторый опыт работы с Nodejs). Как описано в документе Nodejs.

агент.запросы

Объект, содержащий очереди запросов, которые еще не были назначены сокетам.

агент.maxSockets

По умолчанию установлено значение 5. Определяет, сколько одновременных сокетов агент может открыть для каждого источника. Происхождение представляет собой комбинацию «хост:порт» или «хост:порт:локальный адрес».

Итак, вот разница.

  • Скрученный:

    • There is no doubt that Agent could queue requests if construct with a HTTPConnectionPool instance.
    • Но если новый запрос выдается после того, как соединения в пуле закончились, агент все равно создаст новое соединение и выполнит запрос, а не поместит его в очередь.
    • На самом деле это приведет к удалению соединения в пуле и добавлению вновь сгенерированного соединения в пул, сохраняя количество соединений равным maxPersistentPerHost
  • Узлы:

    • By default, agent will queue the requests with a implicit connection pool, which have a size of 5 connections.
    • Если после исчерпания соединений в пуле выдается новый запрос, агент помещает запросы в очередь в agent.requests переменную, ожидая доступного соединения.

Поведение агента в скрученном виде приводит к тому, что агент может ставить запросы в очередь, но на самом деле это не так.

Следуйте нашей интуиции: после назначения пула соединений агенту это соответствует интуиции, согласно которой агент будет использовать только соединения в пуле и ждать доступного соединения, если пул закончился. Это точно соответствует агенту в Nodejs.

Лично я считаю, что это ошибочное поведение в Twisted или, по крайней мере, его можно было бы улучшить, предоставив возможность установить поведение агента.

В соответствии с этим я должен использовать DeferredSemaphore для ручного планирования запросов.

Я поднимаю вопрос в проекте treq на github и получаю аналогичное решение. https://github.com/dreid/treq/issues/71

Вот мое решение.

#!/usr/bin/env python
from twisted.internet import epollreactor
epollreactor.install()
from twisted.internet import reactor
from twisted.web.client import Agent, HTTPConnectionPool, readBody
from twisted.internet.defer import DeferredSemaphore

baseUrl = 'http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode='

start = 1001
end = 3500
count = end - start
concurrency = 10
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = concurrency
agent = Agent(reactor, pool=pool)
sem = DeferredSemaphore(concurrency)
done = 0

def onHeader(response, i):
    deferred = readBody(response)
    deferred.addCallback(onBody, i)
    deferred.addErrback(errorHandler, i)
    return deferred

def onBody(body, i):
    sem.release()
    global done, count
    done += 1
    print('Received %s, Length %s, Done %s' % (i, len(body), done))
    if(done == count):
        print('All items fetched')
        reactor.stop()

def errorHandler(err, i):
    print('[%s] id %s: %s' % (reactor.seconds() - startTimeStamp, i, err))

def requestFactory(token, i):
    deferred = agent.request('GET', baseUrl + str(i))
    deferred.addCallback(onHeader, i)
    deferred.addErrback(errorHandler, i)
    print('Request send %s' % i)
    #this function it self is a callback emit by reactor, so needn't iterate manually
    #reactor.iterate(1)
    return deferred

def assign():
    for i in range (start, end):
        sem.acquire().addCallback(requestFactory, i)

startTimeStamp = reactor.seconds()
reactor.callWhenRunning(assign)
reactor.run()

Это правильно? Прошу указать на мою ошибку и улучшения.

person Eugene    schedule 29.08.2014

Для нескольких запросов, например 100, он работает нормально. Но для массовых запросов это не удастся.

Это либо защита от поисковых роботов, либо защита сервера от DoS/DDoS, поскольку вы отправляете слишком много запросов с одного и того же IP-адреса за короткое время, поэтому брандмауэр или WSA заблокируют ваш будущий запрос. Просто измените свой скрипт, чтобы сделать запрос в пакетном режиме через некоторое время. вы можете использовать callLater() через некоторое время после каждого запроса X.

person e-nouri    schedule 29.08.2014
comment
Конечно, это возможно, но позже я обнаружил, что это не так, потому что Nodejs работает нормально. Не могли бы вы взглянуть на ответ самостоятельно? - person Eugene; 29.08.2014
comment
Проблема кажется мне более простой, поскольку ваш код протестирует ее и сообщит мне, работает ли она, кроме того, я думаю, что добавление времени между каждыми 100 проще, чем перестройка другого сканера. Удачи друг! - person e-nouri; 29.08.2014
comment
Конечно, это работает. Теперь я понял, что должен связать DeferredSemaphore с Agent, чтобы поведение агента соответствовало ожиданиям. - person Eugene; 30.08.2014