API потоковой передачи Twitter - urllib3.exceptions.ProtocolError: ('Соединение прервано: IncompleteRead

Запуск сценария python с использованием tweepy which streaming (с использованием потокового API твиттера) в случайной выборке английских твитов в течение минуты, затем чередуется с поиском (с использованием API поиска в твиттере) в течение минуты, а затем возвращается. Проблема, которую я обнаружил, заключается в том, что примерно через 40 секунд происходит сбой потоковой передачи и возникает следующая ошибка:

Полная ошибка:

urllib3.exceptions.ProtocolError: ('Соединение прервано: IncompleteRead (прочитано 0 байт)', IncompleteRead (прочитано 0 байт))

Количество прочитанных байтов может варьироваться от 0 до целых тысяч.

В первый раз, когда это видно, потоковая передача преждевременно прерывается, и функция поиска запускается раньше, после того, как функция поиска завершена, она снова возвращается в поток, и при втором повторении этой ошибки происходит сбой кода.

Код, который я использую:

# Handles date time calculation
def calculateTweetDateTime(tweet):
    tweetDateTime = str(tweet.created_at)

    tweetDateTime = ciso8601.parse_datetime(tweetDateTime)
    time.mktime(tweetDateTime.timetuple())
    return tweetDateTime

# Checks to see whether that permitted time has past.
def hasTimeThresholdPast():
    global startTime
    if time.clock() - startTime > 60:
        return True
    else:
        return False

#override tweepy.StreamListener to add logic to on_status
class StreamListener(StreamListener):

    def on_status(self, tweet):
        if hasTimeThresholdPast():
            return False

        if hasattr(tweet, 'lang'):
            if tweet.lang == 'en':

                try:
                    tweetText = tweet.extended_tweet["full_text"]
                except AttributeError:
                    tweetText = tweet.text

                tweetDateTime = calculateTweetDateTime(tweet)

                entityList = DataProcessing.identifyEntities(True, tweetText)
                DataStorage.storeHotTerm(entityList, tweetDateTime)
                DataStorage.storeTweet(tweet)


    def on_error(self, status_code):
        def on_error(self, status_code):
            if status_code == 420:
                # returning False in on_data disconnects the stream
                return False


def startTwitterStream():

    searchTerms = []

    myStreamListener = StreamListener()
    twitterStream = Stream(auth=api.auth, listener=StreamListener())
    global geoGatheringTag
    if geoGatheringTag == False:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an'], async=True, stall_warnings=True)

    if geoGatheringTag == True:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an', 'they\'re'],
                             async=False, locations=[-4.5091, 55.7562, -3.9814, 55.9563], stall_warnings=True)



# ----------------------- Twitter API Functions ------------------------
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# --------------------------- Main Function ----------------------------

startTime = 0


def main():
    global startTime
    userInput = ""
    userInput.lower()
    while userInput != "-1":
        userInput = input("Type ACTiVATE to activate the Crawler, or DATABASE to access data analytic option (-1 to exit): \n")
        if userInput.lower() == 'activate':
            while(True):
                startTime = time.clock()

                startTwitterStream()

                startTime = time.clock()
                startTwitterSearchAPI()

if __name__ == '__main__':
    main() 

Я урезал функцию поиска и аспекты обработки базы данных, поскольку они разделены, и чтобы не загромождать код.

Если у кого-то есть идеи, почему это происходит и как я могу решить эту проблему, дайте мне знать, мне было бы любопытно получить любую информацию.


Решения, которые я пробовал:
Блок Try / Except с http.client.IncompleteRead:
Согласно Ошибка-при-fetching-tweets-with-tweepy

Установка Stall_Warning = в True:
Согласно Incompleteread-error-when -retrieving-twitter-data-using-python

Удаление фильтра английского языка.


person Chris Cookman    schedule 15.11.2018    source источник


Ответы (2)


Решено.

Для любопытных или тех, кто сталкивается с подобной проблемой: после некоторых экспериментов я обнаружил, что проблема заключалась в задержке входящих твитов. Каждый раз, когда система получает твит, моя система запускала процесс идентификации и сохранения объекта, который стоил небольшого отрезка времени, и за время сбора от нескольких сотен до тысяч твитов это отставание становилось все больше и больше, пока API не смог его обработать и подбросил эту ошибку.

Решение. Разделите функцию on_status / on_data / on_success до самого необходимого и обрабатывайте любые вычисления, например сохранение или идентификацию объекта, отдельно после закрытия сеанса потоковой передачи. В качестве альтернативы вы можете сделать свои вычисления намного более эффективными и сделать временной промежуток несущественным на ваше усмотрение.

person Chris Cookman    schedule 16.11.2018
comment
это мне очень помогло, у меня были те же проблемы. По сути, решение состоит в том, чтобы просто сбрасывать данные и выполнять обработку отдельно, как вы правильно упомянули. - person tezzaaa; 15.03.2019
comment
Всем привет. Спасибо за это, но что вы имеете в виду, убирая функцию on_status / on_data / on_success? Думаю, меня смущает тот факт, что вы даже не реализуете эту функцию в своем StreamListener. - person SteakOverflow; 26.09.2020
comment
@SteakOverflow Привет, поэтому on_status - первая функция, объявленная в SteamListener: class StreamListener (StreamListener): def on_status (self, tweet): другие имена, которые я предложил с on_data / on_success, являются обычно используемыми альтернативными именами для этого типа функция. Какое бы имя вы ни выбрали, ключ должен минимизировать интенсивность обработки данных, пока поток активен, поскольку он может перегрузить его и вызвать сбой. Какая бы у вас ни была функция, которая считывает данные, она будет классифицироваться как функция on_data. - person Chris Cookman; 28.09.2020
comment
как вы проверяете отставание tweepy? я не совсем уверен, как проверить этот процесс. - person A-nak Wannapaschaiyong; 30.01.2021

Я просто делюсь своим опытом, основываясь на результатах следующего пользователя Криса Кукмана. Выполнив его совет, та же проблема, что и у меня, исчезла. Но в моем случае я использовал его с discord.py. Итак, я создал универсальный список (status_list), и всякий раз, когда запускается tweepy on_status, он добавляется к этому универсальному списку.

Затем я настроил @ tasks.loop (seconds = 10) с помощью discord.py, чтобы отслеживать, не является ли status_list не пустым каждые несколько секунд, а затем, если он обнаруживает, что у него есть контент, он просматривает его, а затем запускает процесс в каждом списке.

person Aeiddius    schedule 25.03.2021