Location>code7788 >text

C# Concurrency Control Framework: Million Dispatches per Second in a Single-Threaded Environment

Popularity:697 ℃/2024-10-16 11:01:37
// NOTE(review): this listing was reconstructed from a copy in which every dotted
// identifier without an underscore (e.g. `Console.WriteLine`, `children.go`,
// `generator.sleep`) had been stripped and the whole program collapsed onto three
// lines. Restored names follow the `Go` library API as implied by the surviving
// context; spots that could not be confirmed from the text are marked below.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Go;

namespace WorkerFlow
{
    /// <summary>
    /// Demonstrates common task-orchestration patterns (serial, parallel, fan-out,
    /// fan-in, wait-any, timeouts, cancellation, nesting) built on the Go library's
    /// strand / generator / children primitives.
    /// </summary>
    class Program
    {
        // Strand on which MainWorker (and therefore every demo below) is scheduled.
        static shared_strand strand;

        /// <summary>Writes <paramref name="msg"/> to the console, prefixed with the current time.</summary>
        static void Log(string msg)
        {
            // NOTE(review): the extracted text only kept "HH:mm:"; the seconds and
            // milliseconds part (ss.fff) is restored here - confirm against the original.
            Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {msg}");
        }

        /// <summary>
        /// Simulated unit of work: waits <paramref name="time"/> milliseconds without
        /// blocking the strand, then logs <paramref name="name"/>.
        /// </summary>
        static async Task Worker(string name, int time = 1000)
        {
            await generator.sleep(time);
            Log(name);
        }

        //1 A, B, C in serial order
        //A->B->C
        static async Task Worker1()
        {
            await Worker("A");
            await Worker("B");
            await Worker("C");
        }

        //2 A, B, and C all run in parallel and depend on the same strand (implicit
        //  parameter; all tasks that depend on the same strand are thread-safe)
        //A
        //B
        //C
        static async Task Worker2()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A"));
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
        }

        //3 After A has finished, B and C run in parallel.
        //   -->B
        //   |
        //A->
        //   |
        //   -->C
        static async Task Worker3()
        {
            await Worker("A");
            generator.children children = new generator.children();
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
        }

