unit KolSyncObjs;
//
// purpose: Synchronization primitives unit for KOL
//  author: © 2002, 2004 Thaddy de Koning
//          Portions © Julian M Bucknall, 2002
// Remarks: Freely adapted from the VCL SyncObjs unit.
//          I added some useful extra objects and properties.
//          You also need my KOL translation of Julian Bucknall's
//          CharQ.pas (KolCharQ.pas).
//
//          The Spinlock and the threadlogger objects are NT/XP only!
//
// Note: - by Julian - this unit is released as freeware. In other words,
//         you are free to use this unit in your own applications; however,
//         I retain all copyright to the code. JMB

{$H+,X+}

interface

uses
  Windows,
  Messages,
  Kol;

type
  // Bookkeeping entry for one thread that currently holds a read lock on a
  // TMultiReadExclusiveWriteSynchronizer.
  TActiveThreadRecord = record
    ThreadID: Integer;        // id of the reading thread; 0 marks a free slot
    RecursionCount: Integer;  // BeginRead calls not yet matched by EndRead
  end;

  // Growable table of reader entries (type of the FActiveThreads field).
  TActiveThreadArray = array of TActiveThreadRecord;

  // Multiple-reader / exclusive-writer lock.
  // Based on the old - lightweight - version from Delphi 4
  // and code by Julian Bucknall from the Delphi Magazine
  //
  // TdK.
  //
  // Create instances with NewMultiReadExclusiveWriteSynchronizer, which
  // initialises FLock, FReadExit and FActiveThreads.
  PMultiReadExclusiveWriteSynchronizer =^TMultireadExclusiveWriteSynchronizer;
  TMultiReadExclusiveWriteSynchronizer = object(Tobj)
  private
    // Entered by BeginWrite and left by EndWrite; entered briefly by BeginRead.
    FLock: TRTLCriticalSection;
    // Manual-reset event: reset when the first reader enters, set by EndRead
    // when readers are gone; BeginWrite waits on it.
    FReadExit: THandle;
    // Number of read locks outstanding; while writing it counts the nesting
    // of BeginWrite calls instead.
    FCount: Integer;
    // Value of FCount saved by BeginWrite and restored by EndWrite.
    FSaveReadCount: Integer;
    // One entry per thread that currently holds a read lock.
    FActiveThreads: TActiveThreadArray;
    // Thread id of the writer waiting inside BeginWrite, otherwise 0.
    FWriteRequestorID: Integer;
    // Spin flag (0 = free) that serialises resizing of FActiveThreads in
    // BeginRead against the scan in EndRead.
    FReallocFlag: Integer;
    // True between the outermost BeginWrite and its matching EndWrite.
    FWriting: Boolean;
    // True when a writer is waiting and no other thread occupies a scanned slot.
    function WriterIsOnlyReader: Boolean;
  public
    destructor Destroy; virtual;
    // Acquire / release a shared (read) lock; calls must be paired.
    procedure BeginRead;
    procedure EndRead;
    // Acquire / release the exclusive (write) lock; calls must be paired.
    procedure BeginWrite;
    procedure EndWrite;
  end;

  // Untyped stand-in for a security-attributes pointer; NewEvent passes it
  // straight through to CreateEvent.
  // NOTE(review): presumably this hides Windows.PSecurityAttributes in units
  // that list KolSyncObjs after Windows - confirm that is intended.
  PSecurityAttributes = pointer;
  // Byte-wise view of a raw memory block (type of TThreadLogger.FBuffer).
  // The [0..0] bounds are nominal: the logger indexes well past element 0.
  // NOTE(review): such indexing presumably requires range checking to be off.
  PByteArray =^TByteArray;
  TByteArray=array [0..0] of Byte;

  // Abstract base for lock-like objects: descendants implement Acquire and
  // Release (see TCriticalSection and TSpinLock in this unit).
  PSynchroObject =^TSynchroObject;
  TSynchroObject = object(TObj)
  public
    procedure Acquire; virtual;abstract;  // take the lock
    procedure Release; virtual;abstract;  // give the lock back
  end;

  // Base for synchronization objects backed by a Windows handle.
  // The destructor passes FHandle to CloseHandle.
  PHandleObject=^THandleObject;
  THandleObject = object(TSynchroObject)
  private
    FHandle: THandle;     // handle owned by this object
    FLastError: Integer;  // last Windows error code recorded by a method
  public
    destructor Destroy; virtual;
    property LastError: Integer read FLastError;
    property Handle: THandle read FHandle;
  end;

  // Outcome of TEvent.WaitFor, mapped from the WaitForSingleObject result:
  // WAIT_OBJECT_0 -> wrSignaled, WAIT_TIMEOUT -> wrTimeout,
  // WAIT_ABANDONED -> wrAbandoned, anything else -> wrError.
  TWaitResult = (wrSignaled, wrTimeout, wrAbandoned, wrError);

  // Wrapper around a Windows event object. Create with NewEvent or
  // NewSimpleEvent.
  PEvent=^TEvent;
  TEvent = object(THandleObject)
  public
    // Waits on the event; Timeout is handed to WaitForSingleObject unchanged.
    function WaitFor(Timeout: LongWord): TWaitResult;
    procedure SetEvent;    // calls Windows.SetEvent on the handle
    procedure ResetEvent;  // calls Windows.ResetEvent on the handle
    procedure PulseEvent;  // calls Windows.PulseEvent on the handle
  end;


  // Wrapper around a Windows critical section. Create with
  // NewCriticalSection, which initialises FSection; the destructor deletes it.
  PCriticalSection =^TCriticalSection;
  TCriticalSection = object(TSynchroObject)
  protected
    FSection: TRTLCriticalSection;  // the underlying OS critical section
  public
    destructor Destroy; virtual;
    procedure Acquire; virtual;  // EnterCriticalSection
    procedure Release; virtual;  // LeaveCriticalSection
  end;

  // After Julian Bucknall,
  // High performance Critical section replacement
  // NT/XP only
  //
  // Acquire busy-waits on FLock using InterlockedCompareExchange; it only
  // yields the time slice (Sleep(0)) when CPUCount = 1. Create with
  // NewSpinLock.
  PSpinlock=^TSpinLock;
  TSpinLock = object(TSynchroObject)
  private
    FLock : pointer;  // nil while free, pointer(1) while owned
  protected
    // Sets FLock to the unlocked value.
    // NOTE(review): presumably overrides the KOL TObj initialisation hook - confirm.
    procedure init;virtual;
  public
    procedure Acquire;virtual;
    procedure Release;virtual;
  end;

  // Thread safe logger object. NT/XP only
  //
  // Entries are written into a fixed-size circular byte buffer as
  // null-terminated strings; space is reserved with InterlockedExchangeAdd.
  // Create with NewThreadLogger.
  PThreadLogger=^TThreadLogger;
  TThreadLogger = object(Tobj)
    private
      FBuffer  : PByteArray;  // circular log buffer, released by Destroy
      FBufSize : integer;     // size of FBuffer in bytes
      FBufPos  : integer;     // running (logical) write position; wrapped with mod FBufSize
    protected
    public
      destructor Destroy; virtual;
      // Appends aStr, prefixed with the calling thread's id in hex.
      procedure LogString(const aStr : string);
      // Clears aList and fills it with the strings currently in the buffer.
      procedure GetLog(aList : PStrList);
  end;

