默认文件格式是Parquet with spark.read ..和文件读取csv,说明你获得异常的原因。使用您尝试使用的api指定csv格式
加载CSV文件并将结果作为DataFrame返回。
df=sparksession.read.option("header", true).csv("file_name.csv")
Dataframe将文件视为csv格式。
Penny的Spark 2示例是在spark2中实现它的方法。还有一个技巧:通过设置选项,通过对数据进行初始扫描为您生成标头 inferSchema 至 true
inferSchema
true
然后,在这里,假设 spark 是您设置的火花会话,是加载在S3上的亚马逊主机的所有Landsat图像的CSV索引文件中的操作。
spark
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz")
坏消息是:这会触发扫描文件;对于像这个20 + MB压缩CSV文件那样大的东西,在长途连接上可能需要30秒。请记住:一旦进入架构,最好手动编写架构编码。
(代码片段Apache软件许可证2.0被授权以避免所有歧义;我作为S3集成的演示/集成测试所做的事情)
如果您正在使用scala 2.11和Apache 2.0或更高版本构建jar。
没有必要创建一个 sqlContext 要么 sparkContext 宾语。只是一个 SparkSession 对象满足所有需求的要求。
sqlContext
sparkContext
SparkSession
以下是mycode工作正常:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.log4j.{Level, LogManager, Logger} object driver { def main(args: Array[String]) { val log = LogManager.getRootLogger log.info("**********JAR EXECUTION STARTED**********") val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate() val df = spark.read.format("csv") .option("header", "true") .option("delimiter","|") .option("inferSchema","true") .load("d:/small_projects/spark/test.pos") df.show() } }
如果你在集群中运行只是改变 .master("local") 至 .master("yarn") 在定义时 sparkBuilder 宾语
.master("local")
.master("yarn")
sparkBuilder
Spark Doc涵盖了这个: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
解析CSV文件存在很多挑战,如果文件大小较大,如果列值中存在非英语/转义/分隔符/其他字符,则可能会导致解析错误。
然后神奇的是在使用的选项中。那些适合我和希望的应该覆盖大多数边缘情况的代码如下:
### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep=',', quote='"', escape='"', maxColumns=2, inferSchema=True)
希望有所帮助。更多参考: 使用PySpark 2读取具有HTML源代码的CSV
注意:上面的代码来自Spark 2 API,其中CSV文件读取API与Spark可安装的内置包捆绑在一起。
注意:PySpark是Spark的Python包装器,与Scala / Java共享相同的API。
spark-csv是Spark核心功能的一部分,不需要单独的库。 所以你可以这样做
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
在Java 1.8中此代码片段完美地用于读取CSV文件
的pom.xml
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.4.0</version> </dependency>
Java的
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show();