我已经更新了代码,基准测试的结果比以前好多了。差异与 List 的运算符有关:事实上,第一个版本使用的是追加(append)而不是前置(prepend)。由于 List 是链表,追加时必须遍历所有元素才能添加新元素。我当时偷懒,想使用 _ 运算符,但其实不应该这样做。
package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.collection.immutable.Seq

/** JMH benchmarks comparing several ways of mapping a list of [[ClassA]] to [[ClassB]]:
  * plain Scala collections, Monix `Task`/`Observable`, and Akka Streams.
  *
  * Every benchmark builds its own input through [[list]] and blocks until the complete
  * result is available, so the measured time includes input creation and, for the Akka
  * variants, the start-up of a fresh `ActorSystem`.
  */
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {

  import monix.eval._
  import monix.reactive._
  // The Monix global Scheduler is also the implicit ExecutionContext used by every `Future(...)` below.
  import monix.execution.Scheduler.Implicits.global

  /** Input data: 10,000 `ClassA` values. Declared as a `def`, so it is rebuilt on every call. */
  def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
  // val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  /** Fold step shared by the fold-based benchmarks.
    *
    * It prepends (`+:`) the new element, so the accumulated list ends up in reverse
    * order of arrival.
    */
  def foldClassB: (List[ClassB], ClassB) => List[ClassB] =
    (l: List[ClassB], o: ClassB) => o +: l

  /** Baseline: strict `map` on the Scala collection. */
  @Benchmark
  def map: Seq[ClassB] = list.map(o => ClassB(o, o))

  /** `map` on the parallel view of the collection, converted back to a `List`. */
  @Benchmark
  def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  /** One `Task` per element, combined with `Task.gatherUnordered`. */
  @Benchmark
  def monixTaskGather: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o, o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  /** Buffers the source (`bufferIntrospective(256)`), turns each buffer into tasks that are
    * gathered in groups of 10, and folds the flattened results into a list.
    */
  @Benchmark
  def monixBatchedObservables: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap { items =>
          val tasks = items.map(o => Task(ClassB(o, o)))
          // Window size and step are both 10, i.e. consecutive non-overlapping groups.
          val batches = tasks.sliding(10, 10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }
        .consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  /** `Observable.map` followed by a synchronous `foldLeft` consumer. */
  @Benchmark
  def monixMapFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .map(o => ClassB(o, o))
        .consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  /** `Observable.map` followed by a `foldLeftAsync` consumer whose step is wrapped in a `Task`. */
  @Benchmark
  def monixMapFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .map(o => ClassB(o, o))
        .consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  /** `Observable.mapAsync(4)` followed by a synchronous `foldLeft` consumer. */
  @Benchmark
  def monixMapAsyncFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .mapAsync(4)(o => Task(ClassB(o, o)))
        .consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  /** `Observable.mapAsync(4)` followed by a `foldLeftAsync` consumer. */
  @Benchmark
  def monixMapAsyncFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .mapAsync(4)(o => Task(ClassB(o, o)))
        .consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  /** `Source.map` into `Sink.fold`. */
  @Benchmark
  def akkaMapFold: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] =
      Source(list)
        .map(o => ClassB(o, o))
        .toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  /** `Source.map` into `Sink.foldAsync`, with the fold step wrapped in a `Future`. */
  @Benchmark
  def akkaMapFoldAsync: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] =
      Source(list)
        .map(o => ClassB(o, o))
        .toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  /** `Source.map` into `Sink.seq`. */
  @Benchmark
  def akkaMapSeq: Seq[ClassB] = {
    val graph = Source(list).map(o => ClassB(o, o)).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  /** `Source.mapAsync(4)` with an `async` boundary, into `Sink.fold`. */
  @Benchmark
  def akkaMapAsyncFold: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] =
      Source(list)
        .mapAsync(4)(o => Future(ClassB(o, o)))
        .async
        .toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  /** `Source.mapAsync(4)` with an `async` boundary, into `Sink.foldAsync`. */
  @Benchmark
  def akkaMapAsyncFoldAsync: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] =
      Source(list)
        .mapAsync(4)(o => Future(ClassB(o, o)))
        .async
        .toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  /** `Source.mapAsync(4)` into `Sink.seq`. */
  @Benchmark
  def akkaMapAsyncSeq: Seq[ClassB] = {
    val graph = Source(list).mapAsync(4)(o => Future(ClassB(o, o))).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  /** Balance/merge graph with four mapping branches, collected with `Sink.fold`. */
  @Benchmark
  def akkaLoadBalanceMap: Seq[ClassB] = {
    val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB)
    runAkkaGraph(loadBalancedGraph(sink))
  }

  /** Balance/merge graph with four mapping branches, collected with `Sink.seq`. */
  @Benchmark
  def akkaLoadBalanceMapSeq: Seq[ClassB] = {
    val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq
    runAkkaGraph(loadBalancedGraph(sink))
  }

  /** Builds the closed graph shared by the two load-balancing benchmarks:
    * `Source(list) ~> Balance(4) ~> 4 x (map, async) ~> Merge(4) ~> resultSink`.
    *
    * @param resultSink sink that collects the mapped elements; its materialized value
    *                   becomes the materialized value of the graph
    * @return a runnable graph materializing the future result of `resultSink`
    */
  private def loadBalancedGraph(
      resultSink: Sink[ClassB, Future[Seq[ClassB]]]
  ): RunnableGraph[Future[Seq[ClassB]]] =
    RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(resultSink) { implicit builder => sink =>
      import GraphDSL.Implicits._

      val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
      val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
      val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o, o))

      Source(list) ~> balance
      // One branch per balance outlet / merge inlet; each `~>` wires the next free port.
      (1 to 4).foreach { _ => balance ~> mapClassB.async ~> merge }
      merge ~> sink
      ClosedShape
    })

  /** Runs `g` on a freshly created `ActorSystem` and blocks until its result is available.
    *
    * The actor system is terminated in a `finally` block so that it is not leaked when
    * materialization or the stream itself fails. Termination is only requested, not
    * awaited, so shutdown time is not added to the measurement.
    *
    * @param g the graph to materialize and run
    * @return the completed result of the graph's materialized future
    */
  private def runAkkaGraph(g: RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = {
    implicit val actorSystem: ActorSystem = ActorSystem("app")
    try {
      implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()
      Await.result(g.run(), Duration.Inf)
    } finally {
      actorSystem.terminate()
    }
  }
}

