我有一个文件夹,里面有14个文件。我在集群上运行带有10个执行程序的spark-submit,它有资源管理器作为yarn。
我创建了我的第一个RDD:
JavaPairRDD<字符串,字符串> …
如果我们有每个文件的大小,那就更清楚了。但代码不会错。我根据spark代码库添加了这个答案
首先,所有, 的 maxSplitSize 强> 将计算取决于 的 目录大小 强> 和 的 分区 强> 传入 wholeTextFiles
wholeTextFiles
def setMinPartitions(context: JobContext, minPartitions: Int) { val files = listStatus(context).asScala val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong super.setMaxSplitSize(maxSplitSize) } // file: WholeTextFileInputFormat.scala
链接
按照 maxSplitSize splits(Spark中的分区)将从源中提取。
maxSplitSize
inputFormat.setMinPartitions(jobContext, minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray // Here number of splits will be decides val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } // file: WholeTextFileRDD.scala
更多信息,请访问 CombineFileInputFormat#getSplits 阅读文件和准备分裂的课程。
CombineFileInputFormat#getSplits
注意: 我提到了 的 Spark分区为MapReduce分裂 强> 在这里,作为Spark 从MapReduce借用输入和输出格式化程序
我提到了 的 Spark分区为MapReduce分裂 强> 在这里,作为Spark 从MapReduce借用输入和输出格式化程序