虽然我迟到了,但是为了将来的参考。在我们的项目中,我们正是这样做的,我更喜欢Sqoop而不是Spark。
原因:我使用Glue从Mysql读取数据到S3并且读取不是并行的(有AWS支持查看它,这就是Glue(使用Pyspark)工作的方式,但是一旦读取完成并且并行写入S3)。这样效率不高而且速度慢。要读取和写入S3的100GB数据需要1.5Hr。
因此我在EMR上启用了Sqoop并启用了Glue目录(所以hive Metastore在AWS上)我可以直接从Sqoop写入S3,这样可以更快地将100GB的数据读取需要20分钟。
您必须设置set hive.metastore.warehouse.dir = s3://如果您执行hive-import或直接写入,您应该会看到正在写入S3的数据。
您可以按如下方式创建配置单元外部表
create external table table_a ( siteid string, nodeid string, aggregation_type string ) PARTITIONED BY (day string) STORED AS PARQUET LOCATION 's3://mybucket/table_a';
然后,您可以运行以下命令将每天目录下存储的分区文件注册到HiveMatastore
MSCK REPAIR TABLE table_a;
现在,您可以通过配置单元查询访问您的文件。我们在项目中使用了这种方法并且运行良好。完成上述命令后,即可运行查询
select * from table_a where day='day_1';
希望这可以帮助。
-Ravi
Spark是一个非常好的实用工具。你很容易 连接到JDBC数据源 ,您可以通过指定凭据和S3路径(例如, Pyspark将数据帧保存到S3 )。
如果您使用AWS,Spark,Presto和Hive的最佳选择是使用AWS Glue Metastore。这是一个数据目录,它将您的s3对象注册为数据库中的表,并提供用于查找这些对象的API。
Q2的答案是肯定的,你可以有一个引用多个文件的表。如果您有分区数据,通常需要这样做。
Spark读取jdbc使用mutliple连接拉取数据。链接在这里
def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): Construct a DataFrame representing the database table accessible via JDBC URL url named table. Partitions of the table will be retrieved in parallel based on the parameters passed to this function. Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems. url JDBC database url of the form jdbc:subprotocol:subname. table Name of the table in the external database. columnName the name of a column of integral type that will be used for partitioning. lowerBound the minimum value of columnName used to decide partition stride. upperBound the maximum value of columnName used to decide partition stride. numPartitions the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly. When the input is less than 1, the number is set to 1. connectionProperties JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least a "user" and "password" property should be included. "fetchsize" can be used to control the number of rows per fetch.DataFrame
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
的 http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases 强>
创建具有分区列的hive表作为日期并保存并指定以下位置
create table table_name ( id int, dtDontQuery string, name string ) partitioned by (date string) Location s3://s3://mybucket/table_name/
在数据中添加一个名为date的列,并使用sysdate填充它。如果不需要,您无需添加列,我们可以填充该位置。但它也可以作为分析的审核列。 使用火花 dataframe.partitionBy(date).write.parquet.location(s3://mybucket/table_name/)
dataframe.partitionBy(date).write.parquet.location(s3://mybucket/table_name/)
每日表演 MSCK repair on the hive table 因此,新分区将添加到表中。
MSCK repair on the hive table
在非数字列上应用numPartitions是通过将该列的哈希函数创建为所需的连接数并使用该列