C # Async WebRequests: выполнение действия после выполнения всех запросов

У меня есть это базовое консольное приложение для парсинга на C #, которое асинхронно использует WebRequest для получения html из списка сайтов. Он работает нормально, но как мне настроить триггер, который срабатывает после обработки каждого сайта в списке?

Я потратил пару часов на изучение различных решений в Интернете, включая документы MS, но ни одно из них не дает прямого ответа через код. Я читал об IAsyncResult.AsyncWaitHandle, но понятия не имею, как интегрировать его в свой код. Я просто хотел бы вызвать пользовательскую функцию, когда все потоки завершат обработку или тайм-аут.

Одна уловка заключается в том, что я никогда не знаю заранее, сколько сайтов в моем списке (он определяется пользователем), поэтому мне нужно решение, достаточно надежное, чтобы ждать 5 событий для завершения 100 000 событий.

Спасибо. Рабочий код ниже:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Net;
using System.Threading;


namespace AsyncApp_01
{
    class Program
    {
        static void Main(string[] args)
        {
            ArrayList alSites = new ArrayList();
            alSites.Add("http://www.google.com");
            alSites.Add("http://www.lostspires.com");

            ScanSites(alSites);

            Console.Read();
        }

        private static void ScanSites(ArrayList sites)
        {
            foreach (string uriString in sites)
            {
                WebRequest request = HttpWebRequest.Create(uriString);
                request.Method = "GET";
                object data = new object(); //container for our "Stuff"

                // RequestState is a custom class to pass info to the callback
                RequestState state = new RequestState(request, data, uriString);
                IAsyncResult result = request.BeginGetResponse(new AsyncCallback(UpdateItem), state);


                //Register the timeout callback
                ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, new WaitOrTimerCallback(ScanTimeoutCallback), state, (30 * 1000), true);

            }
        }


        private static void UpdateItem(IAsyncResult result)
        {
            // grab the custom state object
            RequestState state = (RequestState)result.AsyncState;
            WebRequest request = (WebRequest)state.Request;

            // get the Response
            HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result);
            Stream s = (Stream)response.GetResponseStream();
            StreamReader readStream = new StreamReader(s);

            // dataString will hold the entire contents of the requested page if we need it.
            string dataString = readStream.ReadToEnd();
            response.Close();
            s.Close();
            readStream.Close();

            Console.WriteLine(dataString);
        }


        private static void ScanTimeoutCallback(object state, bool timedOut)
        {
            if (timedOut)
            {
                RequestState reqState = (RequestState)state;
                if (reqState != null)
                {
                    reqState.Request.Abort();
                }
                Console.WriteLine("aborted- timeout");
            }
        } 


        class RequestState
        {
            public WebRequest Request; // holds the request
            public object Data; // store any data in this
            public string SiteUrl; // holds the UrlString to match up results (Database lookup, etc).

            public RequestState(WebRequest request, object data, string siteUrl)
            {
                this.Request = request;
                this.Data = data;
                this.SiteUrl = siteUrl;
            }

        }
    }
}

Бонусные баллы для всех, кто также может сказать мне, как ограничить количество одновременных потоков. Например, если мне нужно обработать 100 сайтов, как мне настроить его так, чтобы одновременно обрабатывались 10 сайтов, но не более. Я не хочу открывать 100 тем.


person liquidgraph    schedule 27.05.2011    source источник
comment
Похоже, я полностью проигнорировал часть об асинхронном чтении потока. Мои извенения. См., Например, MSDN: msdn.microsoft.com /en-us/library/86wf6409%28v=vs.71%29.aspx Это значительно усложняет ситуацию, но вы сможете понять это.   -  person BFree    schedule 01.06.2011
comment
Итак ... Interlocked.Increment (ref _count); еще нужно использовать? Потому что я заметил, что время от времени он дает мне неправильные подсчеты. Кроме того, мне нужно реализовать тайм-аут с ThreadPool.RegisterWaitForSingleObject   -  person liquidgraph    schedule 01.06.2011


Ответы (1)


Вот небольшой пример, который я собрал. Я удалил реализацию WebClient, так как похоже, что вы используете WebRequest. Я также использую ConcurrentBag .Net 4:

public class Scraper
{
    private readonly IEnumerable<string> _sites;
    private readonly ConcurrentBag<string> _data;
    private volatile int _count;
    private readonly int _total;
    public Scraper(IEnumerable<string> sites)
    {
        _sites = sites;
        _data = new ConcurrentBag<string>();
        _total = sites.Count();
    }

    public void Start()
    {
        foreach (var site in _sites)
        {
            ScrapeSite(site);
        }
    }

