数据与智能 本公众号关注大数据与人工智能技术。由一批具备多年实战经验的技术极客参与运营管理,持续输出大数据、数据分析、推荐系统、机器学习、人工智能等方向的原创文章,每周至少输出7篇精品原创。同时,我们会关注和分享大数据与人工智能行业动态。欢迎关注。
来源 | Learning Spark Lightning-Fast Data Analytics,Second Edition
作者 | Damji,et al.
翻译 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究
校对 | gongyouliu
编辑 | auroral-L
全文共7361字,预计阅读时间40分钟。
第四章 Spark SQL 和 DataFrames:内置数据源简介
1. 在 Spark 应用程序中使用 Spark SQL
1.1 基本查询示例
2. SQL 表和视图
2.1 托管与非托管表(Managed Versus UnmanagedTables)
2.2 创建 SQL 数据库和表
2.3 创建视图
2.4 查看元数据
2.5 缓存 SQL 表
2.6 读取表写入DataFrame
在上一章中,我们解释了Spark结构化的演变及其合理性。特别是,我们讨论了Spark SQL引擎如何为高级DataFrame和Dataset API提供统一的接口。现在,我们将继续讨论DataFrame,并探讨其与Spark SQL的交互。
本章和下一章还将探讨Spark SQL如何与图4-1中所示的一些外部组件交互。
特别是Spark SQL:
- 提供了用于构建我们在第3章中探讨的高级结构化API的引擎。
- 可以读写各种格式的结构化数据(例如,JSON,Hive表,Parquet,Avro,ORC,CSV)。
- 使你可以使用JDBC / ODBC连接器从Tableau,Power BI,Talend等外部商业智能(BI)数据源或从Postgres和MySQL等RDBMS来查询数据。
- 提供与Spark应用程序中存储为数据库中表或视图的结构化数据进行交互的编程接口。
- 提供一个交互式shell程序,用于对结构化数据执行SQL查询。
- 支持ANSI SQL:2003标准的命令和HiveQL。
让我们从如何在Spark应用程序中使用Spark SQL开始入手。
1. 在Spark应用程序中使用Spark SQL
在Spark2.0中引入的SparkSession为使用结构化API编写Spark提供了一个统一的切入点。你可以使用SparkSession来调用Spark的功能:只需导入类并在代码中创建一个实例。
在SparkSession上使用sql()方法实例化spark执行SQL查询,例如spark.sql("SELECT * FROM myTableName")。以spark.sql这种方式执行的所有查询结果都会返回一个DataFrame,如果你需要,可以在该DataFrame上执行进一步的Spark操作----我们在第3章中探讨的那些操作以及在本章和下一章中将学到的方法。
1.1 基本查询示例
在本节中,我们将通过几个示例查询有关航空公司的航班准点性和航班延误原因的数据集,该数据集包含有关美国航班的数据,包括日期,延误,距离,始发地和目的地。它以CSV文件的形式提供,超过一百万条记录。通过定义schema,我们将数据读取到DataFrame并将该DataFrame注册为一个临时视图(稍后将在临时视图中进行更多介绍),以便我们可以使用SQL查询它。
代码段中提供了查询示例,而本书的GitHub repo中提供了包含此处介绍的所有代码的Python和Scala笔记(notebook)。这些示例将使你了解如何通过spark.sql编程接口在Spark应用程序中使用SQL。与声明性风格的DataFrame API相似,此接口允许你在Spark应用程序中查询结构化数据。
通常,在Standalone模式下的Spark应用程序中,你可以手动创建一个SparkSession实例,如以下示例所示。但是,在Spark Shell(或Databricks 笔记)中,默认为你创建了SparkSession,并赋值给变量spark,你可以通过spark变量进行访问。
接下来让我们开始将数据集读取到一个临时视图中:
// In Scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("SparkSQLExampleApp")
.getOrCreate()
// Path to data set
val csvFile="/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csvFile)
// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")
# In Python
from pyspark.sql import SparkSession
# Create a SparkSession
spark = (SparkSession
.builder
.appName("SparkSQLExampleApp")
.getOrCreate())
# Path to data set
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df = (spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
如果要指定schema,则可以使用DDL格式的字符串。例如:
// In Scala
val schema = "date STRING, delay INT, distance INT,
origin STRING, destination STRING"
# In Python
schema = "`date` STRING, `delay` INT, `distance` INT,
`origin` STRING, `destination` STRING"
现在我们已经有了一个临时视图,我们可以使用Spark SQL执行SQL查询。这些查询与你可能针对MySQL或PostgreSQL数据库中的SQL表执行的查询没有什么不同。这里的重点是表明Spark SQL提供了一个符合ANSI:2003的SQL接口,并演示了SQL与DataFrames之间的相互可操作性。
美国航班延误数据集有五列:
- date列包含类似的字符串02190925。转换后,它映射到02-19 09:25 am。
- delay列以分钟为单位给出了计划的起飞时间与实际起飞时间之间的延迟。提早出发显示负数。
- distance列给出了从始发机场到目的地机场的距离(以英里为单位)。
- origin列包含始发国际航空运输协会机场代码。
- destination列包含目的地国际航空运输协会机场代码。
考虑到这一点,让我们尝试针对此数据集进行一些示例查询。
首先,我们将查找距离大于1000英里的所有航班:
spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
+--------+------+-----------+
only showing top 10 rows
结果显示,所有最长的航班都在檀香山(HNL)和纽约(JFK)之间。接下来,我们将查找出旧金山(SFO)和芝加哥(ORD)之间延迟超过两个小时的所有航班:
spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER by delay DESC""").show(10)
+--------+-----+------+-----------+
|date |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO |ORD |
|01031755|396 |SFO |ORD |
|01022330|326 |SFO |ORD |
|01051205|320 |SFO |ORD |
|01190925|297 |SFO |ORD |
|02171115|296 |SFO |ORD |
|01071040|279 |SFO |ORD |
|01051550|274 |SFO |ORD |
|03120730|266 |SFO |ORD |
|01261104|258 |SFO |ORD |
+--------+-----+------+-----------+
only showing top 10 rows
看来这两个城市之间在不同的日期有很多明显的航班延误。(作为练习,将date列转换为可读的格式,并找出这些延迟最常见的日期或月份。思考这些延迟与冬季或假日有关吗?)
让我们尝试一个更复杂的查询,其中在SQL语句中使用CASE子句。在这个示例中,我们要标记所有美国航班,无论其始发地和目的地如何,以表明其经历的延误:超长延误(> 6小时),长延误(2–6小时)等。将这些人类可读的标签添加到名为的新列中Flight_Delays:
spark.sql("""SELECT delay, origin, destination,
CASE
WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'Early'
END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333 |ABE |ATL |Long Delays |
|305 |ABE |ATL |Long Delays |
|275 |ABE |ATL |Long Delays |
|257 |ABE |ATL |Long Delays |
|247 |ABE |DTW |Long Delays |
|247 |ABE |ATL |Long Delays |
|219 |ABE |ORD |Long Delays |
|211 |ABE |ATL |Long Delays |
|197 |ABE |DTW |Long Delays |
|192 |ABE |ORD |Long Delays |
+-----+------+-----------+-------------+
only showing top 10 rows
与DataFrame和Dataset API一样,通过spark.sql接口,你可以执行常见的数据分析操作,如我们在上一章中探讨的那样。该计算经过Spark SQL引擎相同的流程(见第三章中“Catalyst优化器”了解详细信息),最终得到相同的结果。
前面的所有三个SQL查询都可以用等效的DataFrame API查询表示。例如,第一个查询可以在Python DataFrame API中表示为:
# In Python
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
.where(col("distance") > 1000)
.orderBy(desc("distance"))).show(10)
# Or
(df.select("distance", "origin", "destination")
.where("distance > 1000")
.orderBy("distance", ascending=False).show(10))
这将产生与SQL查询相同的结果:
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
+--------+------+-----------+
only showing top 10 rows
作为练习,请尝试将其他两个SQL查询转换为DataFrame API的形式。
如这些示例所示,使用Spark SQL接口查询数据类似于用常规SQL查询关系数据库表。尽管查询是在SQL中进行的,但你会感觉到与第3章中遇到的DataFrame API操作在可读性和语义上的相似之处,我们将在下一章中进一步探讨。
为了使你能够如前面的示例中所示查询结构化数据,Spark在内存和磁盘中创建和管理视图和表的所有复杂性。这就引出了我们的下一个主题:如何创建和管理表和视图。
2. SQL表和视图
表中包含数据。与Spark中的每个表相关联的是其相关的元数据,它是有关表及其数据的信息:数据结构,描述,表名,数据库名,列名,分区,实际数据所在的物理位置等。所有这些存储在metastore中。
默认情况下,Spark使用位于/user/hive/warehouse的Apache Hive Metastore来保留关于表的所有元数据,而不是为Spark表提供单独的元存储。但是,你可以通过将Spark config的配置spark.sql.warehouse.dir设置为另一个目录来更改默认位置,该位置可以设置为本地目录或外部分布式存储。
2.1 托管与非托管表(Managed Versus UnmanagedTables)
Spark允许你创建两种类型的表:托管表和非托管表。对于托管表,Spark同时管理文件存储中的元数据和数据。这可以是本地文件系统,HDFS或对象存储,例如Amazon S3或Azure Blob。对于非托管表,Spark仅管理元数据,而你自己在外部数据源(例如Cassandra)中管理数据。
对于托管表,由于Spark可以管理所有内容,因此SQL命令如DROP TABLE table_name会将元数据和数据一起删除。对于非托管表,同一命令将仅删除元数据,而不删除实际数据。在下一节中,我们将介绍一些有关如何创建托管表和非托管表的示例。
2.2 创建SQL数据库和表
表驻留在数据库中。默认情况下,Spark在default数据库下创建表。要创建自己的数据库名称,可以从Spark应用程序或笔记执行SQL命令。使用美国航班延误数据集,让我们创建一个托管表和一个非托管表。首先,我们将创建一个名为learn_spark_db的数据库,并告诉Spark我们要使用该数据库:
// In Scala/Python
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
从这一点来看,我们在应用程序中执行的用于创建表的任何命令都会作用于learn_spark_db数据库下所有的表。
创建托管表
要在learn_spark_db数据库中创建托管表,可以执行如下SQL查询:
// In Scala/Python
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")
你也可以使用DataFrame API进行相同的操作,如下所示:
# In Python
# Path to our US flight delays CSV file
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema="date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")
这两个语句最后都能在learn_spark_db数据库中创建us_delay_flights_tbl托管表。
创建非托管表
相比之下,你可以从自己支持Spark应用读取的数据源(例如Parquet,CSV或JSON文件)创建,要从数据源(例如CSV文件)创建非托管表,请使用如下SQL:
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH
'/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")
在DataFrame API中使用:
(flights_df
.write
.option("path", "/tmp/data/us_flights_delay")
.saveAsTable("us_delay_flights_tbl"))
为了使你能够浏览这些示例,我们创建了Python和Scala示例笔记,你可以在本书的GitHub repo中找到这些笔记。
2.3 创建视图
除了创建表之外,Spark还可在现有表之上创建视图。视图可以是全局视图(SparkSession在给定集群的所有节点上可见)或会话范围视图(仅对单个SparkSession可见),并且它们是临时的:会随着Spark应用程序终止而被回收。创建视图的语法与在数据库中创建表的语法相似。创建视图后,就可以像查询表一样对其进行查询。视图和表之间的区别在于,视图实际上并不保存数据。Spark应用程序终止后,表仍然存在,但视图会被回收。你可以使用SQL从现有表创建视图。例如,如果你只希望使用纽约(JFK)和旧金山(SFO)的始发机场处理美国航班延误数据集的子集,则以下查询将创建仅由该切片组成的全局临时视图和临时视图表:
-- In SQL
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
origin = 'SFO';
CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
origin = 'JFK'
你也可以使用DataFrame API完成相同的操作,如下所示:
# In Python
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM
us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM
us_delay_flights_tbl WHERE origin = 'JFK'")
# Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
一旦创建了这些视图,就可以像对表一样对它们执行查询。请记住,访问全局临时视图时,必须使用前缀,如global_temp.<view_name>,因为Spark在名为的全局临时数据库(global_temp)中创建全局临时视图。
-- In SQL
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view
相比之下,你可以访问不带global_temp前缀的普通临时视图。
-- In SQL
SELECT * FROM us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view")
// Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")
你也可以像删除表一样删除视图:
-- In SQL
DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view;
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")
临时视图与全局临时视图
临时视图与全局临时视图之间的差异很微妙,这可能是新加入Spark的开发人员可能有困惑的地方。临时视图绑定到Spark应用程序中的单个Spark会话。相比之下,在Spark应用程序中的多个Spark会话可以看到全局临时视图。是的,你可以在单个Spark应用程序中创建多个SparkSession,这是很方便的,例如,当你要访问(并合并)来自两个不共享同一个Hive MetaStore配置的不同Spark会话的数据时可以这样做。
2.4 查看元数据
如前所述,Spark管理与每个托管或非托管表关联的元数据。这是Spark SQL中用于存储元数据的高级抽象Catalog的功能。Catalog是在Spark 2.x的扩展功能并使用了新的公共方法,使你能够检查与数据库、表和视图关联的元数据。Spark 3.0将其扩展为使用外部catalog(我们将在第12章中进行简要讨论)。例如,在Spark应用程序中,创建SparkSession之后赋值成变量spark,你可以通过以下方法访问所有存储的元数据:
// In Scala/Python
spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")
从本书的GitHub仓库中导入笔记,然后尝试一下。
2.5 缓存SQL表
尽管我们将在下一章讨论表缓存策略,但是值得一提的是,像DataFrames一样,你可以缓存SQL表和视图和释放SQL表和视图缓存。在Spark 3.0中,除了其他选项之外,你还可以将表指定为LAZY,这意味着该表仅应在首次使用时进行缓存,而不是立即进行缓存:
-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>
2.6 读取表写入DataFrame
通常,数据工程师会在其常规数据提取和ETL流程中建立数据管道。它们使用清理后的数据填充Spark SQL数据库和表,以供下游应用程序使用。
假设你已经可以使用现有的数据库learn_spark_db和表us_delay_flights_tbl。无需读取外部JSON文件,那么你只需使用SQL查询表并将返回的结果分配给DataFrame即可:
// In Scala
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")
# In Python
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")
现在,你已经从现有的Spark SQL表中生成了一个清洗过的DataFrame了。你还可以使用Spark的内置数据源读取其他格式的数据,从而使你可以灵活地与各种常见文件格式进行交互。