PDA

Archiv verlassen und diese Seite im Standarddesign anzeigen : Schnelle Thread-Kommunikation ...


zeckensack
2004-02-10, 16:12:34
Die Frage vorweg:
Ich habe den Verdacht, dass ReleaseSemaphore immer, sofort den auf die Semaphore wartenden Thread wieder aufnimmt, auch wenn der aktuelle Thread gerade erst wieder angelaufen ist. Stimmt's? Und wenn ja, kann man das auch verhindern?

Nachfolgender Code misst die Transportgeschwindigkeit von "Kommandos" über einen FIFO. Ein Thread schreibt, ein Thread liest. Synchronisiert wird über zwei Semaphoren, "push_semaphore" (ist signaled, solange noch "channels" zum Senden frei sind) und "pull_semaphore" (ist signaled, wenn mindestens ein "channel" Daten enthält, die noch nicht empfangen wurden).

Der Empfänger wartet also darauf, dass "pull_semaphore" neue hereinkommende Daten signalisiert. Nach dem Auslesen der Daten wird die "push_semaphore" inkrementiert, um dem sendenden Thread den wieder freigewordenen Transportkanal zu signalisieren.

Der Sender arbeitet quasi genauso, nur umgekert. Er wartet auf die "push_semaphore", weil er mindestens einen freien Kanal zum Senden braucht. In diesen schreibt er dann, und inkrementiert die "pull_semaphore", die wiederum den Empfänger-Thread mobilisiert.

Auf Win98SE erreiche ich 570000 Transfers/s, auf Win2kSP4 605000 Transfers/s, entsprechend etwa 1,6ms Pro Transfer. Das muss IMO Scheduler-Overhead sein.

Zur Geschwindigkeitsmessung wurde die Anzeige der übertragenen Daten (via printf) natürlich auskommentiert. Also nur der Vollständigkeit halber: es funktioniert. Die Daten kommen korrekt an. Die 5 Sekunden-Timeouts werden nicht erreicht, hatte ich nur zur Vorsicht eingebaut, falls mir das Teil absemmelt.

Die Anzahl der Kanäle beeinflusst natürlich auch die Geschwindigkeit. Mit 128 habe ich IMO das Optimum erreicht. Verändere ich diesen Wert nach oben oder nach unten, sinkt (zumindest auf meinem System) der Durchsatz. Daran liegt's also nicht.


#include "..\stdafx.h"

uint send_offset=0;
uint receive_offset=0;

const uint channels=128;
const uint max_channel_mask=127;

struct ThreadPipe
{
ubyte command[channels];
ubyte command_data[channels];
};
volatile ThreadPipe thread_pipe;

DWORD thread_id;
HANDLE thread_handle;
HANDLE push_semaphore;
HANDLE pull_semaphore;

uint bytes_received=0;

DWORD WINAPI
ThreadFunc(void* param)
{
// printf("Thread runs!\n");

bool end_signal=false;

while (!end_signal)
{
WaitForSingleObject(pull_semaphore,5000);
ubyte command=thread_pipe.command[receive_offset];

if (command==0)
{
end_signal=true;
}
else
if (command==1)
{
++bytes_received;
// printf("%c",thread_pipe.command_data[receive_offset]);
}

++receive_offset;
receive_offset&=max_channel_mask;
ReleaseSemaphore(push_semaphore,1,NULL);
}
return(0);
}

