unit KolSyncObjs;
//
// purpose: Synchronization primitives unit for KOL
// author:  © 2002, 2004 Thaddy de Koning
//          Portions © Julian M Bucknall, 2002
// Remarks: Freely adapted from VCL SyncObject unit.
//          I added some useful extra objects and properties.
//          You also need my KOL translation of Julian Bucknall's
//          Charq.pas (KolCharQ.pas)
//
//          The Spinlock and the threadlogger objects are NT/XP only!
//
// Note: - by Julian - this unit is released as freeware. In other words,
//         you are free to use this unit in your own applications, however
//         I retain all copyright to the code. JMB

{$H+,X+}

interface

uses
  Windows, Messages, Kol;

type
  { One slot of the reader bookkeeping table: which thread holds read access
    and how many nested BeginRead calls it has made. ThreadID = 0 marks a
    free slot. }
  TActiveThreadRecord = record
    ThreadID: Integer;
    RecursionCount: Integer;
  end;

  TActiveThreadArray = array of TActiveThreadRecord;

  // Based on the old - lightweight - version from Delphi 4
  // and code by Julian Bucknall from the Delphi Magazine
  //
  // TdK.
  PMultiReadExclusiveWriteSynchronizer = ^TMultiReadExclusiveWriteSynchronizer;

  { Multiple-reader / single-writer lock. Create instances with
    NewMultiReadExclusiveWriteSynchronizer. }
  TMultiReadExclusiveWriteSynchronizer = object(TObj)
  private
    FLock: TRTLCriticalSection;          // held for the whole duration of a write
    FReadExit: THandle;                  // manual-reset event, signaled when no readers remain
    FCount: Integer;                     // active read count (or write recursion count while writing)
    FSaveReadCount: Integer;             // read count saved by BeginWrite, restored by EndWrite
    FActiveThreads: TActiveThreadArray;  // per-thread read recursion table
    FWriteRequestorID: Integer;          // thread id of a writer waiting for entry, 0 if none
    FReallocFlag: Integer;               // spin flag guarding FActiveThreads against resize
    FWriting: Boolean;
    function WriterIsOnlyReader: Boolean;
  public
    destructor Destroy; virtual;
    procedure BeginRead;
    procedure EndRead;
    procedure BeginWrite;
    procedure EndWrite;
  end;

  PSecurityAttributes = pointer;

  PByteArray = ^TByteArray;
  { NOTE(review): declared with a single element but indexed with arbitrary
    offsets by TThreadLogger - presumably relies on range checking being
    off; confirm project compiler settings. }
  TByteArray = array [0..0] of Byte;

  PSynchroObject = ^TSynchroObject;

  { Abstract base for anything that can be acquired and released. }
  TSynchroObject = object(TObj)
  public
    procedure Acquire; virtual; abstract;
    procedure Release; virtual; abstract;
  end;

  PHandleObject = ^THandleObject;

  { Base for synchronization objects that wrap a Win32 handle; the handle
    is closed by the destructor. }
  THandleObject = object(TSynchroObject)
  private
    FHandle: THandle;
    FLastError: Integer;
  public
    destructor Destroy; virtual;
    property LastError: Integer read FLastError;
    property Handle: THandle read FHandle;
  end;

  TWaitResult = (wrSignaled, wrTimeout, wrAbandoned, wrError);

  PEvent = ^TEvent;

  { Win32 event wrapper. Create with NewEvent or NewSimpleEvent. }
  TEvent = object(THandleObject)
  public
    function WaitFor(Timeout: LongWord): TWaitResult;
    procedure SetEvent;
    procedure ResetEvent;
    procedure PulseEvent;
  end;

  PCriticalSection = ^TCriticalSection;

  { Win32 critical section wrapper. Create with NewCriticalSection. }
  TCriticalSection = object(TSynchroObject)
  protected
    FSection: TRTLCriticalSection;
  public
    destructor Destroy; virtual;
    procedure Acquire; virtual;
    procedure Release; virtual;
  end;

  // After Julian Bucknall,
  // High performance Critical section replacement
  // NT/XP only
  PSpinlock = ^TSpinLock;

  TSpinLock = object(TSynchroObject)
  private
    FLock: pointer;   // cUnlocked or cLocked
  protected
    procedure init; virtual;
  public
    procedure Acquire; virtual;
    procedure Release; virtual;
  end;

  // Thread safe logger object. NT/XP only
  PThreadLogger = ^TThreadLogger;

  { Circular in-memory log of null-terminated strings. Create with
    NewThreadLogger. }
  TThreadLogger = object(TObj)
  private
    FBuffer: PByteArray;   // circular byte buffer of FBufSize bytes
    FBufSize: integer;
    FBufPos: integer;      // logical (ever-increasing) write position
  protected
  public
    destructor Destroy; virtual;
    procedure LogString(const aStr: string);
    procedure GetLog(aList: PStrList);
  end;