// Constructor functions for the objects declared above.
// NewSimpleEvent creates an unnamed, manual-reset, initially non-signaled event.
function NewSimpleEvent:PEvent;
// NewEvent forwards all four arguments to the Windows CreateEvent call.
Function NewEvent(EventAttributes: PSecurityAttributes; ManualReset,
  InitialState: Boolean; const Name: string):PEvent;
function NewCriticalSection:PCriticalSection;
function NewSpinLock:PSpinLock;
// aSize is the log buffer size in bytes; values <= 0 select 1 MB.
function NewThreadLogger(aSize:integer = 0):PThreadLogger;
function NewMultiReadExclusiveWriteSynchronizer:PMultiReadExclusiveWriteSynchronizer;

var
  // Number of processors available to this process; assigned once in the
  // unit's initialization section and read by TSpinLock.Acquire.
  CPUCount : integer;

implementation
uses
 kolcharq;

{ THandleObject }

// Closes the wrapped Windows handle, if one was obtained, and then runs the
// inherited destructor.
// A zero FHandle (for example because the creating API call failed) is
// skipped rather than being handed to CloseHandle.
destructor THandleObject.Destroy;
begin
  if FHandle <> 0 then
  begin
    CloseHandle(FHandle);
    FHandle := 0; // do not leave a stale handle value behind
  end;
  inherited Destroy;