int
main()
{
memset((void*)&thread_pipe,0,sizeof(thread_pipe));

push_semaphore=CreateSemaphore(0,channels,channels,NULL);
if (push_semaphore==NULL)
{
printf("semaphore creation failed\n");
return(1);
}
pull_semaphore=CreateSemaphore(0,0,channels,NULL);
if (pull_semaphore==NULL)
{
CloseHandle(push_semaphore);
printf("semaphore creation failed\n");
return(1);
}

thread_handle=CreateThread(NULL,0,ThreadFunc,0,CREATE_SUSPENDED,&thread_id);

if (thread_handle==0)
{
CloseHandle(push_semaphore);
CloseHandle(pull_semaphore);
printf("thread creation failed\n");
return(1);
}
printf("Starting thread ...\n");

AbsTimer t;
ResumeThread(thread_handle);

for (int i=0;i<1000000;++i)
{
WaitForSingleObject(push_semaphore,5000);
thread_pipe.command[send_offset]=1;
// thread_pipe.command_data[send_offset]='A'+(i%26);
ReleaseSemaphore(pull_semaphore,1,NULL);

++send_offset;
send_offset&=max_channel_mask;
}
WaitForSingleObject(push_semaphore,5000);

thread_pipe.command[send_offset]=0;
thread_pipe.command_data[send_offset]=0;
ReleaseSemaphore(pull_semaphore,1,NULL);

++send_offset;
send_offset&=max_channel_mask;

WaitForSingleObject(thread_handle,5000);

double dt=t.elapsed_seconds();

printf("Thread has run\n");
printf("%g commands per second\n",bytes_received/dt);

CloseHandle(push_semaphore);
CloseHandle(pull_semaphore);
CloseHandle(thread_handle);

return(0);
}

Demirug
2004-02-10, 16:56:38
Ja das Verhalten ist konform zu dem in der Kernelspezifikation definierten Verhalten von Semaphore. Sobald ein wartender Thread freigegeben wird (also vom Wait in den Ready Zustand wechselt) bekommt er einen dynamischen priority Boost. Dieser Boost führt nun dazu das der lesende Thread eine höhere Priorität als der sendende bekommt. Der Kern ist dadurch gezwungen den Threadwechsel vorzunehmen.

Umgehen können müsste man das indem man dem lesenden Thread eine kleinere Basispriorität gibt. Dann sollte auch der dynamische Boost nicht reichen um den Wechsel zu verhindern.

Warum aber überhaupt eine Semaphore? Die Teile sind ja sowas vom lahm.

zeckensack
2004-02-10, 17:14:11
Original geschrieben von Demirug
Ja das Verhalten ist konform zu dem in der Kernelspezifikation definierten Verhalten von Semaphore. Sobald ein wartender Thread freigegeben wird (also vom Wait in den Ready Zustand wechselt) bekommt er einen dynamischen priority Boost. Dieser Boost führt nun dazu das der lesende Thread eine höhere Priorität als der sendende bekommt. Der Kern ist dadurch gezwungen den Threadwechsel vorzunehmen.Hmmm ... finde ich bisserl doof. Das könnte man auch mit Sleep(0) lösen, und dann hätte man die volle Kontrolle über dieses Verhalten.
Umgehen können müsste man das indem man dem lesenden Thread eine kleinere Basispriorität gibt. Dann sollte auch der dynamische Boost nicht reichen um den Wechsel zu verhindern.Basispriorität ist ein heikles Thema ... davon abgesehen bräuchte ich das Verhalten auch für beide Threads, es bringt mir also IMO wenig, wenn ich einem eine höhere Priorität zuordne.

Ich habe dank deines Hinweises aber etwas interessantes gefunden, und zwar SetThreadPriorityBoost. Damit kann man diesen Schub abschalten.
Bringt nur leider nichts. Setze ich es für einen der beiden Threads, sinkt der Durchsatz (eigentlich auch logisch). Setze ich es für beide Threads, steigt der Durchsatz wieder, bleibt aber trotzdem noch leicht unter dem Wert "ohne alles", ie 530000 T/s.
Warum aber überhaupt eine Semaphore? Die Teile sind ja sowas vom lahm. Es erschien mir die passendste Lösung zu sein, um einen richtigen FIFO zu bauen, auf den die beiden Threads auch wirklich gleichzeitig zugreifen können, ohne dass Unfälle passieren (ich habe zwar kein SMP/SMT-System, aber wenn ich schonmal dabei bin ...).

Ich habe nunmal n Kanäle, und nicht nur einen einzigen. Die Semaphore ist das einzige Sync-Objekt, das zählen kann.

Hast du einen besseren Vorschlag? :)

Demirug
2004-02-10, 17:28:01
Original geschrieben von zeckensack
Es erschien mir die passendste Lösung zu sein, um einen richtigen FIFO zu bauen, auf den die beiden Threads auch wirklich gleichzeitig zugreifen können, ohne dass Unfälle passieren (ich habe zwar kein SMP/SMT-System, aber wenn ich schonmal dabei bin ...).