function NewSimpleEvent: PEvent;
function NewEvent(EventAttributes: PSecurityAttributes; ManualReset,
  InitialState: Boolean; const Name: string): PEvent;
function NewCriticalSection: PCriticalSection;
function NewSpinLock: PSpinLock;
function NewThreadLogger(aSize: integer = 0): PThreadLogger;
function NewMultiReadExclusiveWriteSynchronizer: PMultiReadExclusiveWriteSynchronizer;

var
  CPUCount: integer;   // set once in the initialization section

implementation

uses
  kolcharq;

{ THandleObject }

{ Closes the wrapped handle (if one was ever obtained) and then runs the
  inherited destructor. }
destructor THandleObject.Destroy;
begin
  // A zero handle means creation failed; there is nothing to close.
  if FHandle <> 0 then
    CloseHandle(FHandle);
  inherited Destroy;
end;

{ TEvent }

{ Creates a TEvent wrapping a new Win32 event object.
  EventAttributes, ManualReset, InitialState and Name are passed straight to
  CreateEvent. If CreateEvent fails, Handle is 0 and LastError holds the
  GetLastError code. }
function NewEvent(EventAttributes: PSecurityAttributes; ManualReset,
  InitialState: Boolean; const Name: string): PEvent;
begin
  New(Result, Create);
  Result.FHandle := CreateEvent(EventAttributes, ManualReset, InitialState,
    PChar(Name));
  // Record why creation failed so callers can inspect LastError.
  if Result.FHandle = 0 then
    Result.FLastError := GetLastError;
end;

{ Waits up to Timeout milliseconds for the event and maps the
  WaitForSingleObject result onto TWaitResult. On WAIT_FAILED the
  GetLastError code is stored in LastError. }
function TEvent.WaitFor(Timeout: LongWord): TWaitResult;
begin
  case WaitForSingleObject(Handle, Timeout) of
    WAIT_ABANDONED:
      Result := wrAbandoned;
    WAIT_OBJECT_0:
      Result := wrSignaled;
    WAIT_TIMEOUT:
      Result := wrTimeout;
    WAIT_FAILED:
      begin
        Result := wrError;
        FLastError := GetLastError;
      end;
  else
    Result := wrError;
  end;
end;

procedure TEvent.SetEvent;
begin
  Windows.SetEvent(Handle);
end;

procedure TEvent.ResetEvent;
begin
  Windows.ResetEvent(Handle);
end;

procedure TEvent.PulseEvent;
begin
  Windows.PulseEvent(Handle);
end;

{ TSimpleEvent }

{ Creates an unnamed, manual-reset event that starts non-signaled. }
function NewSimpleEvent: PEvent;
begin
  Result := NewEvent(nil, True, False, '');
end;

{ TCriticalSection }

{ Creates a TCriticalSection and initializes its Win32 critical section. }
function NewCriticalSection: PCriticalSection;
begin
  New(Result, Create);
  InitializeCriticalSection(Result.FSection);
end;

destructor TCriticalSection.Destroy;
begin
  DeleteCriticalSection(FSection);
  inherited Destroy;
end;

procedure TCriticalSection.Acquire;
begin
  EnterCriticalSection(FSection);
