Ограничьте нет. запросов в секунду, сгенерированных методом Dns.BeginGetHostEntry ИЛИ использовать параллельную библиотеку задач (TPL)

Я использовал метод Dns.BeginGetHostEntry, чтобы получить полное доменное имя для хостов на основе имени хоста (список имен хостов хранится в базе данных SQL-сервера). Этот метод (асинхронный) завершает выполнение менее чем за 30 минут для почти 150 тыс. записей и обновляет полное доменное имя в той же таблице SQL, где хранится имя хоста.

Это решение работает слишком быстро (превышая порог в 300 запросов в секунду). Поскольку разрешенного нет. количество запросов на генерацию сервера ограничено, мой сервер числится в топе болтунов и просит остановить запуск этого приложения. Мне нужно перестроить это приложение для синхронной работы, что теперь занимает более 6 часов.

//// TotalRecords are fetched from SQL database with the Hostname (referred as host further)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        Interlocked.Increment(ref requestCounter);
        string[] arr = new string[] { i.ToString(), host }; 
        Dns.BeginGetHostEntry(host, GetHostEntryCallback,arr);
    }
    catch (Exception ex)
    {
        log.Error("Unknown error occurred\n ", ex);
    }
}
do
{
    Thread.Sleep(0);

} while (requestCounter>0);

ListAdapter.Update(ВсегоЗаписей);

Вопросы:

  1. Есть ли способ ограничить количество запросов, генерируемых этим методом, в секунду?

  2. Насколько я понимаю, ParallelOptions.MaxDegreeOfParallelism не контролирует количество потоков в секунду, так что может ли TPL быть лучшим вариантом? Можно ли ограничиться нет. запросов в секунду?


person swati gupta    schedule 22.05.2017    source источник
comment
Рассматривали ли вы очередь, которая настроена на обработку элемента только каждые x, поэтому максимум она работает 300 в секунду (в конце концов, 300 в секунду - это довольно много)   -  person BugFinder    schedule 22.05.2017
comment
Возможный дубликат Простой способ ограничения скорости запросов HttpClient   -  person bradgonesurfing    schedule 22.05.2017
comment
Да, SemaphoreSlim и таймер могут решить эту проблему.   -  person Sir Rufo    schedule 22.05.2017
comment
Пожалуйста, не добавляйте больше несвязанных вопросов к исходному вопросу. Stackoverflow лучше всего работает с конкретными вопросами. Создайте отдельные вопросы об ограничении скорости/оптимизации SQL/IPV6 и IPV4.   -  person bradgonesurfing    schedule 22.05.2017
comment
@bradgonesurfing Так как впервые столкнулся с переполнением стека, создал новые вопросы и сохранил исходный пост как есть с целевыми запросами. спасибо за ответы, обязательно попробую!   -  person swati gupta    schedule 22.05.2017
comment
Добро пожаловать в переполнение стека. Я надеюсь, что вы найдете это полезным. Также, пожалуйста, отметьте ответы, которые являются полезными или принятыми.   -  person bradgonesurfing    schedule 22.05.2017


Ответы (3)


Чисто асинхронное решение.

Он использует один пакет nuget Nite.AsyncEx и System.Reactive. Он выполняет обработку ошибок и предоставляет результаты DNS по мере их появления в виде IObservable<IPHostEntry>

Здесь многое происходит. Вам нужно будет понимать реактивные расширения. как стандартное асинхронное программирование. Вероятно, есть много способов добиться приведенного ниже результата, но это интересное решение.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

public static class EnumerableExtensions
{
    public static IEnumerable<Func<U>> Defer<T, U>
        ( this IEnumerable<T> source, Func<T, U> selector) 
        => source.Select(s => (Func<U>)(() => selector(s)));
}


