Как быстро отправить сообщения в хранилище очереди Azure с помощью Python?

Я пытаюсь отправить большое количество сообщений (десятки миллионов) в azure с помощью библиотеки python azure.storage.queue, однако на это уходит очень много времени. Код, который я использую, приведен ниже:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)

messages = [example list of messages]
connectionString = "example connection string"
queueName = "example-queue-name"

queueClient = QueueClient.from_connection_string(connectionString, queueName)
for message in messages:
    queueClient.send_message(message)

В настоящее время для отправки около 70 000 сообщений требуется около 3 часов, что значительно слишком медленно, учитывая потенциальное количество сообщений, которые необходимо отправить.

Я просмотрел документацию, чтобы попытаться найти пакетный вариант, но похоже, что его нет: https://docs.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python

Я также поинтересовался, есть ли у кого-нибудь опыт использования библиотеки asynchio для ускорения этого процесса и мог бы предложить, как ее использовать?


person petgeo    schedule 10.11.2020    source источник
comment
Как дела? Мой пост полезен?   -  person Stanley Gong    schedule 12.11.2020


Ответы (1)


Попробуй это:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)
from concurrent.futures import ProcessPoolExecutor
import time

messages = []

messagesP1 = messages[:len(messages)//2] 
messagesP2 = messages[len(messages)//2:] 

print(len(messagesP1))
print(len(messagesP2))

connectionString = "<conn str>"
queueName = "<queue name>"

queueClient = QueueClient.from_connection_string(connectionString, queueName)

def pushThread(messages):
   for message in messages:
       queueClient.send_message(message)



def callback_function(future):
    print('Callback with the following result', future.result())

tic = time.perf_counter()

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(pushThread, messagesP1)
        future.add_done_callback(callback_function)
        future2 = executor.submit(pushThread, messagesP2)
        while True:
            if(future.running()):
                print("Task 1 running")
            if(future2.running()):
                print("Task 2 running")

            if(future.done() and future2.done()):
                print(future.result(), future2.result())
                break

if __name__ == '__main__':
    main()


toc = time.perf_counter()
    
print(f"spent {toc - tic:0.4f} seconds")

Как видите, я разделил массив сообщений на 2 части и использую 2 задачи для одновременного помещения данных в очередь. Согласно моему тесту, у меня около 800 сообщений, и я трачу 94 секунды на отправку всех сообщений: введите описание изображения здесь

Но, используя способ, описанный выше, у меня на это уходит 48 секунд:

введите описание изображения здесь

person Stanley Gong    schedule 11.11.2020
comment
Привет, Стэнли, большое спасибо за это, это похоже на то, что я искал. Я был очень занят, но планирую попробовать реализовать это сегодня или завтра, а потом вернусь к вам с отзывами! - person petgeo; 12.11.2020
comment
@petgeo, привет, как дела? Мой пост полезен? - person Stanley Gong; 18.11.2020
comment
Привет, Стэнли, я попытался реализовать это решение, но когда оно запустилось, оно не удалось, сказав, что нет никаких результатов на будущее. Затем я попытался запустить его без функции обратного вызова, и он работал без ошибок, но, к сожалению, на самом деле никаких сообщений не было. Мне пришлось вернуться к старой версии, но я могу перезапустить и получить для вас дополнительную информацию, если это будет полезно - person petgeo; 19.11.2020
comment
@petgeo, как дела? Я считаю, что вы решили свою проблему :) - person Stanley Gong; 25.11.2020
comment
@Stanley Gong, у вас есть минутка. У вас есть аналогичная проблема, я буду рад задать еще один вопрос, если вам нужно. - person wwnde; 08.01.2021
comment
@wwnde, конечно, просто дайте мне знать ссылку на вопрос - person Stanley Gong; 08.01.2021
comment
Еще не опубликовано, буду рад опубликовать, если хотите помочь. У меня есть триггер очереди, когда я вхожу в очередь и вручную ввожу сообщение в Azure, он срабатывает. Однако, если я использую таймер для написания сообщения, он не срабатывает и не выполняется, хотя сообщение написано. Я подозреваю, что не применяю encode_policy, как следовало бы. Чем вы можете помочь? - person wwnde; 08.01.2021
comment
@wwnde, обязательно попробую помочь :) - person Stanley Gong; 08.01.2021
comment
@Stanley Gong пригласил вас в чат - person wwnde; 08.01.2021
comment
@Stanley Gong stackoverflow.com/questions/65624035/ - person wwnde; 08.01.2021
comment
@wwnde, мой друг Хьюри занимается этим. Он в этом разбирается, не беспокойтесь. - person Stanley Gong; 08.01.2021