Ich habe nunmal n Kanäle, und nicht nur einen einzigen. Die Semaphore ist das einzige Sync-Objekt, das zählen kann.

Du brauchst kein SMP/SMT System damit da was daneben geht. Ich habe da viele Erfahrungswerte. Mit einem SMP/SMT System geht es nur schneller.

Hast du einen besseren Vorschlag? :)

Eine Citical Section und selber zählen. Wobei mir ehrlich gesagt das Prinzip mit den Kanälen das du hier benutzt nicht ganz klar ist. Kannst du mal die Aufgabenstellung genauer angeben. Soweit ich das verstehe willst du von einem Thread zum anderen möglichst schnell und viele 1 Byte Kommandos schicken?

Crushinator
2004-02-10, 17:52:31
Ich würde Citical Sections dazu verwenden, und zwar jeweils nur vor Zuweisung der Variablen/Proerties betreten bzw. danach sofort wieder verlassen.

zeckensack
2004-02-10, 18:00:41
Original geschrieben von Demirug
Du brauchst kein SMP/SMT System damit da was daneben geht. Ich habe da viele Erfahrungswerte. Mit einem SMP/SMT System geht es nur schneller.Ist klar :D
Es geht auch nicht um die möglichen Unfälle, sondern um den maximalen Nutzen. Rein für Single-CPU könnte ich auch einfach einen Mutex nehmen, um den jeweils anderen Thread komplett zu blockieren.

Eine Citical Section und selber zählen. Wobei mir ehrlich gesagt das Prinzip mit den Kanälen das du hier benutzt nicht ganz klar ist. Kannst du mal die Aufgabenstellung genauer angeben. Soweit ich das verstehe willst du von einem Thread zum anderen möglichst schnell und viele 1 Byte Kommandos schicken?Das ist bisher nur "proof of concept". 1 Byte pro Kommando, um den Bandbreitenverbrauch zu minimieren.
Die Kommandos sollen später auch mit grösseren Datenpaketen zusammengeschnürt werden. Die Grösse von Kommando+Daten ist variabel, jeder Transfer belegt aber mindestens einen "Kanal".

Die Aufgabenstellung entspricht beinahe exakt dem DirectX thunk layer. Ich will Daten aus mehreren (nicht von mir erzeugten) Threads entgegennehmen, multiplexen, und in den Kontext eines einzelnen Worker-Threads übertragen. Ich habe nur für Single Thread die Garantie, dass mein Backend wie erwartet funktioniert.

Die Idee hinter den Kanälen ist eben dass ich asynchron und trotzdem gleichzeitig schreibend und lesend auf den FIFO zugreifen können möchte. Also zB dass der Worker-Thread das Kommando in Kanal 0 ausführt, während der Sender-Thread schon viel weiter ist, und in Kanal 10 weitere Kommandos hinterlegt. Der Empfänger müsste nur dann warten, wenn er den FIFO schneller leert als der Sender ihn nachfüllen kann. Der Sender müsste nur dann warten, wenn er den FIFO schneller füllt, als der Empfänger ihn ausleeren kann. Im Optimalfall (mit SMP/SMT) arbeiten eben beide Threads gleichzeitig.

Eben wie ein echter Hardware-FIFO :)

Eine critical section erlaubt keine echte Parallelität mehr, und verhält sich daher auch nicht besser als ein Mutex (auch wenn sie auf OS-Ebene effizienter ist, darum geht's nicht).

edit:
Oder meintest du (bzw) ihr "selber zählen, aber den Zähler nur in einer CS anfassen"?
Das probiere ich gleich mal aus :)

Demirug
2004-02-10, 18:20:07
Ja den Zähler nur in der CS anfassen.

Dein Problem kommt mir bekannt vor. Ich hatte mal etwas ähnliches. Über viele Netzwerkports kommen viele Aufträge herein die von einem anderen Thread abgearbeitet werden sollen. Dieser Thread füllte nun wiederum Buffer mit den Antworten die von den anderen Threads wieder versendet wurden.

Ich hätte noch ein paar Fragen zu deiner Umgebung. Kommen die Aufträge kontinuirlich oder Stossweise? Ist die Bearbeitunzeit der unterschiedlichen Kommandos stark unterschiedlich oder eher gleich gross? Wie gross ist die vertretbare Latenz zwischen Auftragserteilung und Ausführung?