public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained
    /// </summary>
    /// <param name="desiredRateLimit"></param>
    /// <param name="currentItemCount"></param>
    /// <param name="elapsedTotalSeconds"></param>
    /// <returns></returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        var time = elapsedTotalSeconds;
        var timeout = currentItemCount / desiredRateLimit;
        return timeout - time;
    }

    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="tasks"></param>
    /// <param name="rateLimit"></param>
    /// <returns></returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){
        var s = System.Diagnostics.Stopwatch.StartNew();
        var n = 0;
        var sem = new  AsyncCountdownEvent(1);

        var errors = new ConcurrentBag<Exception>();

        return Observable.Create<T>
            ( observer =>
            {

                var ctx = new CancellationTokenSource();
                Task.Run
                    ( async () =>
                    {
                        foreach (var taskFn in tasks)
                        {
                            n++;
                            ctx.Token.ThrowIfCancellationRequested();

                            var elapsedTotalSeconds = s.Elapsed.TotalSeconds;
                            var delay = Delay( rateLimit, n, elapsedTotalSeconds );
                            if (delay > 0)
                                await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );

                            sem.AddCount( 1 );
                            Task.Run
                                ( async () =>
                                {
                                    try
                                    {
                                        observer.OnNext( await taskFn() );
                                    }
                                    catch (Exception e)
                                    {
                                        errors.Add( e );
                                    }
                                    finally
                                    {
                                        sem.Signal();
                                    }
                                }
                                , ctx.Token );
                        }
                        sem.Signal();
                        await sem.WaitAsync( ctx.Token );
                        if(errors.Count>0)
                            observer.OnError(new AggregateException(errors));
                        else
                            observer.OnCompleted();
                    }
                      , ctx.Token );

                return Disposable.Create( () => ctx.Cancel() );
            } );
    }

    #region hosts



    public static string [] Hosts = new [] { "google.com" }

    #endregion


    public static void Main()
    {
        var s = System.Diagnostics.Stopwatch.StartNew();

        var rate = 25;

        var n = Hosts.Length;

        var expectedTime = n/rate;

        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync( host );
            }
            catch (Exception e)
            {
                throw new Exception($"Can't resolve {host}", e);
            }
        } );

        IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );

        results
            .Subscribe( result =>
            {
                Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
            },
            onCompleted: () =>
            {
                Console.WriteLine( "Completed" );

                PrintTimes( s, expectedTime );
            },
            onError: e =>
            {
                Console.WriteLine( "Errored" );

                PrintTimes( s, expectedTime );

                if (e is AggregateException ae)
                {
                    Console.WriteLine( e.Message );
                    foreach (var innerE in ae.InnerExceptions)
                    {
                        Console.WriteLine( $"     " + innerE.GetType().Name + " " + innerE.Message );
                    }
                }
                else
                {
                        Console.WriteLine( $"got error " + e.Message );
                }
            }

            );

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }

    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine( "Done" );
        Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
        Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
    }
}

Последние несколько строк вывода

result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
     Exception Can't resolve adv758968.ru
     Exception Can't resolve fr.a3dfp.net
     Exception Can't resolve ads.adwitserver.com
     Exception Can't resolve www.adtrader.com
     Exception Can't resolve trak-analytics.blic.rs
     Exception Can't resolve ads.buzzcity.net

Я не смог вставить полный код, поэтому вот ссылка на код со списком хостов.

https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190

