Вот статья Delphi3000, в которой показано, как использовать IOCP для создания пула потоков. Я не автор этого кода, но информация об авторе есть в исходниках.
Я повторно публикую комментарии и код здесь:
Теперь все должны понимать, что такое поток, принципы работы потоков и так далее. Для тех, кто в этом нуждается, простая функция потока состоит в том, чтобы отделить обработку от одного потока к другому, чтобы обеспечить одновременное и параллельное выполнение. Основной принцип потоков так же прост: выделенная память, на которую ссылаются потоки, должна быть упорядочена для обеспечения безопасности доступа. Есть ряд других принципов, но это действительно тот, о котором нужно заботиться.
И на..
Потокобезопасная очередь позволит нескольким потокам безопасно добавлять и удалять, помещать и извлекать значения в очередь и из нее в порядке поступления. С эффективной и хорошо написанной очередью вы можете получить очень полезный компонент при разработке многопоточных приложений, от помощи в безопасном логировании потоков до асинхронной обработки запросов.
Пул потоков — это просто поток или несколько потоков, которые чаще всего используются для управления очередью запросов. Например, веб-сервер, который будет иметь непрерывную очередь запросов, которые необходимо обработать, использует пулы потоков для управления http-запросами, или сервер COM+ или DCOM использует пул потоков для обработки запросов rpc. Это сделано для того, чтобы обработка одного запроса на другой оказывала меньшее влияние, скажем, если вы выполнили 3 запроса синхронно, а выполнение первого запроса заняло 1 минуту, то вторые два запроса не выполнялись бы в течение как минимум 1 минуты, добавляя сверху времени на обработку, а для большинства клиентов это неприемлемо.
Итак, как это сделать..
Начнем с очереди!!
Delphi предоставляет объект TQueue, который доступен, но, к сожалению, не является потокобезопасным и не слишком эффективным, но люди должны посмотреть файл Contnrs.pas, чтобы увидеть, как borland записывает туда стеки и очереди. Для очереди необходимы только две основные функции: добавить и удалить/протолкнуть и вытолкнуть. Add/push добавит значение, указатель или объект в конец очереди. А remove/pop удалит и вернет первое значение в очереди.
Вы можете извлечь из объекта TQueue и переопределить защищенные методы и добавить в критические разделы, это поможет вам в некоторой степени, но я бы хотел, чтобы моя очередь ждала, пока новые запросы не появятся в очереди, и поместите поток в состояние отдых, пока он ждет новых запросов. Это можно сделать, добавив мьютексы или сигнализируя о событиях, но есть более простой способ. Windows API предоставляет очередь завершения ввода-вывода, которая предоставляет нам потокобезопасный доступ к очереди и состояние покоя при ожидании нового запроса в очереди.
Реализация пула потоков
Пул потоков будет очень простым и будет управлять x желаемым количеством потоков и передавать каждый запрос очереди на событие, предназначенное для обработки. Редко возникает необходимость в реализации класса TThread, а ваша логика должна быть реализована и инкапсулирована в событии выполнения класса, поэтому можно создать простой класс TSimpleThread, который будет выполнять любой метод в любом объекте в контексте другого потока. Как только люди поймут это, все, что вам нужно будет сделать, это выделить память.
Вот как это реализовано.
Реализация TThreadQueue и TThreadPool
(* Implemented for Delphi3000.com Articles, 11/01/2004
Chris Baldwin
Director & Chief Architect
Alive Technology Limited
http://www.alivetechnology.com
*)
unit ThreadUtilities;
uses Windows, SysUtils, Classes;
type
EThreadStackFinalized = class(Exception);
TSimpleThread = class;
// Thread Safe Pointer Queue
TThreadQueue = class
private
FFinalized: Boolean;
FIOQueue: THandle;
public
constructor Create;
destructor Destroy; override;
procedure Finalize;
procedure Push(Data: Pointer);
function Pop(var Data: Pointer): Boolean;
property Finalized: Boolean read FFinalized;
end;
TThreadExecuteEvent = procedure (Thread: TThread) of object;
TSimpleThread = class(TThread)
private
FExecuteEvent: TThreadExecuteEvent;
protected
procedure Execute(); override;
public
constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
end;
TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;
TThreadPool = class(TObject)
private
FThreads: TList;
FThreadQueue: TThreadQueue;
FHandlePoolEvent: TThreadPoolEvent;
procedure DoHandleThreadExecute(Thread: TThread);
public
constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
destructor Destroy; override;
procedure Add(const Data: Pointer);
end;
implementation
{ TThreadQueue }
constructor TThreadQueue.Create;
begin
//-- Create IO Completion Queue
FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
FFinalized := False;
end;
destructor TThreadQueue.Destroy;
begin
//-- Destroy Completion Queue
if (FIOQueue <> 0) then
CloseHandle(FIOQueue);
inherited;
end;
procedure TThreadQueue.Finalize;
begin
//-- Post a finialize pointer on to the queue
PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
FFinalized := True;
end;
(* Pop will return false if the queue is completed *)
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
A: Cardinal;
OL: POverLapped;
begin
Result := True;
if (not FFinalized) then
//-- Remove/Pop the first pointer from the queue or wait
GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);
//-- Check if we have finalized the queue for completion
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
Data := nil;
Result := False;
Finalize;
end;
end;
procedure TThreadQueue.Push(Data: Pointer);
begin
if FFinalized then
Raise EThreadStackFinalized.Create('Stack is finalized');
//-- Add/Push a pointer on to the end of the queue
PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;
{ TSimpleThread }
constructor TSimpleThread.Create(CreateSuspended: Boolean;
ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
FreeOnTerminate := AFreeOnTerminate;
FExecuteEvent := ExecuteEvent;
inherited Create(CreateSuspended);
end;
procedure TSimpleThread.Execute;
begin
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
end;
{ TThreadPool }
procedure TThreadPool.Add(const Data: Pointer);
begin
FThreadQueue.Push(Data);
end;
constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
MaxThreads: Integer);
begin
FHandlePoolEvent := HandlePoolEvent;
FThreadQueue := TThreadQueue.Create;
FThreads := TList.Create;
while FThreads.Count < MaxThreads do
FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;
destructor TThreadPool.Destroy;
var
t: Integer;
begin
FThreadQueue.Finalize;
for t := 0 to FThreads.Count-1 do
TThread(FThreads[t]).Terminate;
while (FThreads.Count > 0) do begin
TThread(FThreads[0]).WaitFor;
TThread(FThreads[0]).Free;
FThreads.Delete(0);
end;
FThreadQueue.Free;
FThreads.Free;
inherited;
end;
procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
Data: Pointer;
begin
while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
try
FHandlePoolEvent(Data, Thread);
except
end;
end;
end;
end.
Как вы можете видеть, это довольно просто, и с этим вы можете очень легко реализовать любую очередь запросов по потокам, и действительно любой тип требований, требующий потоковой передачи, может быть выполнен с использованием этого объекта и сэкономить вам много времени и усилий.
Вы можете использовать это для постановки запросов из одного потока в несколько потоков или для очереди запросов из нескольких потоков в один поток, что делает это довольно хорошим решением.
Вот несколько примеров использования этих объектов.
Потокобезопасное ведение журнала
Разрешить нескольким потокам асинхронно записывать в файл журнала.
uses Windows, ThreadUtilities,...;
type
PLogRequest = ^TLogRequest;
TLogRequest = record
LogText: String;
end;
TThreadFileLog = class(TObject)
private
FFileName: String;
FThreadPool: TThreadPool;
procedure HandleLogRequest(Data: Pointer; AThread: TThread);
public
constructor Create(const FileName: string);
destructor Destroy; override;
procedure Log(const LogText: string);
end;
implementation
(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
F: TextFile;
begin
AssignFile(F, FileName);
if not FileExists(FileName) then
Rewrite(F)
else
Append(F);
try
Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
finally
CloseFile(F);
end;
end;
constructor TThreadFileLog.Create(const FileName: string);
begin
FFileName := FileName;
//-- Pool of one thread to handle queue of logs
FThreadPool := TThreadPool.Create(HandleLogRequest, 1);
end;
destructor TThreadFileLog.Destroy;
begin
FThreadPool.Free;
inherited;
end;
procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
Request: PLogRequest;
begin
Request := Data;
try
LogToFile(FFileName, Request^.LogText);
finally
Dispose(Request);
end;
end;
procedure TThreadFileLog.Log(const LogText: string);
var
Request: PLogRequest;
begin
New(Request);
Request^.LogText := LogText;
FThreadPool.Add(Request);
end;
Поскольку это запись в файл, он будет обрабатывать все запросы до одного потока, но вы можете делать расширенные уведомления по электронной почте с большим количеством потоков или, что еще лучше, обрабатывать профилирование с тем, что происходит или шаги в вашей программе, которые я продемонстрирую. в другой статье, так как эта уже довольно длинная.
А пока я оставлю вас с этим, наслаждайтесь.. Оставьте комментарий, если есть что-то, с чем люди застряли.
Крис
person
Mick
schedule
03.02.2011