end;

{ TEvent }

// Creates a TEvent wrapping a Windows event object.
//   EventAttributes - passed unchanged to CreateEvent (nil for defaults)
//   ManualReset     - True for a manual-reset event, False for auto-reset
//   InitialState    - True to create the event signaled
//   Name            - event name, passed to CreateEvent as PChar(Name)
// The object is always returned. When CreateEvent fails, Handle is 0 and the
// Windows error code is available through the LastError property.
Function NewEvent(EventAttributes: PSecurityAttributes; ManualReset,
  InitialState: Boolean; const Name: string):PEvent;
begin
  New(Result,Create);
  Result.FHandle := CreateEvent(EventAttributes, ManualReset, InitialState, PChar(Name));
  // record the failure reason immediately, before another API call can
  // overwrite the thread's last-error value
  if Result.FHandle = 0 then
    Result.FLastError := GetLastError;
end;

// Blocks until the event is signaled or Timeout (as understood by
// WaitForSingleObject) elapses, and translates the API result:
//   WAIT_OBJECT_0  -> wrSignaled
//   WAIT_TIMEOUT   -> wrTimeout
//   WAIT_ABANDONED -> wrAbandoned
//   anything else  -> wrError (LastError is updated only for WAIT_FAILED)
function TEvent.WaitFor(Timeout: LongWord): TWaitResult;
var
  WaitCode: DWORD;
begin
  WaitCode := WaitForSingleObject(Handle, Timeout);
  if WaitCode = WAIT_OBJECT_0 then
    Result := wrSignaled
  else if WaitCode = WAIT_TIMEOUT then
    Result := wrTimeout
  else if WaitCode = WAIT_ABANDONED then
    Result := wrAbandoned
  else
  begin
    Result := wrError;
    // only a genuine API failure carries a meaningful last-error value
    if WaitCode = WAIT_FAILED then
      FLastError := GetLastError;
  end;
end;

// Signals the event. The call is qualified with the unit name because the
// bare identifier would refer to this method itself.
procedure TEvent.SetEvent;
begin
  Windows.SetEvent(Handle);
end;

// Puts the event back into the non-signaled state via Windows.ResetEvent.
procedure TEvent.ResetEvent;
begin
  Windows.ResetEvent(Handle);
end;

// Calls Windows.PulseEvent on the event handle.
// NOTE(review): Microsoft's documentation describes PulseEvent as unreliable
// and advises against using it - confirm whether any caller depends on this.
procedure TEvent.PulseEvent;
begin
  Windows.PulseEvent(Handle);
end;

{ TSimpleEvent }

// Convenience constructor: default security attributes (nil), manual reset,
// initially non-signaled, empty name.
function NewSimpleEvent:PEvent;
begin
  Result:=NewEvent(nil, True, False, '');
end;

{ TCriticalSection }