/** Benchmark input element. */
case class ClassA(a: Int)

/** Benchmark output element, built from two references to a `ClassA`. */
case class ClassB(o: ClassA, o2: ClassA)
此更新类的结果是:
[info] Benchmark                                    Mode  Cnt   Score   Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  19,052 ± 3,779  ms/op
[info] MappingBenchmark.akkaLoadBalanceMapSeq         ss   20  16,115 ± 3,232  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  20,862 ± 3,127  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  26,994 ± 4,010  ms/op
[info] MappingBenchmark.akkaMapAsyncSeq               ss   20  19,399 ± 7,089  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  12,132 ± 4,111  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  22,652 ± 3,802  ms/op
[info] MappingBenchmark.akkaMapSeq                    ss   20  10,894 ± 3,114  ms/op
[info] MappingBenchmark.map                           ss   20   0,625 ± 0,193  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20   9,175 ± 4,080  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  11,724 ± 4,458  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  14,174 ± 6,962  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20   1,057 ± 0,960  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20   9,638 ± 4,910  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20   7,065 ± 2,428  ms/op
[info] MappingBenchmark.parMap                        ss   20   1,392 ± 0,923  ms/op
如果我们可以在运行流之前使用scala进行映射似乎仍然更快。
关于异步处理/并行的一个注意事项:通常在并行处理内容时,为了同步结果,最终会产生相当多的 CPU 开销。
实际上,开销可能非常大,以至于它可以使从并行工作的多个CPU内核获得的时间增益无效。
你也应该熟悉 阿姆达尔定律 。看一下这些数字:75%的并行部分只需4个处理器即可达到最大加速。如果并行部分为50%,则只需2个处理器即可达到最大加速。
这只是理论上的限制,因为处理器之间还存在共享内存的同步,这可能会变得非常混乱;基本上,处理器是针对顺序执行进行优化的,一旦引入并发问题,您就需要使用内存屏障来强制排序,从而使许多 CPU 优化失效。因此,您可能得到负加速,实际上在您的测试中就可以看到这一点。
所以你正在测试异步/并行映射,但被测试的映射基本上什么都不做,换成恒等函数(identity)来测试也几乎是一回事。换句话说,您正在进行的测试及其结果在实践中几乎没有用处。
作为旁注,这也是我从不喜欢“并行集合”这一概念的原因。这个概念是有缺陷的,因为你只能用并行集合来处理纯粹的 CPU 密集型任务(即没有 I/O,没有真正的异步内容),可以说它适合做一些计算,除了:
换句话说,并行集合没有有效地使用硬件资源,因为它们完全忽略GPU支持,并且完全不适合混合CPU - I / O任务,因为它们缺乏异步支持。
我觉得有必要提一下这一点,因为人们常常以为,在自己的代码上撒一点“并行”的魔法粉就能让它运行得更快,但很多时候并不会。
当你有I / O绑定任务(当然与CPU绑定任务混合)时,并行性很有用,在这种情况下,CPU开销不太重要,因为处理时间将由I / O主导。
PS:Scala 集合上的普通映射应该更快,因为它是严格(strict)求值的,并且(取决于集合类型)使用基于数组的缓冲区,因此不会破坏 CPU 缓存。Monix 的 .map 与 Scala 的 Iterable.map 开销相同,换句话说,开销接近于零;但它的应用是惰性的,并且会引入一些装箱(boxing)开销,这是我们无法摆脱的,因为 JVM 不会对泛型做特化。
.map
Iterable.map
虽然在实践中该死的很快;-)