Hi everyone,

I am quite new to this mailing list, but I hope someone can answer my question.

I receive MPEG2 TS data from a realtime hardware encoder that multicasts into the local network, and I use TWSocket to receive the data packets. As some of you may know, an MPEG2 Transport Stream (TS) consists of 188-byte packets, each including a 4-byte header, which I can check to see whether the received packet is OK. The hardware encoder packs 7 MPEG2 packets into one UDP packet, so the standard packet size I get is 1316 bytes.

As long as I work with only one thread, everything is fine. Once I add a second thread (OnDataAvailable -> RingBuffer; RingBuffer -> Thread2 -> File), it seems that OnDataAvailable is no longer executed for every event, so the data gets buffered. The packets I receive are then multiples of the standard size (2632 bytes), and they all share the same characteristics: the first header is damaged, headers 2 to 7 are fine, and everything from the 8th header on is empty or random data.

I was wondering whether TWSocket, when OnDataAvailable has not been called in between, copies the second 1316-byte packet not into the buffer area after the first 1316-byte packet but somewhere into its first bytes. That would explain why the first header (and the 188 bytes that follow it) is broken.

This is what parts of my debug file look like:

1316 Byte udp package received
header 1 - OKAY, pid 256 -> payload video
header 2 - OKAY, pid 256 -> payload video
header 3 - OKAY, pid 256 -> payload video
header 4 - OKAY, pid 259 -> payload audio
header 5 - OKAY, pid 256 -> payload video
header 6 - OKAY, pid 256 -> payload video
header 7 - OKAY, pid 256 -> payload video

2632 Byte udp package received
header 1 - ERROR, pid 19440
header 2 - OKAY, pid 256
header 3 - OKAY, pid 256
header 4 - OKAY, pid 256
header 5 - OKAY, pid 256
header 6 - OKAY, pid 256
header 7 - OKAY, pid 256
header 8 - ERROR, pid 0
header 9 - ERROR, pid 0
header 10 - ERROR, pid 0
header 11 - ERROR, pid 0
header 12 - ERROR, pid 0
header 13 - ERROR, pid 0
header 14 - ERROR, pid 0

The code I am using is modeled on the TWSocket multithreading example. It looks like this:

type
  TPos = RECORD
    Buffersize, Blocksize : Int64;
    CurrentWPos, CurrentRPos, StartPos : Int64;
    Diskrepanz : Int64;
    Richtung : Byte;
  END;

  TStreamThread = class(TThread)
  private
    FAddrDatei : Pointer;
    BlockMem : Pointer;
    procedure CalculateDiscrepancy;
    procedure WriteMemToFile;
    procedure ResetPosition;
  public
    FDatenspeicher : Pointer;
    FPos : ^TPos;
    FAddrLink : Pointer;
    FAusgabe : TLabel;
    Ffinalizethread: Boolean;
    FExchangeFile : Boolean;
    procedure SetLabel;
    procedure Execute; override;
  end;

  TChangeThread = class(TThread)
  private
    procedure ExchangeFile;
    procedure PrepareFile;
  public
    Mode : Byte;
    procedure Execute; override;
  end;

  TRingBuffer = class(TObject)
  private
    Speicher : Pointer;
    Groesse : Int64;
    Start : Pointer;
    Writer : Int64;
    Reader : Int64;
    Diskrepanz : Int64;
    Richtung : Integer;
  public
    fReadyToWrite : TEvent;
    fReadyToRead : TEvent;
    CS : TCriticalSection;
    constructor Create;
    procedure GetData(pData : Pointer; pSize : Int64);
    procedure PutData(pData : Pointer; pSize : Int64);
  end;

var
  Datei : ARRAY[0..1] OF TFileStream;
  AddrDatei : Pointer;
  AddrLink : Pointer;
  DoExchange : Byte;
  RootName : String;
  Socket:TWSocket;
  StreamThread : TStreamThread;
  ChangeThread : TChangeThread;
  ExchangeThread : TChangeThread;
public
  { Public declarations }
  DataRes : Pointer;
  Pos : ^TPos;
  MyVar : Integer;
  CritSec : TCriticalSection;
  Wholesize : Int64;
  CurrentFileSize: Int64;
  WrittenSize : Int64;
  Testvariable : Int64;
  ReceivedPacks : Int64;
  MaxSize : Int64;
  PrerunSize : Int64;
  FileCount : Integer;
  StreamID : Integer;
  RecordingID : Integer;
  ErrorPacks : Int64;
  procedure DataAvailable(Sender : TObject; Error : Word);
  procedure WriteToMem;
end;

var
  Form2: TForm2;

implementation

{$R *.dfm}

{procedure TForm2.SetLabel(lname: string);
begin
  StatusLA.Caption:=lname;
end;
}

constructor TForm2.TRingBuffer.Create;
begin
  Inherited Create;
  GetMem(Start,Groesse);
  Writer:=0;
  Reader:=0;
  Richtung:=0;
  CS:=TCriticalSection.Create;
end;

procedure TForm2.TRingbuffer.GetData(pData : Pointer; pSize : Int64);
var
  zwSize1,zwSize2 : Int64;
begin
  IF fReadyToRead.WaitFor(INFINITE) = wrSignaled THEN
  BEGIN
    fReadyToWrite.ResetEvent;
    try
      CS.Enter;
      Diskrepanz:=Writer+Groesse*Richtung - Reader;
      IF (Diskrepanz-pSize) > 0 THEN
      BEGIN
        IF Reader+pSize <= Groesse THEN
        BEGIN
          copyMemory(pData,POINTER(Int64(Speicher)+Reader),pSize);
          Inc(Reader,pSize);
          IF Reader=Groesse THEN
          BEGIN
            Reader:=0;
            Richtung:=0;
          END;
        END
        ELSE
        BEGIN
          zwSize1:=Groesse-Reader;
          zwSize2:=Reader+pSize-Groesse;
          copyMemory(pData,POINTER(Int64(Speicher)+Reader),zwSize1);
          copyMemory(POINTER(Int64(pData)+zwSize1),Speicher,zwSize2);
          Reader:=zwSize2;
          Richtung:=0;
        END;
      END;
    finally
      CS.Leave;
      fReadyToWrite.SetEvent;
    END;
  END;
end;

procedure TForm2.TRingbuffer.PutData(pData : Pointer; pSize : Int64);
var
  zwSize1, zwSize2 : Int64;
begin
  if fReadyToWrite.WaitFor(INFINITE)=wrSignaled THEN
  BEGIN
    fReadyToRead.ResetEvent;
    try
      CS.Enter;
      Diskrepanz:=Writer+Groesse*Richtung - Reader;
      IF Diskrepanz < Groesse THEN
      BEGIN
        IF Writer + pSize <= Groesse THEN
        BEGIN
          CopyMemory(POINTER(Int64(Speicher)+Writer),pData,pSize);
          Inc(Writer,pSize);
          IF Writer=Groesse THEN
          BEGIN
            Writer:=0;
            Richtung:=1;
          END;
        END
        ELSE
        BEGIN
          zwSize1:=Groesse-Writer;
          zwSize2:=Writer+pSize-Groesse;
          CopyMemory(POINTER(Int64(Speicher)+Writer),pData,zwSize1);
          CopyMemory(Speicher,POINTER(INTEGER(pData)+zwsize1),zwSize2);
          Writer:=zwsize2;
          Richtung:=1;
        END;
      END;
    finally
      CS.Leave;
      fReadyToRead.SetEvent;
    end;
  END;
end;

procedure TForm2.TChangeThread.ExchangeFile;
begin
  Form2.StreamThread.FExchangeFile:=true;
end;

procedure TForm2.TChangeThread.PrepareFile;
BEGIN
  Inc(Form2.FileCount,1);
  Form2.Datei[Form2.FileCount MOD 2]:=TFileStream.Create(Form2.rootname+format(fmstring,[Form2.FileCount]),fmCreate);
END;

procedure TForm2.TStreamThread.SetLabel;
begin
  Form2.StatusLA.Caption:='StreamThread';
end;

procedure TForm2.TChangeThread.Execute;
begin
  FreeOnTerminate:=true;
  if Mode=ctmPrepare then
  BEGIN
    Synchronize(PrepareFile);
  END;
  IF Mode=ctmExchange THEN
  BEGIN
    Synchronize(ExchangeFile);
  END;
end;

procedure TForm2.TStreamThread.CalculateDiscrepancy;
begin
  FPos^.Diskrepanz:=(FPos^.Buffersize * FPos^.Richtung + FPos^.CurrentWPos) - FPos^.CurrentRPos;
end;

procedure TForm2.TStreamThread.WriteMemToFile;
begin
  //Code
  IF (FAddrDatei <> nil) THEN
  BEGIN
    TFileStream(FAddrDatei^).Write(BlockMem^,FPos^.Blocksize);
    Inc(FPos^.CurrentRPos,FPos^.BlockSize);
    Inc(Form2.WrittenSize,FPos^.BlockSize);
  END
  ELSE
    Form2.StatusLA.Caption:='Error';
end;

procedure TForm2.TStreamThread.ResetPosition;
begin
  //Code
  FPos^.CurrentRPos:=0;
  FPos^.Richtung:=0;
end;

procedure TForm2.TStreamThread.Execute;
var
  zwPointer : Pointer;
  zwSize : Int64;
  doneit : Boolean;
  dummymem : String;
begin
  GetMem(BlockMem,FPos^.BlockSize);
  fExchangeFile:=false;
  WHILE not terminated DO
  BEGIN
    if FExchangeFile THEN
    BEGIN
      Form2.AddrDatei:[EMAIL PROTECTED] MOD 2];
      FreeAndNil(Form2.Datei[(Form2.FileCount-1) MOD 2]);
      FExchangeFile:=False;
    END;
    FAddrDatei:=POINTER(FAddrLink^);
    IF {(FPos^.Diskrepanz>=FPos^.Blocksize) AND}
       (assigned(TFileStream(FAddrDatei^))) AND
       (Form2.WrittenSize < Form2.Wholesize) THEN
    BEGIN
      IF (FPos^.CurrentRPos + FPos^.BlockSize <= FPos^.Buffersize) THEN
      BEGIN
        CopyMemory(BlockMem,POINTER(FPos^.StartPos+FPos^.CurrentRPos),FPos^.BlockSize);
        Synchronize(WriteMemToFile);
      END; //if fpos^.currentrpos+fpos^.blocksize <= fpos^.buffersize
      IF (FPos^.CurrentRPos >= FPos^.Buffersize) THEN
      BEGIN
        Synchronize(ResetPosition);
      END; // if fpos^.currentrpos >= fpos^.buffersize
    END; // if Diskrepanz>0
  END; //while not terminated
  dispose(BlockMem);
end;

procedure TForm2.WriteToMem;
begin
  //Code
end;

procedure TForm2.DataAvailable(Sender : TObject; Error : Word);
var
  receive, dest,zw : Pointer;
  rcvsize : Int64;
  zwsize : Int64;
begin
  // StreamThread.Suspend;
  rcvsize:=Socket.RcvdCount;
  IF rcvsize <> Pos^.Blocksize THEN
    Label5.Caption:=inttostr(rcvsize);
  GetMem(receive,rcvsize);
  Socket.Receive(receive,rcvsize);
  IF (rcvsize > 0) THEN
  BEGIN
    Inc(CurrentFilesize,rcvsize);
    Inc(WholeSize,rcvsize);
    Inc(ReceivedPacks,rcvsize DIV Pos^.Blocksize);
    IF (rcvsize MOD Pos^.Blocksize <> 0) THEN
      Inc(ErrorPacks);
    IF ((Pos^.CurrentWPos+rcvsize) <= Pos^.Buffersize) THEN
    BEGIN
      dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
      CritSec.Enter;
      CopyMemory(dest,receive,rcvsize);
      CritSec.Leave;
    END
    ELSE
    BEGIN
      zwsize:=Pos^.Buffersize - Pos^.CurrentWPos;
      dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
      CritSec.Enter;
      CopyMemory(dest,receive,zwsize);
      CritSec.Leave;
      zw:=POINTER(Int64(receive)+zwsize);
      dest:=POINTER(Pos^.StartPos);
      zwsize:=(Pos^.CurrentWPos+rcvsize)-Pos^.Buffersize;
      CritSec.Enter;
      CopyMemory(dest,zw,zwsize);
      Pos^.CurrentWPos:=zwsize;
      Pos^.Richtung:=1;
      CritSec.Leave;
    END;
    IF Pos^.CurrentWPos+rcvsize > Pos^.Buffersize THEN
    BEGIN
      CritSec.Enter;
      Pos^.CurrentWPos:=0;
      Pos^.Richtung:=1;
      CritSec.Leave;
      Label5.Caption:='Error on size:'+inttostr(rcvsize);
    END
    ELSE
    BEGIN
      CritSec.Enter;
      Inc(Pos^.CurrentWPos,rcvsize);
      CritSec.Leave;
    END;
  END;
  // StreamThread.Resume;
  IF (CurrentFileSize >= (MaxSize-PreRunSize)) AND (DoExchange = 0) THEN
  BEGIN
    DoExchange:=1;
    ChangeThread:=TChangeThread.Create(true);
    ChangeThread.Mode:=ctmPrepare;
    ChangeThread.Resume;
  END;
  IF (CurrentFileSize >=
      MaxSize) AND (DoExchange=1) THEN
  BEGIN
    DoExchange:=0;
    CurrentFileSize:=0;
    ExChangeThread:=TChangeThread.Create(true);
    ExChangeThread.Mode:=ctmExchange;
    ExChangeThread.Resume;
  END;
  dispose(receive);
  Label1.Caption:=Inttostr(round((Pos^.CurrentWPos/Pos^.Buffersize)*100));
  Label2.Caption:=Inttostr(round((Pos^.CurrentRPos/Pos^.Buffersize)*100));
  Label3.Caption:=Inttostr(Pos^.Diskrepanz);
  Label4.Caption:=Inttostr(CurrentFileSize);
  Label5.Caption:=Inttostr(FileCount+1);
  Label11.Caption:=Inttostr(WholeSize);
  Label12.Caption:=Inttostr(WrittenSize);
  Label17.Caption:=Inttostr(receivedPacks);
  Label19.Caption:=Inttostr(ErrorPacks);
end;

procedure TForm2.Button2Click(Sender: TObject);
begin
  new(Pos);
  Pos^.Buffersize:=StrToInt(BuffED.Text);
  Pos^.BlockSize:=StrToInt(BlockED.Text);
  GetMem(DataRes,Pos^.Buffersize);
  Pos^.CurrentWPos:=0;
  Pos^.CurrentRPos:=0;
  Pos^.StartPos:=Int64(DataRes);
  Pos^.Richtung:=0;
  Pos^.Diskrepanz:=0;
  FileCount:=0;
  MaxSize:=strtoint(MAXED.Text);
  Prerunsize:=500000;
  wholesize:=0;
  CurrentFileSize:=0;
  WrittenSize:=0;
  ReceivedPacks:=0;
  ErrorPacks:=0;
end;

procedure TForm2.Button1Click(Sender: TObject);
var
  TestPointer : Pointer;
begin
  WholeSize:=0;
  CurrentFileSize:=0;
  MyVar:=0;
  Socket:=TWSocket.Create(self);
  Socket.Addr:='0.0.0.0';
  Socket.MultiCastAddrStr:=IPED.Text;
  Socket.Port:=PortED.Text;
  Socket.Proto:='udp';
  Socket.MultiCast:=true;
  Socket.ReuseAddr:=true;
  Socket.MultiThreaded:=true;
  Socket.OnDataAvailable:=DataAvailable;
  CritSec:=TCriticalSection.Create;
  CreateDir(DIRED.Text);
  Rootname:=DIRED.Text+'\'+FILED.Text+format(fmstring,[recordingid])+format(fmstring,[streamid]);
  Datei[0]:=TFileStream.Create(Rootname+format(fmstring,[FileCount]),fmCreate);
  AddrDatei:[EMAIL PROTECTED];
  AddrLink:[EMAIL PROTECTED];
  StreamThread:=TStreamThread.Create(true);
  StreamThread.FDatenspeicher:=DataRes;
  StreamThread.FPos:=Pointer(Int64(Pos));
  StreamThread.FAusgabe:=StatusLA;
  StreamThread.FAddrLink:=AddrLink;
  StreamThread.FreeOnTerminate:=true;
  StreamThread.Ffinalizethread:=false;
  Socket.Listen;
  StreamThread.Resume;
end;

end.

--
Christian Hinske
Schleißheimerstraße 157
D-80797 München
Tel. :(++ 49 89) 36 0 37 445
Mobil :(++49)176 234 10 146
e-mail: [EMAIL PROTECTED]
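
For readers who want to reproduce the per-packet header check shown in the debug output, here is a minimal sketch. It is not the code that produced that output. It only assumes the standard TS layout (sync byte $47 at offset 0, a 13-bit PID held in the low 5 bits of byte 1 plus all of byte 2) and the 7 x 188 = 1316 byte datagram size described in the post. The names CheckTsHeader and TDatagram and the constants are made up for this example, and the datagram is synthetic test data rather than anything received through TWSocket.

```pascal
program TsHeaderCheck;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

const
  TS_PACKET_SIZE = 188;  // one MPEG-2 TS packet, including its 4-byte header
  TS_SYNC_BYTE   = $47;  // first byte of every valid TS packet
  TS_PER_UDP     = 7;    // TS packets per UDP datagram, as stated in the post

type
  // Hypothetical type: one 1316-byte datagram as delivered by the encoder.
  TDatagram = array[0..TS_PER_UDP * TS_PACKET_SIZE - 1] of Byte;

// Returns True if the packet starting at Offset begins with the sync byte.
// Pid always receives the 13-bit PID read from header bytes 1 and 2.
function CheckTsHeader(const Buf: array of Byte; Offset: Integer;
  out Pid: Integer): Boolean;
begin
  Pid := ((Buf[Offset + 1] and $1F) shl 8) or Buf[Offset + 2];
  Result := Buf[Offset] = TS_SYNC_BYTE;
end;

var
  Data: TDatagram;
  I, Pid: Integer;
begin
  // Build a synthetic datagram: 7 packets, each with PID 256 ($0100).
  FillChar(Data, SizeOf(Data), 0);
  for I := 0 to TS_PER_UDP - 1 do
  begin
    Data[I * TS_PACKET_SIZE]     := TS_SYNC_BYTE;
    Data[I * TS_PACKET_SIZE + 1] := $01;  // upper PID bits
    Data[I * TS_PACKET_SIZE + 2] := $00;  // lower PID bits
  end;

  // Walk the datagram in 188-byte steps and report each header.
  for I := 0 to TS_PER_UDP - 1 do
    if CheckTsHeader(Data, I * TS_PACKET_SIZE, Pid) then
      WriteLn('header ', I + 1, ' - OKAY, pid ', Pid)
    else
      WriteLn('header ', I + 1, ' - ERROR, pid ', Pid);
end.
```

Because the sketch masks the PID to 13 bits, it can never report a value above 8191. A value such as the 19440 in the debug output therefore suggests that the original check reads the two header bytes without masking.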