end;

procedure TCriticalSection.Release;
begin
  LeaveCriticalSection(FSection);
end;

{ TSpinLock }

const
  cUnlocked = nil;
  cLocked = pointer(1);

{Helper routine}
{ Returns the number of CPUs available to this process (not necessarily the
  number in the system), counted from the process affinity mask. Returns 1
  when the mask cannot be queried or contains no set bits, so the result is
  always at least 1. }
function GetCPUCount: integer;
var
  ProcessMask: DWORD;
  SystemMask: DWORD;
begin
  Result := 0;
  if GetProcessAffinityMask(GetCurrentProcess, ProcessMask, SystemMask) then
    // count the set bits of the process mask
    while (ProcessMask <> 0) do
    begin
      if Odd(ProcessMask) then
        inc(Result);
      ProcessMask := ProcessMask shr 1;
    end;
  if Result < 1 then
    Result := 1;
end;

function NewSpinlock: PSpinLock;
begin
  New(Result, Create);
end;

{ Spins until the lock variable could be switched from unlocked to locked.
  When only one CPU is available the remainder of the timeslice is given up
  on every failed attempt. }
procedure TSpinLock.Acquire;
begin
  {wait for the lock var to become unlocked; while it isn't, loop;
   when it does set it locked}
  while (InterlockedCompareExchange(FLock, cLocked, cUnlocked) = cLocked) do
    if (CPUCount = 1) then
      Sleep(0); // release timeslice
end;

procedure TSpinLock.init;
begin
  inherited;
  FLock := cUnLocked;
end;