        //4 After B and C have both finished (running in parallel), A is executed.
        //B--
        //   |
        //   -->A
        //   |
        //C--
        static async Task Worker4()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("B"));
            children.go(() => Worker("C"));
            await children.wait_all();
            await Worker("A");
        }

        //5 After either one of B and C has finished, A is executed.
        //B--
        //   |
        //   >-->A
        //   |
        //C--
        static async Task Worker5()
        {
            generator.children children = new generator.children();
            // NOTE(review): `tgo` (the variant that returns the child handle) is
            // assumed wherever the result is captured - confirm against the library.
            var B = children.tgo(() => Worker("B", 1000));
            var C = children.tgo(() => Worker("C", 2000));
            var task = await children.wait_any();
            if (task == B)
            {
                Log("B Success");
            }
            else
            {
                Log("C Success");
            }
            await Worker("A");
        }

        //6 Waiting for one specific task
        static async Task Worker6()
        {
            generator.children children = new generator.children();
            var A = children.tgo(() => Worker("A"));
            var B = children.tgo(() => Worker("B"));
            // Only A is awaited; B is started but not waited for here.
            await children.wait(A);
        }

        //7 Wait for one specific task with a timeout, then abort all tasks
        static async Task Worker7()
        {
            generator.children children = new generator.children();
            var A = children.tgo(() => Worker("A", 1000));
            var B = children.tgo(() => Worker("B", 2000));
            if (await children.timed_wait(1500, A))
            {
                Log("successes");
            }
            else
            {
                // Reached when A did not complete within the 1500 ms window.
                Log("overtime pay");
            }
            await children.stop();
        }

        //8 Wait for a group of tasks with a timeout, then abort all tasks
        static async Task Worker8()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(() => Worker("B", 2000));
            var tasks = await children.timed_wait_all(1500);
            await children.stop();
            // NOTE(review): the interpolated expression was stripped; `tasks.Count`
            // (number of tasks that finished in time) is assumed - confirm.
            Log($"Successful {tasks.Count}");
        }

        //9 Wait for a group of tasks with a timeout, then abort all tasks; the
        //  aborted task performs its own clean-up in place before terminating
        static async Task Worker9()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(async delegate ()
            {
                try
                {
                    await Worker("B", 2000);
                }
                catch (generator.stop_exception)
                {
                    Log("B Suspended");
                    await generator.sleep(500);
                    // Re-throw so the abort continues to propagate after the clean-up.
                    throw;
                }
                catch (System.Exception)
                {
                    // Any other failure is deliberately ignored in this demo.
                }
            });
            var tasks = await children.timed_wait_all(1500);
            await children.stop();
            Log($"Successful {tasks.Count}");
        }

        //10 Nested tasks
        static async Task Worker10()
        {
            generator.children children = new generator.children();
            children.go(async delegate ()
            {
                generator.children children1 = new generator.children();
                children1.go(() => Worker("A"));
                children1.go(() => Worker("B"));
                await children1.wait_all();
            });
            children.go(async delegate ()
            {
                generator.children children1 = new generator.children();
                children1.go(() => Worker("C"));
                children1.go(() => Worker("D"));
                await children1.wait_all();
            });
            await children.wait_all();
        }

        //11 Nested aborts
        static async Task Worker11()
        {
            generator.children children = new generator.children();
            children.go(() => Worker("A", 1000));
            children.go(async delegate ()
            {
                try
                {
                    generator.children children1 = new generator.children();
                    children1.go(async delegate ()
                    {
                        try
                        {
                            await Worker("B", 2000);
                        }
                        catch (generator.stop_exception)
                        {
                            Log("B suspended1");
                            await generator.sleep(500);
                            throw;
                        }
                        catch (System.Exception)
                        {
                            // Any other failure is deliberately ignored in this demo.
                        }
                    });
                    await children1.wait_all();
                }
                catch (generator.stop_exception)
                {
                    Log("B suspended2");
                    throw;
                }
                catch (System.Exception)
                {
                    // Any other failure is deliberately ignored in this demo.
                }
            });
            // Let A (1000 ms) finish while B (2000 ms) is still running, then abort.
            await generator.sleep(1500);
            await children.stop();
        }

        //12 Run a set of time-consuming algorithms in parallel and wait for them
        static async Task Worker12()
        {
            wait_group wg = new wait_group();
            for (int i = 0; i < 2; i++)
            {
                wg.add();
                // Copy the loop variable so each delegate logs its own index.
                int idx = i;
                // NOTE(review): the call target was stripped; `Task.Run` is assumed - confirm.
                var _ = Task.Run(delegate ()
                {
                    try
                    {
                        Log($"Execution algorithm {idx}");
                    }
                    finally
                    {
                        // Always signal completion, even if the algorithm throws.
                        wg.done();
                    }
                });
            }
            await wg.wait();
            Log("Execution algorithm completed");
        }

        //13 Run time-consuming algorithms serially. Time-consuming algorithms must be
        //   executed in a thread pool, otherwise scheduling that relies on the same
        //   strand will not be timely.
        static async Task Worker13()
        {
            for (int i = 0; i < 2; i++)
            {
                // Each send_task is awaited before `i` advances, so the captured
                // loop variable still holds the expected value when logged.
                await generator.send_task(() => Log($"Execution of algorithm {i}"));
            }
        }

        /// <summary>Runs every demo scenario one after another.</summary>
        static async Task MainWorker()
        {
            await Worker1();
            await Worker2();
            await Worker3();
            await Worker4();
            await Worker5();
            await Worker6();
            await Worker7();
            await Worker8();
            await Worker9();
            await Worker10();
            await Worker11();
            await Worker12();
            await Worker13();
        }

        /// <summary>
        /// Entry point: creates the work service and a strand bound to it, schedules
        /// MainWorker on that strand, and runs the service.
        /// </summary>
        static void Main(string[] args)
        {
            work_service work = new work_service();
            strand = new work_strand(work);
            generator.go(strand, MainWorker);
            work.run();
            // NOTE(review): final call was stripped; `Console.ReadKey()` (keep the
            // console window open) is assumed - confirm.
            Console.ReadKey();
        }
    }
}