zeckensack
2004-02-10, 18:41:16
Original geschrieben von Demirug
Ja den Zähler nur in der CS anfassen.Yay!
3 Mio/s mit 128 Kanälen, skaliert bis rauf zu 9 Mio/s mit 1024 Kanälen :)

Und vielleicht geht da noch mehr =)
edit: nach kleineren Änderungen bis zu 18Mio/s :up:

Dein Problem kommt mir bekannt vor. Ich hatte mal etwas ähnliches. Über viele Netzwerkports kommen viele Aufträge herein die von einem anderen Thread abgearbeitet werden sollen. Dieser Thread füllte nun wiederum Buffer mit den Antworten die von den anderen Threads wieder versendet wurden.

Ich hätte noch ein paar Fragen zu deiner Umgebung. Kommen die Aufträge kontinuirlich oder Stossweise? Ist die Bearbeitunzeit der unterschiedlichen Kommandos stark unterschiedlich oder eher gleich gross? Wie gross ist die vertretbare Latenz zwischen Auftragserteilung und Ausführung? Die Aufträge kommen recht kontinuierlich, haben aber stark unterschiedliche Abarbeitungsszeiten. Durch den FIFO gehen auch nicht die Original-Kommandos, sondern nett verpackte "Batches" (erwähnte ich schon den DXG thunk layer? ;)). Die Verzögerung durch den FIFO ist fast völlig egal, denn dafür gibt's Synchronisierungs-Kommandos. Der sendende Thread legt sich dann eben solange schlafen, bis der FIFO wieder komplett leer ist.

zeckensack
2004-02-10, 19:01:35
*ehrm*

Natürlich dankeschön euch beiden :wink:

Demirug
2004-02-10, 19:05:14
Ich hatte da damals etwas recht heftiges zusammengebaut was auch bei dir gut funktionieren müsste. Ist allerdings wesentlich komplexer als deine Lösung.

Dabei wird pro Quelle ein Vorbuffer verwaltet in den alle Auftrage geschrieben werden. Ist dieser Buffer voll wird er an die globale Auftragsliste am Ende eingefügt. Dieses Einfügen kann durch ein entsprechendes Kommando auch erzwungen werden. Jeder Vorbuffer und die Globale Liste werden durch jeweils eine Critical Section geschützt. Die Critical Sections für die Vorbuffer braucht man nur für die verbesserte Variante (s. unten)

Der Bearbeitunsthread holt sich nun immer den ersten Buffer aus der Globalen Liste und arbeiten in dann ab. Nur während des herausholen des Buffers aus der Globalen Liste wird die Critical Section gelockt. Ist die globale Liste leer wartet der Thread auf ein Event. Dieses Event wird gesetzt sobald ein anderer Thread wieder einen Vorbuffer in die globale Liste einfügt.

Das ganze lässt sich noch etwas verbessern wenn man dem Bearbeitungsthread in dem Fall das die globale Liste leer ist prüfen lässt ob es noch einen Vorbuffer mit arbeit gibt. Dazu kann man neben der globalen Liste noch für jeden Sender einen Zeiger speichern der auf den aktuellen Vorbuffer (wenn es einen gibt) verweisst. Dieser Zeiger wird immer dann gesetzt wenn ein Vorbuffer begonnen wird. Ist die globale liste nun leer schaut man welche Sender vorbuffer haben und versucht die Criticalsection eines Senders an sich zu bringen. Dann nimmt man diesem Sender den Vorbuffer weg. Dieser legt sich ja bei Bedarf wieder einen neuen an. Sind aber auch keine Vorbuffer vorhanden wartetn man wieder auf das Event welches natürlich in diesem Fall auch schon gesetzt werden kann sobald der erste Auftrag in einem Vorbuffer eingetragen ist.

Diese Variante hat den Vorteil das wenn genügend Auftrage da sind alles über die Globale Liste läuft und es sehr selten dazu kommt das beide Seiten auf die gleiche Section zugreifen wollen. Ist wenig Last da greift sich der Bearbeiter die Aufträge sehr schnell. Allerdings steigt hier die Gefahr das sich beide Seiten blockieren.