person bradgonesurfing    schedule 22.05.2017
comment
Спасибо за отзыв. Есть ли возможное решение без включения пакета Nuget? Я добавил еще несколько вопросов к исходному сообщению, которые кажутся узкими местами в моем приложении. Пожалуйста, просмотрите и помогите мне с более полным решением. - person swati gupta; 22.05.2017
comment
Что не так с включением пакета nuget? Включенный фреймворк CountDownEvent не имеет WaitAsync. Возможно, оплошность. Если вы не включите его, вам придется создать свой собственный счетчик из SemaphoreSlim и других неприятных вещей. - person bradgonesurfing; 22.05.2017
comment
(1 и 2) уже ответил мой ответ. (3) Задайте еще один вопрос об этом в stackoverflow. Это лишь вскользь связано с проблемой ограничения скорости. (4) Легко (Количество успешных запросов/время с момента запуска) Распечатайте его на консоли, зарегистрируйте что угодно. (5) Задайте другой вопрос о стеке. Не связано с проблемой ограничения скорости - person bradgonesurfing; 22.05.2017
comment
Исходный код CountDownEvent находится здесь если вам хочется перекодировать его без пакета nuget. - person bradgonesurfing; 22.05.2017
comment
bradgonesurfing. Не могли бы вы дать описание потока этого кода? При использовании метода Dns.GetHostEntryAsync, где именно он должен вызываться, поскольку в этом методе не требуется обратный вызов, а полное доменное имя может быть напрямую назначено набору данных. Небольшое описание всех задержек и асинхронного метода может помочь понять программу в целом. Заранее спасибо ! - person swati gupta; 23.05.2017
comment
При поиске хоста может быть возможность найти некоторые исключения. Где можно добавить обработку исключений при использовании Dns.GetHostEntryAsync внутри (Func‹Task›)(async() =›..) - person swati gupta; 23.05.2017
comment
Вы должны были собрать их сами. - person bradgonesurfing; 23.05.2017
comment
Заменить перечислить. Диапазон с вашим списком имен хостов для поиска. Замените Task.Delay вызовом gethostasync. Собрать все результаты — еще одна задача для вас :) - person bradgonesurfing; 23.05.2017
comment
Это твой счастливый день. Я обновил решение полным примером обработки реальных DNS-запросов. Код находится здесь. В коде, вставленном в ответ выше, отсутствует длинный список хостов для тестирования. Пожалуйста, отметьте ответ как принятый или проголосуйте за него, если он полезен. Вы, вероятно, не поймете это полностью с первого взгляда. Загрузите его в отладчик, пройдитесь по нему и прочитайте ссылки о реактивных расширениях и асинхронном ожидании. - person bradgonesurfing; 23.05.2017
comment
Мне это кажется полезным, но с таким сложным кодом это трудно понять тому, кто раньше не работал с TPL, асинхронностью или потоками. Я хотел бы понять, прежде чем что-либо внедрять. Мое требование довольно простое: специальное консольное приложение (даже окно консоли скрыто для этого решения, поэтому вывод не требуется), использующий GetHostEntry/GetHostEntryAsync с ограниченным номером. запросов в секунду. Я до сих пор не понимаю необходимости использования подписки/отсрочки и т. д. Я все еще трачу время на понимание, но любое более простое решение может быть действительно полезным. - person swati gupta; 24.05.2017
comment
Вы хотите делать сложные вещи, такие как многопоточность, ограничение скорости и параллельные операции, вам потребуется время, чтобы изучить материал. - person bradgonesurfing; 24.05.2017
comment
Лучшее, что я могу предложить, это заставить код, который я представил вам, работать так, как я его дал, и начать играть с ним в отладчике. Для каждого бита, который вы не понимаете, ищите документацию / google и читайте / читайте / читайте - person bradgonesurfing; 24.05.2017
comment
Я обнаружил ошибку в определении этого класса: статический класс EnumerableExtensions for Invalid token '=›' - person swati gupta; 24.05.2017

Используйте SemaphoreSlim с Timer, чтобы ограничить количество запросов за период.

[DebuggerDisplay( "Current Count = {_semaphore.CurrentCount}" )]
public class TimedSemaphoreSlim : IDisposable
{
    private readonly System.Threading.SemaphoreSlim _semaphore;
    private readonly System.Threading.Timer _timer;
    private int _releaseCount;

    public TimedSemaphoreSlim( int initialcount, TimeSpan period )
    {
        _semaphore = new System.Threading.SemaphoreSlim( initialcount );
        _timer = new System.Threading.Timer( OnTimer, this, period, period );
    }

    public TimedSemaphoreSlim( int initialCount, int maxCount, TimeSpan period )
    {
        _semaphore = new SemaphoreSlim( initialCount, maxCount );
        _timer = new Timer( OnTimer, this, period, period );
    }

    private void OnTimer( object state )
    {
        var releaseCount = Interlocked.Exchange( ref _releaseCount, 0 );
        if ( releaseCount > 0 )
            _semaphore.Release( releaseCount );
    }

    public WaitHandle AvailableWaitHandle => _semaphore.AvailableWaitHandle;
    public int CurrentCount => _semaphore.CurrentCount;

    public void Release()
    {
        Interlocked.Increment( ref _releaseCount );
    }

    public void Release( int releaseCount )
    {
        Interlocked.Add( ref _releaseCount, releaseCount );
    }

    public void Wait()
    {
        _semaphore.Wait();
    }

    public void Wait( CancellationToken cancellationToken )
    {
        _semaphore.Wait( cancellationToken );
    }

    public bool Wait( int millisecondsTimeout )
    {
        return _semaphore.Wait( millisecondsTimeout );
    }

    public bool Wait( int millisecondsTimeout, CancellationToken cancellationToken )
    {
        return _semaphore.Wait( millisecondsTimeout, cancellationToken );
    }

    public bool Wait( TimeSpan timeout, CancellationToken cancellationToken )
    {
        return _semaphore.Wait( timeout, cancellationToken );
    }

    public Task WaitAsync()
    {
        return _semaphore.WaitAsync();
    }

    public Task WaitAsync( CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( cancellationToken );
    }

    public Task<bool> WaitAsync( int millisecondsTimeout )
    {
        return _semaphore.WaitAsync( millisecondsTimeout );
    }

    public Task<bool> WaitAsync( TimeSpan timeout )
    {
        return _semaphore.WaitAsync( timeout );
    }