procedure TSpinLock.Release;
begin
  {just set the lock var to unlocked; there's no thread safety problem}
  FLock := cUnlocked;
end;

{ TThreadLogger }

{ Creates a thread logger with a zero-filled circular buffer of aSize bytes.
  A non-positive aSize selects the default of 1 MB. }
function NewThreadLogger(aSize: integer = 0): PThreadLogger;
begin
  if (aSize <= 0) then
    aSize := 1024 * 1024;
  New(Result, Create);
  with Result^ do
  begin
    FBuffer := AllocMem(aSize);
    FBufPos := 0;
    FBufSize := aSize;
  end;
end;

destructor TThreadLogger.Destroy;
begin
  if (FBuffer <> nil) then
    FreeMem(FBuffer);
  inherited Destroy;
end;

{ Clears aList and fills it with the strings currently held in the circular
  buffer, starting at the current write position and walking once around
  the buffer. }
procedure TThreadLogger.GetLog(aList: PStrList);
var
  BufPos: integer;
  StartBufPos: integer;
  CharQ: PCharQueue;
  Ch: char;
begin
  {clear the list}
  aList.Clear;
  {remember the start of the buffer}
  StartBufPos := FBufPos mod FBufSize;
  // FBufPos is only ever added to, so it can wrap to a negative value;
  // "mod" would then be negative and index before the buffer.
  if StartBufPos < 0 then
    Inc(StartBufPos, FBufSize);
  {skip past null bytes that may exist at the beginning}
  BufPos := StartBufPos;
  if (FBuffer[BufPos] = 0) then
  begin
    BufPos := succ(BufPos) mod FBufSize;
    while (FBuffer[BufPos] = 0) and (BufPos <> StartBufPos) do
      BufPos := succ(BufPos) mod FBufSize;
    {buffer holds nothing but nulls}
    if (BufPos = StartBufPos) then
      Exit;
  end;
  {create strings and add them to the list until we reach the start
   position again}
  CharQ := NewCharQueue;
  try
    repeat
      Ch := char(FBuffer[BufPos]);
      if (Ch <> #0) then
        CharQ.Append(Ch)
      else
      begin
        aList.Add(CharQ.AsString);
        CharQ.Clear;
      end;
      BufPos := succ(BufPos) mod FBufSize;
    until (BufPos = StartBufPos);
    if not CharQ.IsEmpty then
      aList.Add(CharQ.AsString);
  finally
    CharQ.Free;
  end;
end;

{ Appends "<thread id in hex> <aStr>" plus a null terminator to the circular
  buffer. Space is reserved with an interlocked add on FBufPos. An entry that
  would not fit into the buffer together with its terminator is truncated to
  FBufSize - 1 characters so the copy can never run past the buffer. }
procedure TThreadLogger.LogString(const aStr: string);
var
  S: string;
  P: PChar;
  Len: integer;
  StrPos: integer;
  BufPos: integer;
  RemBufSize: integer;
begin
  {Add the thread id to the string for logging purposes}
  S := Format('%.8x %s', [GetCurrentThreadID, aStr]);
  {never let a single entry exceed the buffer}
  Len := length(S);
  if Len >= FBufSize then
  begin
    Len := FBufSize - 1;
    SetLength(S, Len);
  end;
  // PChar(S) is valid (points at a #0) even when S is empty, unlike S[1].
  P := PChar(S);
  {reserve space for the string (plus null terminator) in the log, get
   logical position}
  StrPos := InterlockedExchangeAdd(FBufPos, Len + 1);
  {calculate physical position}
  BufPos := StrPos mod FBufSize;
  // guard against a negative remainder once FBufPos has wrapped past MaxInt
  if BufPos < 0 then
    Inc(BufPos, FBufSize);
  {if there is enough room to perform a single string copy, copy the
   string plus the null terminator}
  RemBufSize := FBufSize - BufPos;
  if (RemBufSize > Len) then
    Move(P^, FBuffer^[BufPos], Len + 1)
  {otherwise, copy the string in two parts, the second part to include the
   terminating null}
  else
  begin
    Move(P^, FBuffer^[BufPos], RemBufSize);
    Move(P[RemBufSize], FBuffer^[0], Len - RemBufSize + 1);
  end;
end;

{TMultiReadExclusiveWriteSynchronizer}

{ Creates the synchronizer: initializes the critical section, creates the
  "readers have left" event and gives the reader table four slots. }
function NewMultiReadExclusiveWriteSynchronizer: PMultiReadExclusiveWriteSynchronizer;
begin
  New(Result, Create);
  with Result^ do
  begin
    InitializeCriticalSection(FLock);
    FReadExit := CreateEvent(nil, True, True, nil); // manual reset, start signaled
    SetLength(FActiveThreads, 4);
  end;
end;

{ Takes the write lock, releases the OS resources and the reader table, then
  runs the inherited destructor. The inherited call is made last, matching
  the other destructors in this unit, so that no field is touched after the
  base destructor has run. }
destructor TMultiReadExclusiveWriteSynchronizer.Destroy;
begin
  BeginWrite;
  CloseHandle(FReadExit);
  DeleteCriticalSection(FLock);
  FActiveThreads := nil; // release the dynamic array explicitly
  inherited Destroy;
end;

{ True when a writer is waiting and no thread other than that writer has an
  entry in the scanned part of the reader table. }
function TMultiReadExclusiveWriteSynchronizer.WriterIsOnlyReader: Boolean;
var
  I, Len: Integer;
begin
  Result := False;
  if FWriteRequestorID = 0 then
    Exit;
  // We know a writer is waiting for entry with the FLock locked,
  // so FActiveThreads is stable - no BeginRead could be resizing it now
  I := 0;
  // The scan stops before the last slot, like the scans in BeginRead and
  // EndRead.
  Len := High(FActiveThreads);
  while (I < Len) and
    ((FActiveThreads[I].ThreadID = 0) or
     (FActiveThreads[I].ThreadID = FWriteRequestorID)) do
    Inc(I);
  Result := I >= Len;
end;

{ Acquires exclusive access. FLock stays held until the matching EndWrite.
  Nested BeginWrite calls on the writing thread only bump FCount. }
procedure TMultiReadExclusiveWriteSynchronizer.BeginWrite;
begin
  EnterCriticalSection(FLock); // Block new read or write ops from starting
  if not FWriting then
  begin
    FWriteRequestorID := GetCurrentThreadID; // Indicate that writer is waiting for entry
    if not WriterIsOnlyReader then // See if any other thread is reading
      WaitForSingleObject(FReadExit, INFINITE); // Wait for current readers to finish
    FSaveReadCount := FCount; // record prior read recursions for this thread
    FCount := 0;
    FWriteRequestorID := 0;
    FWriting := True;
  end;
  Inc(FCount); // allow read recursions during write without signalling FReadExit event
end;

{ Releases one level of write access; the outermost call restores the saved
  read count and clears the writing state. Always leaves FLock once. }
procedure TMultiReadExclusiveWriteSynchronizer.EndWrite;
begin
  Dec(FCount);
  if FCount = 0 then
  begin
    FCount := FSaveReadCount; // restore read recursion count
    FSaveReadCount := 0;
    FWriting := False;
  end;
  LeaveCriticalSection(FLock);
end;

{ Registers the calling thread as a reader. Does nothing but pass through
  FLock when FWriting is set. }
procedure TMultiReadExclusiveWriteSynchronizer.BeginRead;
var
  I: Integer;
  ThreadID: Integer;
  ZeroSlot: Integer;
begin
  EnterCriticalSection(FLock);
  try
    if not FWriting then
    begin
      // This will call ResetEvent more than necessary on win95, but still work
      if InterlockedIncrement(FCount) = 1 then
        ResetEvent(FReadExit); // Make writer wait until all readers are finished.
      I := 0; // scan for empty slot in activethreads list
      ThreadID := GetCurrentThreadID;
      ZeroSlot := -1;
      while (I < High(FActiveThreads)) and
        (FActiveThreads[I].ThreadID <> ThreadID) do
      begin
        if (FActiveThreads[I].ThreadID = 0) and (ZeroSlot < 0) then
          ZeroSlot := I;
        Inc(I);
      end;
      if I >= High(FActiveThreads) then // didn't find our threadid slot
      begin
        if ZeroSlot < 0 then // no slots available. Grow array to make room
        begin
          // spin loop. wait for EndRead to put zero back into FReallocFlag
          while InterlockedExchange(FReallocFlag, ThreadID) <> 0 do
            Sleep(0);
          try
            // grows the table by two slots; I (the old High) becomes a
            // scanned, usable slot
            SetLength(FActiveThreads, High(FActiveThreads) + 3);
          finally
            FReallocFlag := 0;
          end;
        end
        else // use an empty slot
          I := ZeroSlot;
        // no concurrency issue here. We're the only thread interested in this record.
        FActiveThreads[I].ThreadID := ThreadID;
        FActiveThreads[I].RecursionCount := 1;
      end
      else // found our threadid slot.
        Inc(FActiveThreads[I].RecursionCount); // thread safe = unique to threadid
    end;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

{ Unregisters one level of read access for the calling thread and signals
  FReadExit when the last reader leaves or when the waiting writer is the
  only remaining reader. Does nothing when FWriting is set. }
procedure TMultiReadExclusiveWriteSynchronizer.EndRead;
var
  I, ThreadID, Len: Integer;
begin
  if not FWriting then
  begin
    // Remove our threadid from the list of active threads
    I := 0;
    ThreadID := GetCurrentThreadID;
    // wait for BeginRead to finish any pending realloc of FActiveThreads
    while InterlockedExchange(FReallocFlag, ThreadID) <> 0 do
      Sleep(0);
    try
      Len := High(FActiveThreads);
      while (I < Len) and (FActiveThreads[I].ThreadID <> ThreadID) do
        Inc(I);
      assert(I < Len);
      // no concurrency issues here. We're the only thread interested in this record.
      Dec(FActiveThreads[I].RecursionCount); // threadsafe = unique to threadid
      if FActiveThreads[I].RecursionCount = 0 then
        FActiveThreads[I].ThreadID := 0; // must do this last!
    finally
      FReallocFlag := 0;
    end;
    if (InterlockedDecrement(FCount) = 0) or WriterIsOnlyReader then
      SetEvent(FReadExit); // release next writer
  end;
end;

initialization
  CPUCount := GetCPUCount;
end.