В дополнение к ответу Шона интеграция служебной шины Azure с AEG позволяет создать некоторые возможности сторожевого таймера для объектов ASB. Обратите внимание, что эта интеграция не похожа на то, что делается для учетной записи большого двоичного объекта хранилища, где события публикуются каждый раз, когда большой двоичный объект создается / удаляется. Другими словами, ASB не будет публиковать событие для каждого сообщения, поступившего в объект ASB, события публикуются как сторожевой таймер объекта.
Этот вид сторожевого таймера сущности использует следующую логику:
- Событие не публикуется, если в объекте нет сообщения.
- Событие публикуется немедленно, когда первое сообщение поступает в сущность и в сущности нет активного слушателя в течение более 360 секунд.
- Событие публикуется каждые 120 секунд, когда прослушиватель еще не активен и в сущности есть хотя бы одно сообщение.
- Событие публикуется через 360 секунд простоя (неактивного) прослушивателя, но в сущности все еще есть хотя бы одно сообщение. Например, если у нас есть 5 сообщений в объекте, и подписчик будет запрашивать только одно сообщение с помощью REST Api, следующее событие будет опубликовано через 360 секунд. Другими словами, сторожевой таймер позволяет удерживать слушателя в режиме ожидания в течение 360 секунд.
Основываясь на описанном выше поведении «сторожевого таймера», эта функция выглядит более подходящей для обмена сообщениями с медленным трафиком, например для пробуждения и мониторинга слушателей на объектах ASB.
Обратите внимание, что время простоя в 360 секунд для прослушивателя можно избежать, используя политику короткого времени повтора на уровне подписки, поэтому абонент может быть вызван снова 3 раза в течение 5 минут повтора.
В целях тестирования ниже приведен фрагмент кода функции EventGridTrigger для подписчика на события ASB.
#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"
using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;
// sasToken cache
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));
public static async Task Run(JObject eventGridEvent, ILogger log)
{
log.LogInformation(eventGridEvent.ToString());
// from the eventgrid payload
var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";
using (var client = new HttpClient())
{
client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());
do
{
// read & delete the message
var response = await client.DeleteAsync(requestUri);
// check for message
if (response.StatusCode != HttpStatusCode.OK)
{
log.LogWarning($">>> No message <<<");
break;
}
// message body
string jsontext = await response.Content.ReadAsStringAsync();
// show the message
log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
} while (true);
}
await Task.CompletedTask;
}
// helpers
class SasTokenHelper
{
DateTime expiringSaS;
uint sasTTLInMinutes = 10;
string sasToken = string.Empty;
(string hostname, string keyname, string key) config;
public SasTokenHelper(string connectionString)
{
config = GetPartsFromConnectionString(connectionString);
GetSasToken();
}
public string GetSasToken()
{
lock (sasToken)
{
if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
{
this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
}
return sasToken;
}
}
internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
{
var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
return (parts["Endpoint"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
}
internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
{
var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
}
}
person
Roman Kiss
schedule
05.03.2019