    private void ScrapeSite(string site)
    {
        var req = WebRequest.Create(site);
        req.BeginGetResponse(AsyncCallback, req);
    }

    private void AsyncCallback(IAsyncResult ar)
    {
        Interlocked.Increment(ref _count);
        var req = ar.AsyncState as WebRequest;

        var result = req.EndGetResponse(ar);
        var reader = new StreamReader(result.GetResponseStream());
        var data = reader.ReadToEnd();
        this.OnSiteScraped(req.RequestUri.AbsoluteUri, data);
        _data.Add(data);
        if (_count == _total)
        {
            OnScrapingComplete();
        }
    }

    private void OnSiteScraped(string site, string data)
    {
        var handler = this.SiteScraped;
        if (handler != null)
        {
            handler(this, new SiteScrapedEventArgs(site, data));
        }
    }

    private void OnScrapingComplete()
    {
        var handler = this.ScrapingComplete;
        if (handler != null)
        {
            handler(this, new ScrapingCompletedEventArgs(_data));
        }
    }

    public event EventHandler<SiteScrapedEventArgs> SiteScraped;
    public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
}

public class SiteScrapedEventArgs : EventArgs
{
    public string Site { get; private set; }
    public string Data { get; private set; }
    public SiteScrapedEventArgs(string site, string data)
    {
        this.Site = site;
        this.Data = data;
    }
}

Хорошо, я создал несколько базовых классов, и это должно помочь. Если этого недостаточно, извините, я просто не могу вам помочь:

 public class RankedPage
    {
        public int Rank { get; set; }
        public string Site { get; set; }
    }

    public class WebRequestData
    {
        public WebRequest WebRequest { get; set; }
        public RankedPage Page { get; set; }
    }

    public class Scraper
    {
        private readonly IEnumerable<RankedPage> _sites;
        private readonly ConcurrentBag<KeyValuePair<RankedPage,string>> _data;
        private volatile int _count;
        private readonly int _total;
        public Scraper(IEnumerable<RankedPage> sites)
        {
            _sites = sites;
            _data = new ConcurrentBag<KeyValuePair<RankedPage, string>>();
            _total = sites.Count();
        }

        public void Start()
        {
            foreach (var site in _sites)
            {
                ScrapeSite(site);
            }
        }

        private void ScrapeSite(RankedPage site)
        {
            var req = WebRequest.Create(site.Site);
            req.BeginGetResponse(AsyncCallback, new WebRequestData{ Page = site, WebRequest = req});
        }

        private void AsyncCallback(IAsyncResult ar)
        {
            Interlocked.Increment(ref _count);
            var webRequestData = ar.AsyncState as WebRequestData;

            var req = webRequestData.WebRequest;
            var result = req.EndGetResponse(ar);
            var reader = new StreamReader(result.GetResponseStream());
            var data = reader.ReadToEnd();
            this.OnSiteScraped(webRequestData.Page, data);
            _data.Add(new KeyValuePair<RankedPage, string>(webRequestData.Page,data));
            if (_count == _total)
            {
                OnScrapingComplete();
            }
        }

        private void OnSiteScraped(RankedPage page, string data)
        {
            var handler = this.SiteScraped;
            if (handler != null)
            {
                handler(this, new SiteScrapedEventArgs(page, data));
            }
        }

        private void OnScrapingComplete()
        {
            var handler = this.ScrapingComplete;
            if (handler != null)
            {
                handler(this, new ScrapingCompletedEventArgs(_data));
            }
        }

        public event EventHandler<SiteScrapedEventArgs> SiteScraped;
        public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
    }

    public class SiteScrapedEventArgs : EventArgs
    {
        public RankedPage Site { get; private set; }
        public string Data { get; private set; }
        public SiteScrapedEventArgs(RankedPage site, string data)
        {
            this.Site = site;
            this.Data = data;
        }
    }

    public class ScrapingCompletedEventArgs : EventArgs
    {
        public IEnumerable<KeyValuePair<RankedPage,string >> SiteData { get; private set; }
        public ScrapingCompletedEventArgs(IEnumerable<KeyValuePair<RankedPage, string>> siteData)
        {
            this.SiteData = siteData;
        }
    }