Die Rückkanäle können auf die gleiche Art aufgebaut werden.

Edit: PS: Die Datenbuffer habe ich natürlich nicht ständig neu angelegt. Da die Buffer selbst die Elemente der Liste sind habe ich eine weitere Liste welche die gerade unbenutzte Buffer enthält. Nur wenn diese leer ist wurde ein neuer Buffer vom heap angefordert. Beim Freigeben landeten die Buffer in der Liste.

zeckensack
2004-02-10, 19:55:20
Hmmm, fies :naughty:

Das mit den Vorbuffern brauche ich IMO nicht. Ich schütze sowieso alle meine entry points mit einem (selbsgebastelteten, sehr schnellen) globalen Mutex. Dh dass Schreibzugriffe auf den FIFO auch nur dann erfolgen können, wenn irgendwo unterwegs ein Mutex eingesammelt werden konnte. Die sind also schon automatisch serialisiert.

Ich schiele da auch auf die Bandbreite. Ich kann batches auch direkt in einem FIFO-"Kanal" erzeugen. Wenn er voll ist, ist er eben voll, dann muss ich den nächsten nehmen.

Ich plane derzeit maximal 256 Byte Nutzdaten pro Kanal (ursprünglich wollte ich auch nur 32 Kanäle, um im L1-Cache zu bleiben ...). Grössere Datenklumpen will ich auch nicht in den FIFO kopieren, sondern direkt vom Worker-Thread abholen lassen (in der Zwischenzeit muss ich den aufrufenden Thread dann natürlich blockieren).

Crushinator
2004-02-10, 20:00:31
Original geschrieben von zeckensack
(...) Oder meintest du (bzw) ihr "selber zählen, aber den Zähler nur in einer CS anfassen"? Ja, meine Betonung lag nur deshalb mehr auf zusätzliches Verwenden von CS, weil der Verzicht auf jegliche Semaphores nur dann empfehlenswert ist, wenn die Hauptaufgabe des Rechners Deine Anwendung mit all seinen Threads sein soll. =) (SMP/NUMA läßt grüßen)

zeckensack
2004-02-10, 20:22:29
Original geschrieben von crushinator
Ja, meine Betonung lag nur deshalb mehr auf zusätzliches Verwenden von CS, weil der Verzicht auf jegliche Semaphores nur dann empfehlenswert ist, wenn die Hauptaufgabe des Rechners Deine Anwendung mit all seinen Threads sein soll. =) (SMP/NUMA läßt grüßen) Guter Einwand. Ich glaube aber das wird schon klappen, da ich bei FIFO-Problemchen jetzt statt WaitForGrrrmmmppf Sleep(0) benutze :)
volatile uint commands_in_pipe=0;

<...>

DWORD WINAPI
WorkerThreadFunc(void* param)
{
printf("Worker thread started\n");

bool end_signal=false;

while (!end_signal)
{
while (commands_in_pipe==0)
{
Sleep(0);
}
uint command=thread_pipe.command[receive_offset];

if (command==0)
{
++commands_received;
end_signal=true;
}
else
if (command==1)
{
++commands_received;
// printf("%c",thread_pipe.command_data[receive_offset]);
}
++receive_offset;
if (receive_offset==channels) receive_offset=0;

EnterCriticalSection(&cs);
--commands_in_pipe;
LeaveCriticalSection(&cs);
}
printf("Thread terminates\n");
return(0);
}

void
send_command(uint command,uint command_data)
{
while (commands_in_pipe==channels)
{
Sleep(0);
}

thread_pipe.command[send_offset]=command;
thread_pipe.command_data[send_offset]=command_data;
++send_offset;
if (send_offset==channels) send_offset=0;

EnterCriticalSection(&cs);
++commands_in_pipe;
LeaveCriticalSection(&cs);
}Also bis hierhin brauche ich noch keine Spinlocks, wenn du das meintest ;)

Gast
2006-04-24, 08:21:30
InterlockedIncrement / InterlockedDecrement

Gast
2006-04-24, 14:21:57
InterlockedIncrement / InterlockedDecrementDu bist zwei Jahre zu spät dran ;)

-zecki