您似乎正在尝试将规范化数据读入Scala对象树。你当然可以用Spark做到这一点,但Spark可能不是最好的工具。如果数据足够小以适应内存,我认为这是你的问题,对象关系映射(ORM)库可能更适合这项工作。
如果你仍然想要使用Spark,那么你就是在正确的道路上 groupBy 和 collect_list 。你缺少的是 struct() 功能。
groupBy
collect_list
struct()
case class Customer(id: Int) case class Invoice(id: Int, customer_id: Int) val customers = spark.createDataset(Seq(Customer(1))).as("customers") val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1))) case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]]) customers .join( invoices .groupBy('customer_id) .agg(collect_list(struct('*)).as("invoices")) .withColumnRenamed("customer_id", "id"), Seq("id"), "left_outer") .as[CombinedCustomer] .show
struct('*) 建立一个 StructType 整行的列。您也可以选择任何列,例如, struct('x.as("colA"), 'colB) 。
struct('*)
StructType
struct('x.as("colA"), 'colB)
这产生了
+---+----------------+ | id| invoices| +---+----------------+ | 1|[[1, 1], [2, 1]]| +---+----------------+
现在,在预期客户数据不适合存储器的情况下,即使用简单的情况 collect 不是一种选择,你可以采取许多不同的策略。
collect
最简单的,您应该考虑的而不是收集到驱动程序,要求对每个客户的数据进行独立处理是可以接受的。在这种情况下,请尝试使用 map 并将每个客户的处理逻辑分发给工人。
map
如果客户无法接受独立处理,则一般策略如下:
使用上述方法根据需要将数据聚合到结构化行中。
重新分区数据以确保处理所需的所有内容都在一个分区中。
(可选地) sortWithinPartitions 确保分区中的数据按需要排序。
sortWithinPartitions
使用 mapPartitions 。
mapPartitions
您可以使用Spark-SQL,并为客户,发票和项目分别提供一个数据集。 然后,您可以简单地在这些数据集之间使用连接和聚合函数来获得所需的输出。
Spark SQL在sql风格和编程方式之间具有非常微不足道的性能差异。