// Allocates a TCriticalSection and initialises its Windows critical section.
// The section is deleted again by TCriticalSection.Destroy.
function NewCriticalSection:PCriticalSection;
begin
  New(Result,Create);
  InitializeCriticalSection(Result.FSection);
end;

// Deletes the Windows critical section, then runs the inherited destructor.
destructor TCriticalSection.Destroy;
begin
  DeleteCriticalSection(FSection);
  inherited Destroy;
end;

// Enters the critical section; must be paired with Release.
procedure TCriticalSection.Acquire;
begin
  EnterCriticalSection(FSection);
end;

// Leaves the critical section previously entered with Acquire.
procedure TCriticalSection.Release;
begin
  LeaveCriticalSection(FSection);
end;

{ TSpinLock }
const
  // The two values TSpinLock.FLock can hold.
  cUnlocked = nil;        // lock is free
  cLocked = pointer(1);   // lock is owned by some thread

{Helper routine}
// Returns the number of processors the current process may run on, obtained
// by counting the set bits of the process affinity mask.
// The result is never less than 1: it is 1 when GetProcessAffinityMask fails
// and also when it succeeds but reports an empty mask, so that callers
// comparing against 1 (TSpinLock.Acquire) still yield the time slice.
function GetCPUCount : integer;
var
  ProcessMask : DWORD;
  SystemMask  : DWORD;
begin
  {Note: this routine calculates the number of CPUs available to the
         process, not necessarily on the system}
  Result := 0;
  if GetProcessAffinityMask(GetCurrentProcess,
                            ProcessMask, SystemMask) then
    while (ProcessMask <> 0) do begin
      if Odd(ProcessMask) then
        inc(Result);
      ProcessMask := ProcessMask shr 1;
    end;
  {call failed or reported no processors: assume a single CPU}
  if (Result < 1) then
    Result := 1;
end;

// Allocates a TSpinLock. No further set-up happens here; TSpinLock.init
// sets FLock to the unlocked value.
function NewSpinlock:PSpinLock;
begin
  New(Result,Create);
end;

// Busy-waits until the lock is obtained.
// InterlockedCompareExchange stores cLocked only if FLock currently equals
// cUnlocked and returns the previous value, so a result of cLocked means
// another thread still owns the lock and the loop must try again.
procedure TSpinLock.Acquire;
begin
  {wait for the lock var to become unlocked; while it isn't, loop;
   when it does set it locked}
  while (InterlockedCompareExchange(FLock, cLocked, cUnlocked) = cLocked) do
    // with exactly one CPU the owner cannot run while we spin, so yield;
    // for any other CPUCount value the loop spins without yielding
    if (CPUCount = 1) then
      Sleep(0); // release timeslice
end;

// Runs the inherited init and then marks the lock as free.
procedure TSpinLock.init;
begin
 inherited;
 Flock:=cUnLocked;
end;

// Frees the lock with a plain (non-interlocked) store of cUnlocked.
// There is no check that the caller is the thread that acquired it.
procedure TSpinLock.Release;
begin
  {just set the lock var to unlocked; there's no thread safety
   problem}
  FLock := cUnlocked;
end;

{ TThreadLogger }
// Builds a thread logger with a log buffer of aSize bytes.
// Any aSize below 1 (including the default 0) selects a 1 MB buffer.
// The buffer comes from AllocMem and the write position starts at 0.
function NewThreadLogger(aSize:integer = 0):PThreadLogger;
const
  cDefaultLogSize = 1024 * 1024;
begin
  if (aSize < 1) then
    aSize := cDefaultLogSize;
  New(Result,Create);
  Result^.FBuffer := AllocMem(aSize);
  Result^.FBufPos := 0;
  Result^.FBufSize := aSize;
end;

// Releases the log buffer (when one was allocated) and then runs the
// inherited destructor.
destructor TThreadLogger.Destroy;
begin
  if (FBuffer <> nil) then
    FreeMem(FBuffer);
  inherited Destroy;
