Как очистить буферы исходящих сообщений на Сервере?

Я написал службу, используя PollingDuplexHttpBinding, у которой есть клиент Silverllight, который ее использует. Моя служба в основном имеет заданное количество клиентов, отправляющих свои данные (довольно часто, каждую секунду, и данные довольно большие, каждый вызов составляет около 5 КБ) в службу, а также прослушивает новые данные, отправленные другими клиентами в службу для направляться к ним, что очень похоже на архитектуру чата.

Проблема, которую я замечаю, заключается в том, что когда клиенты подключаются к службе через Интернет, через несколько минут ответ службы становится медленным, а ответы становятся запаздывающими. Я пришел к выводу, что когда достигается пропускная способность хостов службы (скорость загрузки через Интернет, на сервере только около 15 КБ / с), сообщения, отправленные другими клиентами, буферизуются и обрабатываются соответствующим образом, когда есть доступная пропускная способность. Мне интересно, как именно я могу ограничить размер этого буфера, который служба использует для хранения полученных сообщений от клиентов? Не так важно, чтобы мои клиенты получали все данные, а скорее, чтобы они получали последние данные, отправленные другими, поэтому подключение в реальном времени — это то, что я ищу за счет гарантированной доставки.

Короче говоря, я хочу иметь возможность очищать свою очередь/буфер в службе всякий раз, когда она заполняется или достигается определенный предел, и снова начинать заполнять ее полученными вызовами, чтобы избавиться от задержки. Как мне это сделать? Является ли свойство MaxBufferSize тем, что мне нужно уменьшить на стороне службы, а также на стороне клиента? Или мне нужно закодировать эту функциональность в моем сервисе? Любые идеи?

Спасибо.

ИЗМЕНИТЬ:

Вот моя сервисная архитектура:

//the service
[ServiceContract(Namespace = "", CallbackContract = typeof(INewsNotification))]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class NewsService
{


private static Dictionary<IChatNotification, string> clients = new Dictionary<IChatNotification, string>();
private ReaderWriterLockSlim subscribersLock = new ReaderWriterLockSlim();

[OperationContract(IsOneWay = true)]
public void PublishNotifications(byte[] data)
{
            try
            {
                subscribersLock.EnterReadLock();
                List<INewsNotification> removeList = new List<INewsNotification>();
                lock (clients)
                {
                    foreach (var subscriber in clients)
                    {
                        if (OperationContext.Current.GetCallbackChannel<IChatNotification>() == subscriber.Key)
                        {
                            continue;
                        }
                        try
                        {
                            subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

                        }
                        catch (CommunicationObjectAbortedException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (CommunicationException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (ObjectDisposedException)
                        {
                            removeList.Add(subscriber.Key);
                        }

                    }
                }

                foreach (var item in removeList)
                {
                    clients.Remove(item);
                }
            }
            finally
            {
                subscribersLock.ExitReadLock();
            }
        }

}


//the callback contract
[ServiceContract]
public interface INewsNotification
{
       [OperationContract(IsOneWay = true, AsyncPattern = true)]
       IAsyncResult BeginOnNotificationSend(byte[] data, string username, AsyncCallback callback, object asyncState);
       void EndOnNotificationSend(IAsyncResult result);
}

Конфиг службы:

  <system.serviceModel>
    <extensions>
      <bindingExtensions>
        <add name="pollingDuplex" type="System.ServiceModel.Configuration.PollingDuplexHttpBindingCollectionElement, System.ServiceModel.PollingDuplex, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
      </bindingExtensions>
    </extensions>
    <behaviors>
      <serviceBehaviors>
        <behavior name="">

          <serviceMetadata httpGetEnabled="true" />
          <serviceThrottling maxConcurrentSessions="2147483647" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <bindings>
      <pollingDuplex>

        <binding name="myPollingDuplex" duplexMode="SingleMessagePerPoll" 
                 maxOutputDelay="00:00:00" inactivityTimeout="02:00:00" 
                 serverPollTimeout="00:55:00" sendTimeout="02:00:00"  openTimeout="02:00:00" 
                  maxBufferSize="10000"  maxReceivedMessageSize="10000" maxBufferPoolSize="1000"/>

      </pollingDuplex>
    </bindings>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="NewsNotificationService.Web.NewsService">
        <endpoint address="" binding="pollingDuplex" bindingConfiguration="myPollingDuplex" contract="NewsNotificationService.Web.NewsService" />
        <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
      </service>
    </services>
  </system.serviceModel>
    <system.webServer>
        <directoryBrowse enabled="true" />
    </system.webServer>
</configuration>

Клиент будет вызывать службу обычно между периодами 500-1000 мс, например:

_client.PublishNotificationAsync(byte[] data);

и обратный вызов уведомит клиента об уведомлениях, отправленных другими клиентами:

void client_NotifyNewsReceived(object sender, NewsServiceProxy.OnNewsSendReceivedEventArgs e)
        {
                e.Usernamer//WHich client published the data
                e.data//contents of the notification
        }

Итак, подведем итог: когда количество клиентов увеличивается, а скорость загрузки узлов службы через Интернет ограничена, сообщения, отправляемые службой подписчикам, где-то буферизуются и обрабатываются в очереди, что и вызывает проблему. Я не знаю, где буферизуются эти сообщения. В локальной сети сервис работает нормально, потому что сервер имеет скорость загрузки, равную его скорости загрузки (для 100 КБ/с входящих вызовов он отправляет 100 КБ/с уведомлений). Где буферизуются эти сообщения? И как мне очистить этот буфер?

Я сделал что-то экспериментальное, чтобы попытаться проверить, буферизуются ли сообщения в службе, я попытался вызвать этот метод на клиенте, но он всегда возвращает 0, даже когда один клиент все еще находится в процессе получения уведомлений, которые кто-то другой отправил 4- 5 минут назад:

[OperationContract(IsOneWay = false)]
public int GetQueuedMessages()
{

            return OperationContext.Current.OutgoingMessageHeaders.Count();
}

person Mohammad Sepahvand    schedule 07.06.2012    source источник
comment
Я думаю, что я могу быть на проблему. Я думаю, что ищу решение неправильной проблемы. У меня есть подозрение, что это проблема с потоками на клиентах. Когда клиент вызывает службу, вызов выполняется в потоке пользовательского интерфейса, что приводит к задержке, и это то, что я ДУМАЮ, является проблемой. Что «очередь» фактически возникает на клиентских компьютерах, и именно эта очередь отправляется в службу. Я работаю над этим сейчас, я дам вам знать. Спасибо всем за ваши усилия.   -  person Mohammad Sepahvand    schedule 16.06.2012


Ответы (2)


Я сделал некоторые расчеты для вашей ситуации.

  • размер сообщения = 100 КБ
  • канал загрузки = 15 КБ/с
  • Канал загрузки = 100 КБ/с
  • Клиент звонит в сервис 1-2 раза в секунду

клиент будет вызывать службу обычно между периодами 500-1000 мс

Это правильно?

Для одного клиента ваш загрузочный трафик только для сообщений будет 100-200 КБ/с и это только тело сообщения. Будет больше с заголовком и многое другое с включенной безопасностью.

Сообщения будут объединены для асинхронного вызова. Итак, если у нас есть 3 клиента, и каждое отправленное сообщение обратного вызова содержит 2 сообщения для каждого клиента. 4 клиента - 3 сообщения в каждом обратном вызове.

Для 3-х клиентов это будет 200-400КБ/с в канале загрузки.

Мне кажется, что сообщения слишком велики для заявленной вами пропускной способности.

Проверьте, можете ли вы:

  1. Уменьшите размер сообщения. Я не знаю характера вашего бизнеса, поэтому не могу дать совет здесь.

  2. Используйте сжатие для сообщений или трафика.

  3. Увеличьте пропускную способность сети. Без этого даже идеальное решение будет иметь очень большую задержку. Вы можете потратить дни и недели на оптимизацию своего кода, и даже если вы правильно использовали свое сетевое решение, оно все равно будет медленным.

Я знаю, это звучит как Капитан Очевидность, но на некоторые вопросы просто нет правильного ответа, если вы не измените вопрос.

После выполнения описанных выше шагов работайте с ServiceThrottlingBehavior в сочетании с пользовательским кодом, который будет управлять очередью обратного вызова.

ServiceThrottlingBehavior отклоняет запросы, если достигнута граница.

http://msdn.microsoft.com/en-us/library/ms735114(v=vs.100).aspx

Это тривиальный образец. Реальные числа должны быть определены специально для вашей среды.

<serviceThrottling 
 maxConcurrentCalls="1" 
 maxConcurrentSessions="1" 
 maxConcurrentInstances="1"
/>

Обновление:

Большая ошибка с моей стороны в вопросе, каждый вызов 5 КБ/с,

Даже с сообщениями размером 5 КБ 15 КБ/с недостаточно. Давайте посчитаем трафик только для 3 пользователей. 3 входящих сообщения в секунду. Теперь система должна использовать дуплекс, чтобы уведомлять пользователей о том, что отправили другие. Каждый пользователь должен получать сообщения от своих партнеров. Всего у нас 3 сообщения. Одно (принадлежащее отправителю) может быть пропущено, поэтому каждый пользователь должен получить 2 сообщения. 3 пользователя получат 10 КБ (5 КБ + 5 КБ = одно сообщение 10 КБ) = 30 КБ. Одно сообщение в секунду составит 30 КБ/сек в канале загрузки для 3 пользователей.

Где буферизуются эти сообщения? И как мне очистить этот буфер?

Это зависит от того, как вы размещаете свой сервис. Если это автономный сервис, он вообще не буферизуется. У вас есть код, который пытается отправить сообщение получателю, но это происходит очень медленно, потому что канал переполнен. В IIS может быть некоторая буферизация, но не рекомендуется трогать ее извне.

Правильное решение - дросселирование. При ограничении пропускной способности не следует пытаться отправлять все сообщения всем клиентам. Вместо этого вы должны, например, ограничить количество потоков, которые отправляют сообщения. Таким образом, из 3 пользователей выше вместо отправки 3 сообщений параллельно вы можете отправлять их последовательно или решить, что только один пользователь получит обновление в этом раунде, а следующий — в следующем.

Таким образом, общая идея состоит в том, чтобы не пытаться отправить все всем, как только у вас есть данные, а отправлять только тот объем данных, который вы можете себе позволить, и контролировать это, используя количество потоков или скорость отклика.

person Dmitry Harnitski    schedule 09.06.2012
comment
Я много играл с этими настройками. Я посмотрю еще раз и сообщу, как дела, спасибо. - person Mohammad Sepahvand; 09.06.2012
comment
+1 Я думаю, что это очень хорошее решение. Если сервер слишком занят, отклоните клиент и позвольте клиенту справиться с этим. - person flayn; 11.06.2012
comment
Я попробовал это, я установил все эти значения на 1, но даже сейчас, через некоторое время (около 5 минут), когда у меня есть 2+ клиента, использующих услугу, звонки начинают отставать и поступать с опозданием. Кроме того, я заметил, что многие люди говорят, что вы должны установить эти значения на максимум, чтобы повысить производительность вашего сервиса. В любом случае, я до сих пор понятия не имею, где эти сообщения ставятся в очередь, может быть, где-то в Интернете. - person Mohammad Sepahvand; 15.06.2012
comment
@ClunkyCoder Пожалуйста, прочитайте мой ответ. Невозможно отправить ›100 КБ/с, если у вас есть канал 15 КБ/с. Это ваша очередь. Это физически невозможно. У вас на дороге огромная пробка, и вы спрашиваете, какая машина должна первой выехать из гаража? Практически это не имеет значения. Все автомобили будут застрять в пробке. - person Dmitry Harnitski; 15.06.2012
comment
@dharnitski, ты прав. Большая ошибка с моей стороны в вопросе, каждый вызов составляет 5 КБ / с, я неправильно измерил 100 КБ / с, я понятия не имею, откуда я это взял: @ Как вы упомянули, невозможно отправить данные даже 1 клиенту даже для несколько секунд, если бы каждый вызов был размером 100 КБ (на сервер потребовалась бы скорость загрузки 100-200 КБ / с вместо 15!) Это также происходит в локальной сети, но менее регулярно. Я только что снова протестировал свой сервис на 3 компьютерах, и через некоторое время сообщения плохо попадают в очередь, но это гораздо менее заметно, чем в Интернете. Я убежден, что это не совсем проблема пропускной способности. - person Mohammad Sepahvand; 16.06.2012
comment
@dharnitski, каждый раз, когда служба получает сообщения от клиента, сервер получает 5 КБ/с, даже когда клиент отправляет 2 сообщения в секунду, поэтому каждое сообщение будет 2,5 КБ/с, 5 КБ/с — это среднее значение, которое я ожидаю услугу получить от 1 клиента. Я на 100% уверен, что это не проблема с пропускной способностью, даже в беспроводной локальной сети (скорость загрузки 10 МБ/с) с 4 клиентами, каждый из которых отправляет 5 КБ/с, поэтому хост службы будет получать 20 КБ/с, а также отправлять такое же количество данных (20 КБ) каждую секунду, я заметил, что происходит отставание. Я искал другие возможные причины, и я думаю, что знаю, в чем проблема - person Mohammad Sepahvand; 16.06.2012
comment
@dharnitski, см. комментарий, который я только что оставил на свой вопрос. - person Mohammad Sepahvand; 16.06.2012

MaxBufferSize не поможет вам с этой проблемой. Вам придется кодировать его самостоятельно, я не знаю ни одного существующего решения/фреймворка. Однако это звучит как интересная проблема. Вы можете начать с поддержки Queue<Message> для каждого подключенного клиента, а при отправке в эту очередь (или когда клиент вызывает удаление из очереди) вы можете переоценить, что Message должно быть отправлено.

ОБНОВЛЕНИЕ: во-первых, я бы забыл о попытках сделать это со стороны клиента, а в конфигурации вам придется кодировать это самостоятельно

Здесь я вижу, куда вы отправляете своим клиентам:

 subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

поэтому вместо того, чтобы асинхронно отправлять эти уведомления каждому клиенту, вы должны отправить их в файл Queue<byte[]>. Каждое клиентское соединение будет иметь свою собственную очередь, и вам, вероятно, следует создать класс, предназначенный для каждого клиентского соединения:

обратите внимание, что этот код не будет компилироваться "из коробки" и, вероятно, содержит некоторые логические ошибки, используйте его только в качестве руководства

public class ClientConnection
{
    private INewsNotification _callback;
    private Queue<byte> _queue = new Queue<byte>();
    private object _lock = new object();
    private bool _isSending
    public ClientConnection(INewsNotification callBack)
    {
           _callback=callback;
    }
    public void Enqueue(byte[] message)
    { 
       lock(_lock)
       {               
           //what happens here depends on what you want to do. 
           //Do you want to only send the latest message? 
           //if yes, you can just clear out the queue and put the new message in there,                         
           //or you could keep the most recent 5 messages.                
           if(_queue.Count > 0)
               _queue.Clear();
          _queue.Enqueue(message);
           if(!_isSending)
               BeginSendMessage();
       }
    }

    private void BeginSendMessage()
    {
       _isSending=true;
        _callback.BeginOnNotificationSend(_queue.Dequeue(), GetCurrentUser(), EndCallbackClient, subscriber.Key);

    }

    private void EndCallbackClient(IAsyncResult ar)
    {
        _callback.EndReceive(ar);
        lock(_lock)
        {
           _isSending=false;
           if(_queue.Count > 0)
              BeginSendMessage();//more messages to send
        }
    }
}

Представьте себе сценарий, в котором клиенту передается одно сообщение, и при отправке еще 9 сообщений вызывается ClientConnection.Enqueue. Когда первое сообщение закончено, он проверяет очередь, которая должна содержать только последнее (9-е сообщение)

person wal    schedule 07.06.2012
comment
Спасибо, я начал с того, что сделал что-то вроде это предложение, написав метод службы для возврата OperationContext.Current.OutgoingMessageProperties.Count(), затем я вызвал это на клиент, и он постоянно возвращал 0, даже когда все еще были 100 сообщений, пересылаемых клиенту из буфера после того, как отправитель прекратил отправку на некоторое время. Есть идеи, что это за свойство на самом деле? - person Mohammad Sepahvand; 09.06.2012
comment
зачем вам это имущество? как вы отправляете сообщения с сервера клиенту (отправьте код) - person wal; 10.06.2012