spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset
本文講解Spark的結(jié)構(gòu)化數(shù)據(jù)處理,主要包括:Spark SQL、DataFrame、Dataset以及Spark SQL服務(wù)等相關(guān)內(nèi)容。本文主要講解Spark 1.6.x的結(jié)構(gòu)化數(shù)據(jù)處理相關(guān)東東,但因Spark發(fā)展迅速(本文的寫作時值Spark 1.6.2發(fā)布之際,并且Spark 2.0的預(yù)覽版本也已發(fā)布許久),因此請隨時關(guān)注Spark SQL官方文檔以了解最新信息。
文中使用Scala對Spark SQL進(jìn)行講解,并且代碼大多都能在spark-shell中運(yùn)行,關(guān)于這點(diǎn)請知曉。
概述
相比于Spark RDD API,Spark SQL包含了對結(jié)構(gòu)化數(shù)據(jù)和在其上的運(yùn)算的更多信息,Spark SQL使用這些信息進(jìn)行了額外的優(yōu)化,使對結(jié)構(gòu)化數(shù)據(jù)的操作更加高效和方便。
有多種方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但無論是哪種API或者是編程語言,它們都是基于同樣的執(zhí)行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點(diǎn),看你喜歡。
SQL
使用Spark SQL的一種方式就是通過SQL語句來執(zhí)行SQL查詢。當(dāng)在編程語言中使用SQL時,其返回結(jié)果將被封裝為一個DataFrame。
DataFrame
DataFrame是一個分布式集合,其中數(shù)據(jù)被組織為命名的列。它概念上等價于關(guān)系數(shù)據(jù)庫中的表,但底層做了更多的優(yōu)化。DataFrame可以從很多數(shù)據(jù)源構(gòu)建,比如:已經(jīng)存在的RDD、結(jié)構(gòu)化文件、外部數(shù)據(jù)庫、Hive表。
DataFrame的前身是SchemaRDD,從Spark 1.3.0開始SchemaRDD更名為DataFrame。與SchemaRDD的主要區(qū)別是:DataFrame不再直接繼承自RDD,而是自己實(shí)現(xiàn)了RDD的絕大多數(shù)功能。你仍舊可以在DataFrame上調(diào)用.rdd方法將其轉(zhuǎn)換為一個RDD。RDD可看作是分布式的對象的集合,Spark并不知道對象的詳細(xì)模式信息,DataFrame可看作是分布式的Row對象的集合,其提供了由列組成的詳細(xì)模式信息,使得Spark SQL可以進(jìn)行某些形式的執(zhí)行優(yōu)化。DataFrame和普通的RDD的邏輯框架區(qū)別如下所示:

但是在有些情況下RDD可以表達(dá)的邏輯用DataFrame無法表達(dá),所以后續(xù)提出了Dataset API,Dataset結(jié)合了RDD和DataFrame的好處。
關(guān)于Tungsten優(yōu)化可以參見:Project Tungsten:讓Spark將硬件性能壓榨到極限
Dataset
Dataset是Spark 1.6新添加的一個實(shí)驗(yàn)性接口,其目的是想結(jié)合RDD的好處(強(qiáng)類型(這意味著可以在編譯時進(jìn)行類型安全檢查)、可以使用強(qiáng)大的lambda函數(shù))和Spark SQL的優(yōu)化執(zhí)行引擎的好處。可以從JVM對象構(gòu)造出Dataset,然后使用類似于RDD的函數(shù)式轉(zhuǎn)換算子(map/flatMap/filter等)對其進(jìn)行操作。
Dataset通過Encoder實(shí)現(xiàn)了自定義的序列化格式,使得某些操作可以在無需解序列化的情況下直接進(jìn)行。另外Dataset還進(jìn)行了包括Tungsten優(yōu)化在內(nèi)的很多性能方面的優(yōu)化。
實(shí)際上Dataset是包含了DataFrame的功能的,這樣二者就出現(xiàn)了很大的冗余,故Spark 2.0將二者統(tǒng)一:保留Dataset API,把DataFrame表示為Dataset[Row],即Dataset的子集。
API進(jìn)化
Spark在迅速的發(fā)展,從原始的RDD API,再到DataFrame API,再到Dataset的出現(xiàn),速度可謂驚人,執(zhí)行性能上也有了很大提升。
我們在使用API時,應(yīng)該優(yōu)先選擇DataFrame & Dataset,因?yàn)樗男阅芎芎茫乙院蟮膬?yōu)化它都可以享受到,但是為了兼容早期版本的程序,RDD API也會一直保留著。后續(xù)Spark上層的庫將全部會用 DataFrame & Dataset,比如MLlib、Streaming、Graphx等。
關(guān)于這三種API的更詳細(xì)的討論以及選擇參見:Apache Spark: RDD, DataFrame or Dataset?
入門
起點(diǎn)之SQLContext
要想使用Spark SQL,首先你得創(chuàng)建一個SQLContext對象,在這之前你只需要創(chuàng)建一個SparkContext就行了,如下:
另外,你也可以使用HiveContext,它是SQLContext的超集,提供了一些額外的功能:使用HiveQL解析器、訪問Hive用戶定義函數(shù)、從Hive表讀取數(shù)據(jù)。并且,你不需要安裝Hive就可以使用HiveContext。不過將來版本的Spark SQL可能會逐步縮小SQLContext和HiveContext之間的差距。
對于SQLContext,目前只有一個簡單的SQL語法解析器sql,而對于HiveContext,則可以使用hiveql和sql兩個解析器,默認(rèn)是hiveql,我們可以通過如下語句來修改默認(rèn)解析器:
不過就目前來說,HiveQL解析器更加完善,因此推薦使用HiveQL。
創(chuàng)建并使用DataFrame
通過SQLContext,應(yīng)用程序可以從已經(jīng)存在的RDD、結(jié)構(gòu)化文件、外部數(shù)據(jù)庫以及Hive表中創(chuàng)建出DataFrame。如下代碼從JSON文件(該文件可以從Spark發(fā)行包中找到)創(chuàng)建出一個DataFrame:
DataFrame提供了一個領(lǐng)域特定語言(DSL)以方便操作結(jié)構(gòu)化數(shù)據(jù)。下面是一些使用示例,更多更全的DataFrame操作參見Spark API文檔中的org.apache.spark.sql.DataFrame:
?
另外,org.apache.spark.sql.functions單例對象還包含了一些操作DataFrame的函數(shù),主要有這幾個方面:聚集操作、集合操作、字符串處理、排序、日期計(jì)算、通用數(shù)學(xué)函數(shù)、校驗(yàn)碼操作、窗口操作、用戶定義函數(shù)支持等等。
在程序中執(zhí)行SQL查詢
我們可以通過在程序中使用SQLContext.sql()來執(zhí)行SQL查詢,結(jié)果將作為一個DataFrame返回:
我們還可以將DataFrame注冊為一個臨時表,然后就可以在其上執(zhí)行SQL語句進(jìn)行查詢了,臨時表的生命周期在與之關(guān)聯(lián)的SQLContext結(jié)束生命之后結(jié)束。示例如下:
創(chuàng)建并使用Dataset
Dataset跟RDD相似,但是Dataset并沒有使用Java序列化庫和Kryo序列化庫,而是使用特定Encoder來序列化對象。encoder通常由代碼自動產(chǎn)生(在Scala中是通過隱式轉(zhuǎn)換產(chǎn)生),并且其序列化格式直接允許Spark執(zhí)行很多操作,比如:filtering、sorting、hashing,而不需要解序列化。
與RDD互操作
Spark SQL支持兩種不同的方法將RDD轉(zhuǎn)換為DataFrame。第一種方法使用反射去推斷包含特定類型對象的RDD的模式(schema),該方法可以使你的代碼更加精簡,不過要求在你寫Spark程序時已經(jīng)知道模式信息(比如RDD中的對象是自己定義的case class類型)。第二種方法通過一個編程接口,此時你需要構(gòu)造一個模式并將其應(yīng)用到一個已存在的RDD以將其轉(zhuǎn)換為DataFrame,該方法適用于在運(yùn)行時之前還不知道列以及列的類型的情況。
用反射推斷模式
Spark SQL的Scala接口支持將包含case class的RDD自動轉(zhuǎn)換為DataFrame。case class定義了表的模式,case class的參數(shù)名被反射讀取并成為表的列名。case class也可以嵌套或者包含復(fù)雜類型(如序列或者數(shù)組)。示例如下:
手動編程指定模式
當(dāng)case class不能提前定義時(比如記錄的結(jié)構(gòu)被編碼為字符串,或者當(dāng)文本數(shù)據(jù)集被解析時不同用戶需要映射不同的字段),可以通過下面三步來將RDD轉(zhuǎn)換為DataFrame:
從原始RDD創(chuàng)建得到一個包含Row對象的RDD。
創(chuàng)建一個與第1步中Row的結(jié)構(gòu)相匹配的StructType,以表示模式信息。
通過SQLContext.createDataFrame()將模式信息應(yīng)用到第1步創(chuàng)建的RDD上。
數(shù)據(jù)源
DataFrame可以當(dāng)作標(biāo)準(zhǔn)RDD進(jìn)行操作,也可以注冊為一個臨時表。將DataFrame注冊為一個臨時表,允許你在其上執(zhí)行SQL查詢。DataFrame接口可以處理多種數(shù)據(jù)源,Spark SQL也內(nèi)建支持了若干種極有用的數(shù)據(jù)源格式(包括json、parquet和jdbc,其中parquet是默認(rèn)格式)。此外,當(dāng)你使用SQL查詢這些數(shù)據(jù)源中的數(shù)據(jù)并且只用到了一部分字段時,Spark SQL可以智能地只掃描這些用到的字段。
通用加載/保存函數(shù)
DataFrameReader和DataFrameWriter中包好一些通用的加載和保存函數(shù),所有這些操作都將parquet格式作為默認(rèn)數(shù)據(jù)格式。示例如下:
手動指定選項(xiàng)
你也可以手動指定數(shù)據(jù)源和其他任何想要傳遞給數(shù)據(jù)源的選項(xiàng)。指定數(shù)據(jù)源通常需要使用數(shù)據(jù)源全名(如org.apache.spark.sql.parquet),但對于內(nèi)建數(shù)據(jù)源,你也可以使用它們的短名(json、parquet和jdbc)。并且不同的數(shù)據(jù)源類型之間都可以相互轉(zhuǎn)換。示例如下:
用SQL直接查詢文件
也可以不用read API加載一個文件到DataFrame然后查詢它,而是直接使用SQL語句在文件上查詢:
val df = sqlContext.sql(“SELECT * FROM parquet.`examples/src/main/resources/users.parquet`”)
保存到持久表
可以使用DataFrameWriter.saveAsTable()將一個DataFrame保存到一個持久化表,根據(jù)不同的設(shè)置,表可能被保存為Hive兼容格式,也可能被保存為Spark SQL特定格式,關(guān)于這一點(diǎn)請參見API文檔。
我們可以通過SQLContext.table()或DataFrameReader.table()來加載一個表并返回一個DataFrame。
Parquet文件
Parquet格式是被很多其他的數(shù)據(jù)處理系統(tǒng)所支持的列式數(shù)據(jù)存儲格式。它可以高效地存儲具有嵌套字段的記錄,并支持Spark SQL的全部數(shù)據(jù)類型。Spark SQL支持在讀寫Parquet文件時自動地保存原始數(shù)據(jù)的模式信息。出于兼容性考慮,在寫Parquet文件時,所有列將自動轉(zhuǎn)換為nullable。
加載數(shù)據(jù)
下面是SQL示例:
分區(qū)發(fā)現(xiàn)
在很多系統(tǒng)中(如Hive),表分區(qū)是一個通用的優(yōu)化方法。在一個分區(qū)的表中,數(shù)據(jù)通常存儲在不同的目錄中,列名和列值通常被編碼在分區(qū)目錄名中以區(qū)分不同的分區(qū)。Parquet數(shù)據(jù)源能夠自動地發(fā)現(xiàn)和推斷分區(qū)信息。 如下是人口分區(qū)表目錄結(jié)構(gòu),其中g(shù)ender和country是分區(qū)列:
當(dāng)使用SQLContext.read.parquet 或 SQLContext.read.load讀取path/to/table時,Spark SQL能夠自動從路徑中提取分區(qū)信息,返回的DataFrame的模式信息如下:
上述模式中,分區(qū)列的數(shù)據(jù)類型被自動推斷。目前,支持的數(shù)據(jù)類型有數(shù)字類型和字符串類型。如果你不想數(shù)據(jù)類型被自動推斷,可以配置spark.sql.sources.partitionColumnTypeInference.enabled,默認(rèn)為true,如果設(shè)置為false,將禁用自動類型推斷并默認(rèn)使用字符串類型。
從Spark 1.6.0開始,分區(qū)發(fā)現(xiàn)默認(rèn)只發(fā)現(xiàn)給定路徑下的分區(qū)。如果用戶傳遞path/to/table/gender=male作為路徑讀取數(shù)據(jù),gender將不被作為一個分區(qū)列。你可以在數(shù)據(jù)源選項(xiàng)中設(shè)置basePath來指定分區(qū)發(fā)現(xiàn)應(yīng)該開始的基路徑。例如,還是將path/to/table/gender=male作為數(shù)據(jù)路徑,但同時設(shè)置basePath為path/to/table/,gender將被作為分區(qū)列。
模式合并
就像ProtocolBuffer、Avro和Thrift,Parquet也支持模式演化(schema evolution)。這就意味著你可以向一個簡單的模式逐步添加列從而構(gòu)建一個復(fù)雜的模式。這種方式可能導(dǎo)致模式信息分散在不同的Parquet文件中,Parquet數(shù)據(jù)源能夠自動檢測到這種情況并且合并所有這些文件中的模式信息。
但是由于模式合并是相對昂貴的操作,并且絕大多數(shù)情況下不是必須的,因此從Spark 1.5.0開始缺省關(guān)閉模式合并。開啟方式:在讀取Parquet文件時,設(shè)置數(shù)據(jù)源選項(xiàng)mergeSchema為true,或者設(shè)置全局的SQL選項(xiàng)spark.sql.parquet.mergeSchema為true。示例如下:
JSON數(shù)據(jù)
SQLContext.read.json()可以將RDD[String]或者JSON文件加載并轉(zhuǎn)換為一個DataFrame。
有一點(diǎn)需要注意的是:這里用的JSON文件并不是隨意的典型的JSON文件,每一行必須是一個有效的JSON對象,如果一個對象跨越多行將導(dǎo)致失敗。對于RDD[String]也是一樣。
下面是SQL示例:
數(shù)據(jù)庫
Spark SQL也可以使用JDBC從其他數(shù)據(jù)庫中讀取數(shù)據(jù),你應(yīng)該優(yōu)先使用它而不是JdbcRDD,因?yàn)樗鼘⒎祷氐臄?shù)據(jù)作為一個DataFrame,所以很方便操作。但請注意:這和Spark SQL JDBC服務(wù)是不同的,Spark SQL JDBC服務(wù)允許其他應(yīng)用通過JDBC連接到Spark SQL進(jìn)行查詢。
要在Spark SQL中連接到指定數(shù)據(jù)庫,首先需要通過環(huán)境變量SPARK_CLASSPATH設(shè)置你的數(shù)據(jù)庫的JDBC驅(qū)動的路徑。例如在spark-shell中連接MySQL數(shù)據(jù)庫,你可以使用如下命令:
可以通過SQLContext.read.format(“jdbc”).options(…).load()或SQLContext.read.jdbc(…)從數(shù)據(jù)庫中加載數(shù)據(jù)到DataFrame。如下示例,我們從MySQL數(shù)據(jù)庫中加載數(shù)據(jù):
下面是SQL示例:
JDBC驅(qū)動需要在所有節(jié)點(diǎn)的相同路徑下都存在,因?yàn)槭欠植际教幚砺铮拖馭park核心一樣。
有些數(shù)據(jù)庫需要使用大寫來引用相應(yīng)的名字,好像Oracle中就需要使用大寫的表名。
分布式SQL引擎
Spark SQL也可以扮演一個分布式SQL引擎的角色,你可以使用JDBC/ODBC或者Spark SQL命令行接口連接到它,并直接執(zhí)行交互式SQL查詢。
運(yùn)行Thrift JDBC/ODBC Server
Spark SQL中實(shí)現(xiàn)的Thrift JDBC/ODBC Server跟Hive中的HiveServer2相一致。可以使用如下命令開啟JDBC/ODBC服務(wù),缺省情況下的服務(wù)監(jiān)聽地址為localhost:10000:
這個腳本不僅可以接受spark-submit命令可以接受的所有選項(xiàng),還支持–hiveconf 屬性=值選項(xiàng)來配置Hive屬性。你可以執(zhí)行./sbin/start-thriftserver.sh –help來查看完整的選項(xiàng)列表。
你可以使用beeline連接到上面已經(jīng)開啟的Spark SQL引擎。命令如下:
連接到beeline需要輸入用戶名和密碼。在非安全模式下,簡單地輸入你自己的用戶名就可以了,密碼可以為空。對于安全模式,參見beeline文檔。
運(yùn)行Spark SQL CLI
Spark SQL CLI的引入使得在Spark SQL中可方便地通過Hive metastore對Hive進(jìn)行查詢。目前為止還不能使用Spark SQL CLI與Thrift JDBC/ODBC Server進(jìn)行交互。這個腳本主要對本地開發(fā)比較有用,在共享的集群上,你應(yīng)該讓各用戶連接到Thrift JDBC/ODBC Server。
使用Spark SQL CLI前需要注意:
將hive-site.xml配置文件拷貝到$SPARK_HOME/conf目錄下。
需要在./conf/spark-env.sh中的SPARK_CLASSPATH添加jdbc驅(qū)動的jar包。
啟動Spark SQL CLI的命令如下:
在啟動spark-sql時,如果不指定master,則以local的方式運(yùn)行,master既可以指定standalone的地址,也可以指定yarn。當(dāng)設(shè)定master為yarn時(spark-sql –master yarn)時,可以通過http://master:8088 頁面監(jiān)控到整個job的執(zhí)行過程。如果在./conf/spark-defaults.conf中配置了spark master的話,那么在啟動spark-sql時就不需要再指定master了。spark-sql啟動之后就可以在其中敲擊SQL語句了。
關(guān)于Spark SQL CLI的可用命令和參數(shù),請敲擊./bin/spark-sql –help以查看。
性能調(diào)優(yōu)
緩存數(shù)據(jù)在內(nèi)存中
通過調(diào)用sqlContext.cacheTable(“tableName”)或dataFrame.cache(),Spark SQL可以將表以列式存儲緩存在內(nèi)存中。這樣的話,Spark SQL就可以只掃描那些使用到的列,并且將自動壓縮數(shù)據(jù)以減少內(nèi)存占用和GC開銷。你可以調(diào)用sqlContext.uncacheTable(“tableName”)將緩存的表從內(nèi)存中移除。另外,你也可以在SQL/HiveQL中使用CACHE tablename和UNCACHE tablename來緩存表和移除已緩存的表。
和緩存RDD時的動機(jī)一樣,如果想在同樣的數(shù)據(jù)上多次運(yùn)行任務(wù)或查詢,就應(yīng)該把這些數(shù)據(jù)表緩存起來。
將數(shù)據(jù)緩存在內(nèi)存中,同樣支持一些選項(xiàng),參見Spark SQL官方文檔性能調(diào)優(yōu)部分。
其他調(diào)優(yōu)相關(guān)參數(shù)
還有其他一些參數(shù)可以用于優(yōu)化查詢性能,參見Spark SQL官方文檔性能調(diào)優(yōu)部分。
via:CSDN
End.