end;

// Empties aList and refills it with the strings currently held in the
// circular log buffer, walking once around the buffer from the current write
// position. Null bytes separate entries. If the whole buffer is null bytes
// the list stays empty. A trailing run of characters without a terminator is
// added as a final entry.
procedure TThreadLogger.GetLog(aList: PStrList);
var
  Origin  : integer;
  Cursor  : integer;
  Current : char;
  Line    : PCharQueue;
begin
  aList.Clear;

  {physical index of the current write position: the walk starts and
   ends here}
  Origin := FBufPos mod FBufSize;
  Cursor := Origin;

  {step over leading null bytes; arriving back at the origin means the
   buffer holds nothing at all}
  if (FBuffer^[Cursor] = 0) then
    repeat
      Cursor := succ(Cursor) mod FBufSize;
      if (Cursor = Origin) then
        Exit;
    until (FBuffer^[Cursor] <> 0);

  {collect characters into a queue, emitting one list entry per null
   terminator, until the walk is back at the origin}
  Line := NewCharQueue;
  try
    repeat
      Current := char(FBuffer^[Cursor]);
      if (Current = #0) then begin
        aList.Add(Line.AsString);
        Line.Clear;
      end
      else
        Line.Append(Current);
      Cursor := succ(Cursor) mod FBufSize;
    until (Cursor = Origin);
    {characters left over without a terminator still form an entry}
    if not Line.IsEmpty then
      aList.Add(Line.AsString);
  finally
    Line.Free;
  end;
end;

// Appends one entry to the circular log buffer.
// The entry is aStr prefixed with the calling thread's id as eight hex
// digits and a space, stored with a terminating null byte. Space is reserved
// atomically with InterlockedExchangeAdd, so no lock is taken here.
// An entry that would not fit into the buffer even when it is empty is
// truncated to FBufSize - 1 characters; without that limit the wrapped copy
// would write past the end of the buffer.
// Does nothing when no buffer has been allocated.
procedure TThreadLogger.LogString(const aStr: string);
var
  S : string;
  StrPos : integer;
  BufPos : integer;
  RemBufSize : integer;
  CopyLen : integer;
begin
  {nothing can be stored (and mod would divide by zero) without a buffer}
  if (FBuffer = nil) or (FBufSize <= 0) then
    Exit;

  {Add the thread id to the string for logging purposes}
  S := Format('%.8x %s', [GetCurrentThreadID, aStr]);

  {keep text plus terminator within one full lap of the buffer}
  if (length(S) >= FBufSize) then
    SetLength(S, FBufSize - 1);
  CopyLen := length(S) + 1;

  {reserve space for the string (plus null terminator) in the log,
   get logical position}
  StrPos := InterlockedExchangeAdd(FBufPos, CopyLen);

  {calculate physical position; the logical counter is a signed integer
   and turns negative once it overflows, in which case mod yields a
   negative remainder that must be shifted back into range}
  BufPos := StrPos mod FBufSize;
  if (BufPos < 0) then
    inc(BufPos, FBufSize);

  {if there is enough room to perform a single string copy, copy the
   string plus the null terminator; PChar(S) is used as the source
   because it is valid even when S is empty}
  RemBufSize := FBufSize - BufPos;
  if (RemBufSize >= CopyLen) then
    Move(PChar(S)^, FBuffer^[BufPos], CopyLen)
  {otherwise, copy the string in two parts, the second part to include
   the terminating null}
  else begin
    Move(PChar(S)^, FBuffer^[BufPos], RemBufSize);
    Move(PChar(S)[RemBufSize], FBuffer^[0], CopyLen - RemBufSize);
  end;
end;

{TMultiReadExclusiveWriteSynchronizer}

// Allocates a reader/writer synchronizer and prepares its OS resources:
// the critical section, the reader-exit event (manual reset, created
// signaled, unnamed) and an initial table of four reader slots.
function NewMultiReadExclusiveWriteSynchronizer:PMultiReadExclusiveWriteSynchronizer;
const
  cInitialSlots = 4;
begin
  New(Result,Create);
  InitializeCriticalSection(Result^.FLock);
  // manual reset, start signaled
  Result^.FReadExit := CreateEvent(nil, True, True, nil);
  SetLength(Result^.FActiveThreads, cInitialSlots);
end;

// Takes the write lock (so active readers have finished), releases the event
// handle, the reader table and the critical section, and only then runs the
// inherited destructor.
// The resources are released BEFORE calling inherited, as in the other
// destructors of this unit: no field may be touched once the inherited
// destructor has run.
// NOTE(review): the KOL base destructor is believed to free the instance
// memory itself - confirm against the KOL sources.
destructor TMultiReadExclusiveWriteSynchronizer.Destroy;
begin
  BeginWrite;
  CloseHandle(FReadExit);
  FActiveThreads := nil; // release the dynamic array explicitly
  DeleteCriticalSection(FLock);
  inherited Destroy;
end;

// Reports whether a writer is waiting and every scanned slot of
// FActiveThreads is either free (ThreadID = 0) or owned by that writer.
// Returns False straight away when FWriteRequestorID is 0.
// The scan covers indexes 0 .. High(FActiveThreads) - 1; the last element is
// never examined, which matches the loops in BeginRead and EndRead.
function TMultiReadExclusiveWriteSynchronizer.WriterIsOnlyReader: Boolean;
var
  I, Len: Integer;
begin
  Result := False;
  if FWriteRequestorID = 0 then Exit;
  // We know a writer is waiting for entry with the FLock locked,
  // so FActiveThreads is stable - no BeginRead could be resizing it now
  I := 0;
  Len := High(FActiveThreads);
  // advance while the slot is free or belongs to the waiting writer
  while (I < Len) and
    ((FActiveThreads[I].ThreadID = 0) or (FActiveThreads[I].ThreadID = FWriteRequestorID)) do
    Inc(I);
  // reaching the end means no foreign reader was found
  Result := I >= Len;
end;

// Acquires the exclusive lock. FLock is entered here and stays entered until
// the matching EndWrite, so every BeginWrite needs exactly one EndWrite.
// On the outermost call the routine waits on FReadExit unless the calling
// thread is the only reader, then saves the current read count and switches
// to writing mode. Nested calls just increment FCount.
procedure TMultiReadExclusiveWriteSynchronizer.BeginWrite;
begin
  EnterCriticalSection(FLock);  // Block new read or write ops from starting
  if not FWriting then
  begin
    FWriteRequestorID := GetCurrentThreadID;   // Indicate that writer is waiting for entry
    if not WriterIsOnlyReader then              // See if any other thread is reading
      WaitForSingleObject(FReadExit, INFINITE); // Wait for current readers to finish
    FSaveReadCount := FCount;  // record prior read recursions for this thread
    FCount := 0;
    FWriteRequestorID := 0;
    FWriting := True;
  end;
  Inc(FCount);  // allow read recursions during write without signalling FReadExit event
end;

// Releases one level of the exclusive lock taken by BeginWrite.
// When the outermost level is released, the read count saved by BeginWrite
// is restored and writing mode ends. FLock is left once per call, mirroring
// the one EnterCriticalSection per BeginWrite.
procedure TMultiReadExclusiveWriteSynchronizer.EndWrite;
begin
  Dec(FCount);
  if FCount = 0 then
  begin
    FCount := FSaveReadCount;  // restore read recursion count
    FSaveReadCount := 0;
    FWriting := False;
  end;
  LeaveCriticalSection(FLock);
end;

// Acquires a shared (read) lock for the calling thread.
// FLock is held only for the duration of this call. While FWriting is True
// the routine changes nothing. Otherwise it increments the reader count
// (resetting FReadExit for the first reader) and records the thread in
// FActiveThreads: an existing entry gets its RecursionCount incremented,
// otherwise a free slot is used or the table is grown.
procedure TMultiReadExclusiveWriteSynchronizer.BeginRead;
var
  I: Integer;
  ThreadID: Integer;
  ZeroSlot: Integer;
begin
  EnterCriticalSection(FLock);
  try
    if not FWriting then
    begin
      // This will call ResetEvent more than necessary on win95, but still work
      if InterlockedIncrement(FCount) = 1 then
        ResetEvent(FReadExit); // Make writer wait until all readers are finished.
      I := 0;  // scan for empty slot in activethreads list
      ThreadID := GetCurrentThreadID;
      ZeroSlot := -1;
      // the scan stops one short of the last element; that element is never
      // used as a slot until the table grows
      while (I < High(FActiveThreads)) and (FActiveThreads[I].ThreadID <> ThreadID) do
      begin
        // remember the first free slot seen on the way
        if (FActiveThreads[I].ThreadID = 0) and (ZeroSlot < 0) then ZeroSlot := I;
        Inc(I);
      end;
      if I >= High(FActiveThreads) then  // didn't find our threadid slot
      begin
        if ZeroSlot < 0 then  // no slots available.  Grow array to make room
        begin   // spin loop.  wait for EndRead to put zero back into FReallocFlag
          while InterlockedExchange(FReallocFlag, ThreadID) <> 0 do  Sleep(0);
          try
            // High + 3 = Length + 2: two more elements; I still equals the
            // old High and becomes the slot used below
            SetLength(FActiveThreads, High(FActiveThreads) + 3);
          finally
            FReallocFlag := 0;
          end;
        end
        else  // use an empty slot
          I := ZeroSlot;
        // no concurrency issue here.  We're the only thread interested in this record.
        FActiveThreads[I].ThreadID := ThreadID;
        FActiveThreads[I].RecursionCount := 1;
      end
      else  // found our threadid slot.
        Inc(FActiveThreads[I].RecursionCount); // thread safe = unique to threadid
    end;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

// Releases one read lock held by the calling thread.
// While FWriting is True the routine changes nothing. Otherwise it
// decrements the thread's RecursionCount (freeing the slot at zero),
// decrements the reader count and sets FReadExit when no readers remain or
// when the only remaining reader is the waiting writer.
// FLock is not taken here; only the FReallocFlag spin flag protects the scan
// against a concurrent resize in BeginRead.
procedure TMultiReadExclusiveWriteSynchronizer.EndRead;
var
  I, ThreadID, Len: Integer;
begin
  if not FWriting then
  begin
    // Remove our threadid from the list of active threads
    I := 0;
    ThreadID := GetCurrentThreadID;
    // wait for BeginRead to finish any pending realloc of FActiveThreads
    while InterlockedExchange(FReallocFlag, ThreadID) <> 0 do  Sleep(0);
    try
      Len := High(FActiveThreads);
      while (I < Len) and (FActiveThreads[I].ThreadID <> ThreadID) do Inc(I);
      // the calling thread must own a slot, i.e. BeginRead was called first;
      // NOTE(review): with assertions compiled out an unmatched EndRead would
      // presumably go on to modify the element at index Len - confirm
      assert(I < Len);
      // no concurrency issues here.  We're the only thread interested in this record.
      Dec(FActiveThreads[I].RecursionCount); // threadsafe = unique to threadid
      if FActiveThreads[I].RecursionCount = 0 then
        FActiveThreads[I].ThreadID := 0; // must do this last!
    finally
      FReallocFlag := 0;
    end;
    if (InterlockedDecrement(FCount) = 0) or WriterIsOnlyReader then
      SetEvent(FReadExit);     // release next writer
  end;
end;

initialization
  // determine the processor count once at unit start-up; TSpinLock.Acquire
  // reads the cached value on every spin
  CPUCount := GetCPUCount;

end.