    public Task<bool> WaitAsync( int millisecondsTimeout, CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( millisecondsTimeout, cancellationToken );
    }

    public Task<bool> WaitAsync( TimeSpan timeout, CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( timeout, cancellationToken );
    }

    #region IDisposable Support
    private bool disposedValue = false; // Dient zur Erkennung redundanter Aufrufe.

    private void CheckDisposed()
    {
        if ( disposedValue )
        {
            throw new ObjectDisposedException( nameof( TimedSemaphoreSlim ) );
        }
    }

    protected virtual void Dispose( bool disposing )
    {
        if ( !disposedValue )
        {
            if ( disposing )
            {
                _timer.Dispose();
                _semaphore.Dispose();
            }

            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose( true );
    }
    #endregion
}

Пример использования

IEnumerable<string> bunchOfHosts = GetBunchOfHosts();
IList<IPHostEntry> result;

using ( var limiter = new TimedSemaphoreSlim( 300, 300, TimeSpan.FromSeconds( 1 ) ) )
{
    result = bunchOfHosts.AsParallel()
        .Select( e =>
        {
            limiter.Wait();
            try
            {
                return Dns.GetHostEntry( e );
            }
            finally
            {
                limiter.Release();
            }
        } )
        .ToList();
}
person Sir Rufo    schedule 22.05.2017
comment
Это не очень хорошо работает, когда вы заменяете Console.WriteLine чем-то вроде DnsRequest, выполнение которого занимает много времени, и если вы хотите поддерживать определенную скорость, вам нужно запускать запросы параллельно. - person bradgonesurfing; 22.05.2017
comment
@bradgonesurfing Я тоже думал об этом и расширил этот класс, чтобы он вел себя как SemaphoreSlim (Wait(); Act(); Release();), но реальный выпуск выполняется внутренним таймером. - person Sir Rufo; 22.05.2017

Думали ли вы когда-нибудь об использовании библиотеки TPL Dataflow? В нем есть очень удобный способ ограничения одновременных операций одного типа. Также у него есть возможность ограничить весь конвейер, ограничив размер буфера. .

В основном все, что вам нужно создать, это конвейер с:

  • запись BufferBlock, которая будет использоваться для хранения всех ваших TotalRecords элементов
  • TransformBlock, который будет принимать host и возвращать результаты для него.
  • BatchBlock, который будет собирать результаты и создайте пакет результатов
  • ActionBlock, которые, наконец, обновляют вашу базу данных результатами.

Итак, ваш код может быть таким:

// buffer limited to 30 items in queue
// all other items would be postponed and added to queue automatically
// order in queue is preserved
var hosts = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 30 });

// get a host and perform a dns search operation
var handler = new TransformBlock<string, IPHostEntry>(host => Dns.GetHostEntry(host),
  // no more than 5 simultaneous requests at a time
  new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

// gather results in an array of size 500 
var batchBlock = new BatchBlock<IPHostEntry>(500);

// get the resulting array and save it to database
var batchSave = new ActionBlock<IPHostEntry[]>(r => GetHostEntryCallback(r));

// link all the blocks to automatically propagate items along the pipeline
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
hosts.LinkTo(handler, linkOptions);
handler.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(batchSave, linkOptions);

// provide the data to pipeline
for (var i = 0; i < TotalRecords.Rows.Count; ++i)
{
    var host = TotalRecords.Rows[i].ItemArray[0].ToString();
    // async wait for item to be sent to pipeline
    // will throttle starting with 31th item in a buffer queue
    await hosts.SendAsync(host);
}

// pipeline is complete now, just wait it finishes
hosts.Complete();

// wait for the last block to finish it's execution
await batchSave.Completion;

// notify user that update is over

Я рекомендую вам прочитать весь How-to раздел на MSDN, чтобы лучше понять понимание того, что вы можете сделать с этой библиотекой, возможно, продолжите чтение с помощью официальная документация.

Кстати, вы можете использовать класс SqlBulkCopy для обновления базы данных, если он будет соответствовать вашим требованиям, обычно это быстрее, чем регулярное обновление с SqlDataAdapter.

person VMAtm    schedule 22.05.2017
comment
Требование состоит в том, чтобы регулировать запросы на основе времени (в секунду), максимальная степень параллелизма здесь не поможет. - person swati gupta; 24.05.2017
comment
Вы можете реализовать простой блок для управления пропускной способностью, но внутри TPL нет ничего, что бы предоставляло вам такую ​​функциональность из коробки. - person VMAtm; 24.05.2017