EventGridTrigger для темы служебной шины Azure

Я создал функцию Azure на основе триггера EventGrid. Этот триггер срабатывает всякий раз, когда новое сообщение приходит в тему служебной шины. Ниже представлен сгенерированный шаблон функции

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static void Run(JObject eventGridEvent, TraceWriter log)
{
    log.Info(eventGridEvent.ToString(Formatting.Indented));
}

Мое требование от функции Azure - обрабатывать данные и хранить их в ADLS. Теперь, как мне проанализировать / десериализовать данные из типа JObject. Мне нужно нормализовать данные в этой функции, прежде чем сохранять их в хранилище озера данных. Мне нужно перезаписать функцию ?.

Пожалуйста, предоставьте некоторую информацию / ссылку, чтобы удовлетворить это требование


person user1941025    schedule 04.03.2019    source источник


Ответы (2)


Служебная шина (Premium) отправляет события для двух сценариев:

  1. ActiveMessagesWithNoListenersAvailable
  2. DeadletterMessagesAvailable

Первое событие будет отправлено, когда есть сообщения, связанные с конкретным объектом, и не существует активных слушателей. Сущность будет указана в полезной нагрузке вместе с другой необходимой информацией для доступа к ней (например, пространство имен или тема подписки, от которой нужно получать). Схема определена в документация.

Вторая схема событий аналогична первой и генерируется для очередей с недействительными буквами.

Теперь, как мне проанализировать / десериализовать данные из типа JObject. Мне нужно нормализовать данные в этой функции, прежде чем сохранять их в хранилище озера данных. Мне нужно перезаписать функцию ?.

eventGridEvent JSON сам по себе не отправляет вам сообщения служебной шины Azure. Вам нужно будет знать, как исходные сообщения были сериализованы в первую очередь, то есть что использовала сторона отправителя. Эта десериализация должна быть осуществлена ​​в функции, а затем в коде для записи озера данных объекта.

person Sean Feldman    schedule 04.03.2019

В дополнение к ответу Шона интеграция служебной шины Azure с AEG позволяет создать некоторые возможности сторожевого таймера для объектов ASB. Обратите внимание, что эта интеграция не похожа на то, что делается для учетной записи большого двоичного объекта хранилища, где события публикуются каждый раз, когда большой двоичный объект создается / удаляется. Другими словами, ASB не будет публиковать событие для каждого сообщения, поступившего в объект ASB, события публикуются как сторожевой таймер объекта.

Этот вид сторожевого таймера сущности использует следующую логику:

  1. Событие не публикуется, если в объекте нет сообщения.
  2. Событие публикуется немедленно, когда первое сообщение поступает в сущность и в сущности нет активного слушателя в течение более 360 секунд.
  3. Событие публикуется каждые 120 секунд, когда прослушиватель еще не активен и в сущности есть хотя бы одно сообщение.
  4. Событие публикуется через 360 секунд простоя (неактивного) прослушивателя, но в сущности все еще есть хотя бы одно сообщение. Например, если у нас есть 5 сообщений в объекте, и подписчик будет запрашивать только одно сообщение с помощью REST Api, следующее событие будет опубликовано через 360 секунд. Другими словами, сторожевой таймер позволяет удерживать слушателя в режиме ожидания в течение 360 секунд.

Основываясь на описанном выше поведении «сторожевого таймера», эта функция выглядит более подходящей для обмена сообщениями с медленным трафиком, например для пробуждения и мониторинга слушателей на объектах ASB.

Обратите внимание, что время простоя в 360 секунд для прослушивателя можно избежать, используя политику короткого времени повтора на уровне подписки, поэтому абонент может быть вызван снова 3 раза в течение 5 минут повтора.

В целях тестирования ниже приведен фрагмент кода функции EventGridTrigger для подписчика на события ASB.

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"

using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;



// sasToken cache
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));

public static async Task Run(JObject eventGridEvent, ILogger log)
{
    log.LogInformation(eventGridEvent.ToString());

    // from the eventgrid payload
    var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";

    using (var client = new HttpClient())
    {

        client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());

        do
        {
            // read & delete the message 
            var response = await client.DeleteAsync(requestUri);

            // check for message
            if (response.StatusCode != HttpStatusCode.OK)
            {
                log.LogWarning($">>> No message <<<");
                break;
            }

            // message body
            string jsontext = await response.Content.ReadAsStringAsync();

            // show the message
            log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
        } while (true);

    }

    await Task.CompletedTask;
}




// helpers
class SasTokenHelper
{
    DateTime expiringSaS;
    uint sasTTLInMinutes = 10;
    string sasToken = string.Empty;
    (string hostname, string keyname, string key) config;

    public SasTokenHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        GetSasToken();
    }

    public string GetSasToken()
    {
        lock (sasToken)
        {
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
            {
                this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
                expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
            }
            return sasToken;
        }
    }

    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
        return (parts["Endpoint"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
    }

    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
    {
        var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
        return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
    }
}
person Roman Kiss    schedule 05.03.2019