我用我的代码进行了一些实验并获得了适当的结果( 的 异步处理 强> )。我更改了连接链接(添加: Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true )即我增加了大小 连接池 和 连接持续时间 。
Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true
最大连接池大小(最大池大小)
连接池中的最小连接数(最小池大小)
保持连接池中连接的秒数 (连接寿命)(0是最大值)
的 小费 强> :过多的游泳池 Max Pool Size (默认为100)可以挂断你的服务器(我做:))
Max Pool Size
我也注意到我没有得到例外' 的 连接不支持MultipleActiveResultSets 强> '随着 'MultipleActiveResultSets=true' 在我的连接字符串中,但处理是 的 同步 强> 。你可以在上面阅读(MARS)
'MultipleActiveResultSets=true'
结论 :服务器上的并行执行不是MARS功能,MARS操作不是线程安全的。 MARS的目的不是删除应用程序中多个连接的所有要求。如果应用程序需要对服务器进行真正的并行命令执行, 的 应该使用多个连接 强> 。它通常用于这种原因
你有1500个左右的任务同时执行,并混合异步和阻塞调用(如 .Wait )这会导致死锁。
.Wait
使测试异步并尽量避免 async void 除非它在事件处理程序上。
async void
尝试按顺序迭代它们。这将花费更长的时间,但至少连接将被正确处理,以免使资源过载。您也可以考虑以合理大小的批次进行操作。
[TestMethod] public async Task ParallelSQLMethod() { //real amount is more then 1500 var keys = new long[] { 15072000173475, 15072000173571, //....., n }; var tasks = keys.Select(i => RunStoredProc(i)); var batchSize = 50; //Or smaller //run tasks in batches var sequence = tasks; while (sequence.Any()) { var batch = sequence.Take(batchSize); sequence = sequence.Skip(batchSize); await Task.WhenAll(batch); } }
我担心,我在这里看到了async / await / concurrent / threading的经典问题。测试有多个问题,我将逐一尝试。
1)测试案例架构。您不会告诉您正在编写的单元测试以及SQL服务器是否位于同一个框或不同的框中。
的 如果在同一个盒子上,我会选择Max(n_cores / 2,1)连接。 强>
的 如果不同的盒子,我会去1-3连接。 强>
并且可以根据存储过程行为,长/短计算,传输数据量,连接速度等来调整这些数字。
2)SQL连接并发问题。您无法打开一个连接,然后以某种方式尝试通过此连接同时调用1500个请求。实际上甚至不是两个在同一时间。
的 这就是它告诉你的:连接不支持MultipleActiveResultSets。 强>
您必须使用一个打开的连接,以便当时一个请求使用。
但!您不必仅将其用于一个请求并将其关闭,您可以在第一个请求完成后运行下一个请求,这将比关闭并创建新连接更快。您只需按顺序通过每个连接运行这些请求...
3)因此,正确的测试用例体系结构如下所示:
我非常喜欢使用并发/并行代码,但是在没有协调它们的情况下完成越来越多的任务并没有帮助加快速度,浪费资源......
4)示例:
[TestClass] public class UnitTestNomenclature { [TestMethod] public async Task ParallelSQLMethod() { long[] keys = new long[] { 15072000173475, 15072000173571 }; ConcurrentQueue<long> queue = new ConcurrentQueue<long>(keys); int connections = Math.Max(1, Environment.ProcessorCount / 2); Task[] tasks = Enumerable .Range(0, connections) .Select(i => Task.Run<Task>(() => RunConnection(i, queue)).Unwrap()) .ToArray() ; await Task.WhenAll(tasks); } public async Task RunConnection(int connection, ConcurrentQueue<long> queue) { using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) { await conn.OpenAsync(); Debug.WriteLine($"====== Connection[{connection}] is open: ======"); Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}"); Debug.WriteLine($"Connection[{connection}].State: {conn.State}"); long scollNumbParam; while (queue.TryDequeue(out scollNumbParam)) { await RunStoredProc(conn, connection, scollNumbParam); Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}"); Debug.WriteLine($"Connection[{connection}].State: {conn.State}"); } } Debug.WriteLine($"====== Connection[{connection}] is closed ======"); } public async Task RunStoredProc(SqlConnection conn, int connection, long scollNumbParam) { const string strStoredProcName = @"[dbo].[sp]"; using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure }) { SqlParameter scrParam = new SqlParameter() { ParameterName = "@KEYKRT", Value = scollNumbParam, SqlDbType = SqlDbType.BigInt }; cmd.Parameters.Add(scrParam); Debug.WriteLine($"Connection[{connection}] Start of Proccesing: " + scollNumbParam); await cmd.ExecuteNonQueryAsync(); Debug.WriteLine($"Connection[{connection}] End of Proccesing: " + scollNumbParam); } } }