unit kolstreams; // // purpose: Extended streams for KOL // author: © 2004, Thaddy de Koning // Remarks: See below, some is based on code written and (C) by Julian Bucknall // Released as freeware {Notes: The BufferedStream provides a method to buffer data to/from any stream. Since it is a TStream itself, it can be used in place of any stream. This makes it easy to buffer HandleStream or FileStream instances, for example. The stream which needs to be buffered is passed as a parameter to the Create function. The Buffered Stream merely provides a buffer between the user and the stream that holds the data. It will greatly enhance performance on filestreams when using the advanced read/write methods like readstr and readstrz, since they read and write a byte at a time. The debugstream outputs logging to the debugger window, if available. The WindowedStream reads/writes from an offset in the original stream as if it was its start. Handy for any files that have a header, like bitmaps, database files, wave files etc. The WindowedStreamEx reads/writes from an offset in the original stream as if it was its start AND upto an offset as if it was its end of file. Handy for windowing on data for FFT's etc. This is a "true" window stream. } interface uses Windows, KOL; {$DEFINE CanSetSize} type PByteArray = ^TByteArray; TByteArray = array[0..32767] of Byte; type TSetSize = procedure (aStream : PStream; aNewSize : Longint); {-procedure prototype for setting the size of a stream} type PBufStreamData = ^TBufSTreamData; TBufStreamData = object(Tobj) bsPage : PByteArray; {buffer} bsPageSize : dword; {size of buffer (multiple of 1K)} bsPageStart : Longint; {start of buffer as offset in stream} bsPosInPage : dword; {current position in buffer} bsByteCount : dword; {count of valid bytes in buffer} bsSize : Longint; {count of bytes in stream} bsDirty : boolean; {whether the buffer is dirty or not} bsStream : PStream; {actual stream containing data} bsSetSize : TSetSize; destructor destroy;virtual; procedure bsReadBuffer; procedure bsWriteBuffer; procedure Commit; end; // A buffered stream function NewbufferedStream(aStream : PStream; aBufSize : dword):PStream; // A stream that outputs debug messages to the debugwindow function NewDebugStream(aStream:PStream):pStream; // A stream that acts as a window on another stream // Handy for files with a header portion, like dBase or wave files. // You can probably also use KOL's NewMemoryStreamEx for this... function NewWindowedStream(Stm:Pstream;Offset:Dword):PStream; // A stream with optimized Block reads /writes function NewBlockStream(stm:Pstream;BlockSize:Cardinal):Pstream; implementation destructor TBufStreamData.Destroy; begin {destroy the buffer, after writing it to the actual stream} if (bsPage <> nil) then begin Commit; FreeMem(bsPage, bsPageSize); end; {let our ancestor clean up} inherited Destroy; end; {--------} procedure TBufStreamData.bsReadBuffer; begin bsStream.Seek(bsPageStart, spBegin); bsByteCount := bsStream.Read(bsPage^, bsPageSize); end; {--------} procedure TBufStreamData.bsWriteBuffer; begin bsStream.Seek(bsPageStart, spBegin); bsStream.Write(bsPage^, bsByteCount); end; {--------} procedure TBufStreamData.Commit; begin if bsDirty then begin bsWriteBuffer; bsDirty := false; end; end; {--------} function BufferedStreamRead(stm:Pstream;var Buffer; Count : dword) : dword; var BufAsBytes : TByteArray absolute Buffer; BufInx : Longint; BytesToGo : Longint; BytesToRead : integer; Data:PBufSTreamData; begin Data:=stm.methods.fCustom; {reading is complicated by the fact we can only read in chunks of bsPageSize: we need to partition out the overall read into a read from part of the buffer, zero or more reads from complete buffers and then a possible read from part of a buffer} with data^ do begin {calculate the actual number of bytes we can read - this depends on the current position and size of the stream as well as the number of bytes requested} BytesToGo := Count; if (bsSize < (bsPageStart + bsPosInPage + Count)) then BytesToGo := bsSize - (bsPageStart + bsPosInPage); if (BytesToGo <= 0) then begin Result := 0; Exit; end; {remember to return the result of our calculation} Result := BytesToGo; {initialise the byte index for the caller's buffer} BufInx := 0; {is there anything in the buffer? if not, go read something from the actual stream} if (bsByteCount = 0) then bsReadBuffer; {calculate the number of bytes we can read prior to the loop} BytesToRead := bsByteCount - bsPosInPage; if (BytesToRead > BytesToGo) then BytesToRead := BytesToGo; {copy from the stream buffer to the caller's buffer} Move(bsPage^[bsPosInPage], BufAsBytes[BufInx], BytesToRead); {calculate the number of bytes still to read} dec(BytesToGo, BytesToRead); {while we have bytes to read, read them} while (BytesToGo > 0) do begin {advance the byte index for the caller's buffer} inc(BufInx, BytesToRead); {as we've exhausted this buffer-full, advance to the next, check to see whether we need to write the buffer out first} if bsDirty then begin bsWriteBuffer; bsDirty := false; end; inc(bsPageStart, bsPageSize); bsPosInPage := 0; bsReadBuffer; {calculate the number of bytes we can read in this cycle} BytesToRead := bsByteCount; if (BytesToRead > BytesToGo) then BytesToRead := BytesToGo; {copy from the stream buffer to the caller's buffer} Move(bsPage^, BufAsBytes[BufInx], BytesToRead); {calculate the number of bytes still to read} dec(BytesToGo, BytesToRead); end; {remember our new position} inc(bsPosInPage, BytesToRead); if (bsPosInPage = bsPageSize) then begin inc(bsPageStart, bsPageSize); bsPosInPage := 0; bsByteCount := 0; end; end; end; {--------} function BufferedStreamSeek(stm:Pstream; Offset : integer; Origin : TmoveMethod) : dword; var NewPageStart : Longint; NewPos : Longint; data:PBufStreamData; begin Data:=stm.methods.fCustom; with data^ do begin {calculate the new position} case Origin of spBegin : NewPos := Offset; spCurrent : NewPos := bsPageStart + bsPosInPage + Offset; spEnd : NewPos := bsSize + Offset; end; {calculate which page of the file we need to be at} NewPageStart := NewPos and not(pred(longint(bsPageSize))); {if the new page is different than the old, mark the buffer as being ready to be replenished, and if need be write out any dirty data} if (NewPageStart <> bsPageStart) then begin if bsDirty then begin bsWriteBuffer; bsDirty := false; end; bsPageStart := NewPageStart; bsByteCount := 0; end; {save the new position} bsPosInPage := NewPos - NewPageStart; Result := NewPos; end; end; {--------} procedure BufferedStreamSetSize(stm:Pstream;aNewSize : Longint); var Data:PBufSTreamData; begin Data:=stm.methods.fCustom; with data^ do begin {save the new size and alter the position if required} bsSize := aNewSize; if ((bsPageStart + bsPosInPage) > aNewSize) then stm.Seek(0, spEnd); {now set the size of the actual stream} if Assigned(bsSetSize) then bsSetSize(bsStream, aNewSize) else bsStream.Size := aNewSize; end; end; {--------} function BufferedStreamWrite(stm:Pstream;var Buffer; Count : dword) : dword; var BufAsBytes : TByteArray absolute Buffer; BufInx : Longint; BytesToGo : Longint; BytesToWrite: integer; Data:PBufStreamData; begin Data:=stm.methods.fCustom; with data^ do begin {writing is complicated by the fact we write in chunks of bsPageSize: we need to partition out the overall write into a write from part of the buffer, zero or more writes from complete buffers and then a possible write from part of a buffer} {when we write to this stream we always assume that we can write the requested number of bytes: if we can't (eg, the disk is full) we'll get an exception somewhere eventually} BytesToGo := Count; {remember to return the result of our calculation} Result := BytesToGo; {initialise the byte index for the caller's buffer} BufInx := 0; {is there anything in the buffer? if not, go try read a block from the actual stream - we might be overwriting existing data rather than appending data to the end of the stream} if (bsByteCount = 0) and (bsSize > bsPageStart) then bsReadBuffer; {calculate the number of bytes we can write prior to the loop} BytesToWrite := bsPageSize - bsPosInPage; if (BytesToWrite > BytesToGo) then BytesToWrite := BytesToGo; {copy from the caller's buffer to the stream buffer} Move(BufAsBytes[BufInx], bsPage^[bsPosInPage], BytesToWrite); {mark our stream buffer as requiring a save to the actual stream, note that this will suffice for the rest of the routine as well: no inner routine will turn off the dirty flag} bsDirty := true; {calculate the number of bytes still to write} dec(BytesToGo, BytesToWrite); {while we have bytes to write, write them} while (BytesToGo > 0) do begin {advance the byte index for the caller's buffer} inc(BufInx, BytesToWrite); {as we've filled this buffer, write it out to the actual stream and advance to the next buffer, reading it if required} bsByteCount := bsPageSize; bsWriteBuffer; inc(bsPageStart, bsPageSize); bsPosInPage := 0; bsByteCount := 0; if (bsSize > bsPageStart) then bsReadBuffer; {calculate the number of bytes we can write in this cycle} BytesToWrite := bsPageSize; if (BytesToWrite > BytesToGo) then BytesToWrite := BytesToGo; {copy from the caller's buffer to our buffer} Move(BufAsBytes[BufInx], bsPage^, BytesToWrite); {calculate the number of bytes still to write} dec(BytesToGo, BytesToWrite); end; {remember our new position} inc(bsPosInPage, BytesToWrite); {make sure the count of valid bytes is correct} if (bsByteCount < bsPosInPage) then bsByteCount := bsPosInPage; {make sure the stream size is correct} if (bsSize < (bsPageStart + bsByteCount)) then bsSize := bsPageStart + bsByteCount; {if we're at the end of the buffer, write it out and advance to the start of the next page} if (bsPosInPage = bsPageSize) then begin bsWriteBuffer; bsDirty := false; inc(bsPageStart, bsPageSize); bsPosInPage := 0; bsByteCount := 0; end; end; end; {====================================================================} function NewBufferedStream(aStream:PStream;aBufSize:dword):PStream; var data:PBuFStreamData; ActBufSize : Longint; begin Result:=_NewStream(aStream.Methods^);//inherited base methods; New(Data,Create); Result.add2AutoFree(Data); Result.Methods.fseek:=BufferedstreamSeek; Result.Methods.fread:=BufferedStreamRead; Result.methods.fwrite:=BufferedSTreamWrite; Result.Methods.fCustom:=Data; with Data^ do begin {save the actual stream} bsStream := aStream; {round up the buffer size to a multiple of 1K and a maximum of 32K} ActBufSize := (Longint(aBufSize) + 1023) and $FFFFFC00; if (ActBufSize > 32 * 1024) then bsPageSize := 32 * 1024 else bsPageSize := ActBufSize; {create the buffer} //GetMem(bsPage, bsPageSize); bsPage:=AllocMem(bsPageSize); {set the page/buffer variables to the start of the stream} bsPosInPage := 0; bsByteCount := 0; bsPageStart := 0; bsDirty := false; bsSize := aStream.Size; end; end; function DebugSeek( Strm: PStream; MoveTo: Integer; MoveMethod: TMoveMethod ): DWORD; var Stm:Pstream; begin stm:=strm.methods.fCustom; end; function DebugGetSize( Strm: PStream ): DWORD; var Data:Pstream; begin Data:=strm.methods.fCustom; OutputDebugstring('Getting size...'); Result:=Data.methods.fGetSiz(data); OutputDebugstring(Pchar(format('Size is %d',[Result]))); end; procedure DebugSetSize( Strm: PStream; Value: DWORD ); var Data:Pstream; begin Data:=strm.methods.fCustom; OutputDebugstring(Pchar(Format('Setting size to %d',[Value]))); Data.Methods.fSetSiz(data,Value); OutputDebugstring(Pchar(format('Size is %d',[Data.Size]))); end; function DebugRead( Strm: PStream; var Buffer; Count: DWORD ): DWORD; var Data:Pstream; begin Data:=strm.methods.fCustom; OutputDebugstring(Pchar(Format('Start reading %d bytes',[count]))); Result:=Data.Read(Buffer,Count); OutputDebugstring(Pchar(format('%d actual bytes read',[Result]))); end; function DebugWrite( Strm: PStream; var Buffer; Count: DWORD ): DWORD; var Data:Pstream; begin Data:=strm.methods.fCustom; OutputDebugstring(Pchar(Format('Start writing %d bytes',[count]))); Result:=Data.write (Buffer,Count); OutputDebugstring(Pchar(format('%d actual bytes written',[Result]))); end; procedure DebugClose( Strm: PStream ); var Data:Pstream; begin Data:=strm.methods.fCustom; OutputDebugstring('Start closing stream'); Data.methods.fClose(data); OutputDebugstring('Stream closed'); end; procedure DebugWait( Strm: PStream ); var Data:Pstream; begin Data:=strm.methods.fCustom; OutputDebugstring('Start Wait'); Data.Wait; OutputDebugstring('End wait'); end; function NewDebugStream(aStream:PStream):Pstream; begin Result:=_NewSTream(aStream.Methods^); Result.methods.fSeek:=DebugSeek; Result.Methods.fRead:=DebugRead; Result.Methods.fGetSiz:=DebugGetSize; Result.Methods.fSetSiz:=DebugSetSize; Result.Methods.fWrite:=DebugWrite; Result.Methods.fClose:=DebugClose; Result.Methods.fWait:=DebugWait; Result.Methods.fCustom:=aStream; end; type PWindowData=^TWindowData; TWindowData=object(Tobj) Stream:PStream; ZeroPos:Dword; end; function WindowSeek(stm:Pstream;Offset : longint; Origin : Tmovemethod) : dword; var Data:PWindowData; NewPos : dword; begin Data:=stm.methods.fCustom; case Origin of spBegin: NewPos := data.Stream.Seek(Offset + data.ZeroPos, spBegin); spCurrent : NewPos := data.Stream.Seek(Offset, spCurrent); spEnd : NewPos := data.Stream.Seek(Offset, spEnd); end; if (NewPos < data.ZeroPos) then NewPos := data.Stream.Seek(data.ZeroPos, spBegin); Result := NewPos - data.ZeroPos; end; function NewWindowedStream(Stm:Pstream;Offset:Dword):PStream; var data:PWindowData; begin Result:=_NewStream(stm.methods^); New(Data,Create); data.Stream:=stm; Result.methods.fSeek:=WindowSeek; Result.methods.fCustom:=Data; Result.Add2autoFree(Data); end; function BlockStreamRead ( Strm: PStream; var Buffer; Count: DWORD = 0): DWORD; begin if Count=0 then Count:= Strm.Tag; Strm.Read(Buffer,Count); end; function BlockStreamWrite( Strm: PStream; var Buffer; Count: DWORD = 0): DWORD; begin if Count=0 then Count:= Strm.Tag; Strm.Write(Buffer,Count); end; Type // To get at protected TStream Methods PStmHack = ^TStmHack; TStmHack = object(Tstream) end; function NewBlockStream(stm:Pstream;BlockSize:Cardinal):PStream; begin with PStmHack(Result)^ do begin Result:=_Newstream(PStreamMethods(nil)^); // Copy over data and methods fData:=Stm.Data; fMethods:=Stm.Methods^; // Replace read and write methods fMethods.fRead:=BlockStreamRead; fMethods.fWrite:=BlockStreamWrite; fTag:=BlockSize; end; end; end.