我不确定这是否是Spark的错误处理Pipe()的方式不同,但我在JIRA上打开了一个类似的问题: https://issues.apache.org/jira/projects/SPARK/issues/SPARK-26101
现在问题。显然在YARN集群中Spark Pipe()要求一个容器,无论你的Hadoop是不安全还是由Kerberos保护,容器是否由用户运行都是有区别的 yarn/nobody 或启动容器的用户 your actual user 。
yarn/nobody
your actual user
使用Kerberos来保护您的Hadoop,或者如果您不想通过保护Hadoop,可以在YARN中设置两个配置,使用Linux用户/组来启动容器。 的 注意 强> , 你必须 的 分享 强> 相同 的 用户/组 强> 跨群集中的所有节点。否则,这将无效。 (也许使用LDAP / AD同步您的用户/组)
设置这些:
yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users = false yarn.nodemanager.container-executor.class = org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor
资源: https://hadoop.apache.org/docs/r2.7.4/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html (即使在Hadoop 3.0中也是如此)
这个修复工作在Cloudera最新的CDH 5.15.1(yarn-site.xml): http://community.cloudera.com/t5/Batch-Processing-and-Workflow/YARN-force-nobody-user-on-all-jobs-and-so-they-fail/mp/82572/highlight/true# M3882
例:
val test = sc.parallelize(Seq("test user")).repartition(1) val piped = test.pipe(Seq("whoami")) val c = piped.collect() est: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at repartition at <console>:25 piped: org.apache.spark.rdd.RDD[String] = PipedRDD[5] at pipe at <console>:25 c: Array[String] = Array(maziyar)
这将返回 username 在设置了这些配置后,谁开始了Spark会话 yarn-site.xml 并同步所有节点中的所有用户/组。
username
yarn-site.xml