person BFree    schedule 27.05.2011
comment
@BFree: Я благодарен за помощь, но ваш код полностью отличается от моего даже во втором случае. Я полагаюсь на статические функции во всем своем коде, я не знаком с ConcurrentBag и не понимаю, как ваш код обрабатывает тайм-ауты. Я пытался использовать ваш код, но чувствую себя еще более потерянным. - person liquidgraph; 27.05.2011
comment
Я стараюсь. На данный момент я не понимаю, как преобразовать мой ArrayList URL-адресов в IEnumerable ‹string›. - person liquidgraph; 27.05.2011
comment
Пытаюсь выполнить приведение, но не работает: Scraper myScraper = new Scraper ((IEnumerable ‹string›) alSites); - person liquidgraph; 27.05.2011
comment
Приведение приведено в действие, поэтому приложение просматривает список URL-адресов, но конечная цель все еще ускользает от меня: ничего не срабатывает, когда все потоки завершены. Я поместил Console.WriteLine в функцию OnScrapingComplete, но она никогда не срабатывает. - person liquidgraph; 27.05.2011
comment
@ user77282: Извините, я неправильно понял ваш вопрос, я думал, вы хотите, чтобы событие сработало, когда все будет готово. Я только что обновил свой код, чтобы запускать событие, когда каждое из них будет выполнено. Я также удалил версию WebClient, поскольку, похоже, вы используете WebReqeust. - person BFree; 27.05.2011
comment
Хорошо, я получил срабатывание триггера по завершении всех потоков (это то, что я хотел). Теперь проблема в том, что мне нужно получить доступ к URL-адресу сайта, который использовался для каждого конкретного WebRequest внутри AsyncCallback, поэтому я могу объединить URL-адреса с их html и другими данными. Есть ли способ получить сайт внутри AsyncCallback? - person liquidgraph; 27.05.2011
comment
@ user772282: См. мое последнее изменение. Теперь каждый раз, когда сайт завершается, он запускает событие, которое передает как сайт, так и HTML-код в событии. Вы также можете обновить событие, которое запускается, когда все будет готово, чтобы передать словарь site = ›html. Если вы не знаете, как это сделать, дайте мне знать, и я обновлю ... - person BFree; 27.05.2011
comment
Мне нужно передавать данные при каждой очистке: я начинаю со списка URL-адресов и соответствующих им значений PageRank. После очистки каждого URL-адреса html я хотел бы вывести его в listArray, который содержит URL-адрес, html и PageRank каждого сайта в табличном формате. Я не знаю, как снова синхронизировать значение PageRank. То есть мне как-то нужен доступ к PageRank в OnSiteScraped (). - person liquidgraph; 27.05.2011
comment
@ user772282: Второй параметр BeginGetResponse - это объект. Когда я вызываю его из ScrapeSite, я просто использую его для передачи самого WebRequest. Что вы можете сделать, так это создать собственный класс, который объединяет объект WebRequest, URL, PageRank и т. Д. Затем, в обратном вызове, приведите ar.AsyncState к этому типу объекта, и у вас есть все, что вам нужно. - person BFree; 27.05.2011
comment
Звучит здорово ... и к тому же сложно. Не могли бы вы показать пример? Я думаю, что это последнее, что мне нужно, чтобы все работало правильно. - person liquidgraph; 27.05.2011
comment
Понятия не имею, откуда вы берете данные PageRank и все такое. Я вижу только то, что вы передаете список строк. Я не могу показать вам пример, не зная об этом. Более того, то, что я обрисовал в общих чертах, совсем не сложно, и если вы действительно не можете это понять, то, возможно, вам сначала нужно обратиться к некоторым учебным пособиям по объектно-ориентированному программированию. Не пытаюсь быть придурком, но на самом деле это не так уж и сложно. - person BFree; 27.05.2011
comment
Мне не нужно полное решение, просто не понимаю, как передать строку в этот настраиваемый класс и объединить ее с WebRequest. Честно говоря, сегодня я потратил 10 часов, исследуя каждый уголок Интернета, чтобы найти свое оригинальное решение, но его не удалось. Ваше решение кажется хорошим, но я не знаю, как передать переменную через WebRequest. У каждого URL просто есть переменная PageRank, связанная с ним на начальном этапе. Представьте, что в начале есть два списка ArrayList, Sites и PR, так что Sites [0] = google.com и PR [0] = 10 и т. Д. - person liquidgraph; 27.05.2011
comment
Приходите один мужчина, используйте свое воображение. Вместо того, чтобы передавать IEnumerable ‹string› в Scraper, передайте какую-то структуру данных, которая содержит все данные, которые вам нужны для каждого сайта. - person BFree; 27.05.2011
comment
Это не недостаток воображения, это недостаток близкого знакомства с методами C # Async Response. Чтобы понять это дерьмо, нужны недели, когда голова упирается в стену. - person liquidgraph; 27.05.2011