欧美中文字幕第一页-欧美中文字幕一区-欧美中文字幕一区二区三区-欧美中文字幕在线-欧美中文字幕在线播放-欧美中文字幕在线视频

spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

我是創(chuàng)始人李巖:很抱歉!給自己產(chǎn)品做個廣告,點(diǎn)擊進(jìn)來看看。  

spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset 本文講解Spark的結(jié)構(gòu)化數(shù)據(jù)處理,主要包括:Spark SQL、DataFrame、Dataset以及Spark SQL服務(wù)等相關(guān)內(nèi)容。本文主要講解Spark 1.6.x的結(jié)構(gòu)化數(shù)據(jù)處理相關(guān)東東,但因Spark發(fā)展迅速(本文的寫作時值Spark 1.6.2發(fā)布之際,并且Spark 2.0的預(yù)覽版本也已發(fā)布許久),因此請隨時關(guān)注Spark SQL官方文檔以了解最新信息。

文中使用Scala對Spark SQL進(jìn)行講解,并且代碼大多都能在spark-shell中運(yùn)行,關(guān)于這點(diǎn)請知曉。

概述

相比于Spark RDD API,Spark SQL包含了對結(jié)構(gòu)化數(shù)據(jù)和在其上的運(yùn)算的更多信息,Spark SQL使用這些信息進(jìn)行了額外的優(yōu)化,使對結(jié)構(gòu)化數(shù)據(jù)的操作更加高效和方便。

有多種方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但無論是哪種API或者是編程語言,它們都是基于同樣的執(zhí)行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點(diǎn),看你喜歡。

SQL

使用Spark SQL的一種方式就是通過SQL語句來執(zhí)行SQL查詢。當(dāng)在編程語言中使用SQL時,其返回結(jié)果將被封裝為一個DataFrame。

DataFrame

DataFrame是一個分布式集合,其中數(shù)據(jù)被組織為命名的列。它概念上等價于關(guān)系數(shù)據(jù)庫中的表,但底層做了更多的優(yōu)化。DataFrame可以從很多數(shù)據(jù)源構(gòu)建,比如:已經(jīng)存在的RDD、結(jié)構(gòu)化文件、外部數(shù)據(jù)庫、Hive表。

DataFrame的前身是SchemaRDD,從Spark 1.3.0開始SchemaRDD更名為DataFrame。與SchemaRDD的主要區(qū)別是:DataFrame不再直接繼承自RDD,而是自己實(shí)現(xiàn)了RDD的絕大多數(shù)功能。你仍舊可以在DataFrame上調(diào)用.rdd方法將其轉(zhuǎn)換為一個RDD。RDD可看作是分布式的對象的集合,Spark并不知道對象的詳細(xì)模式信息,DataFrame可看作是分布式的Row對象的集合,其提供了由列組成的詳細(xì)模式信息,使得Spark SQL可以進(jìn)行某些形式的執(zhí)行優(yōu)化。DataFrame和普通的RDD的邏輯框架區(qū)別如下所示:

spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset
DataFrame不僅比RDD有更加豐富的算子,更重要的是它可以進(jìn)行執(zhí)行計(jì)劃優(yōu)化(得益于Catalyst SQL解析器),另外Tungsten項(xiàng)目給DataFrame的執(zhí)行效率帶來了很大提升(不過Tungsten優(yōu)化也可能在后續(xù)開發(fā)中加入到RDD API中)。

但是在有些情況下RDD可以表達(dá)的邏輯用DataFrame無法表達(dá),所以后續(xù)提出了Dataset API,Dataset結(jié)合了RDD和DataFrame的好處。

關(guān)于Tungsten優(yōu)化可以參見:Project Tungsten:讓Spark將硬件性能壓榨到極限

Dataset

Dataset是Spark 1.6新添加的一個實(shí)驗(yàn)性接口,其目的是想結(jié)合RDD的好處(強(qiáng)類型(這意味著可以在編譯時進(jìn)行類型安全檢查)、可以使用強(qiáng)大的lambda函數(shù))和Spark SQL的優(yōu)化執(zhí)行引擎的好處。可以從JVM對象構(gòu)造出Dataset,然后使用類似于RDD的函數(shù)式轉(zhuǎn)換算子(map/flatMap/filter等)對其進(jìn)行操作。

Dataset通過Encoder實(shí)現(xiàn)了自定義的序列化格式,使得某些操作可以在無需解序列化的情況下直接進(jìn)行。另外Dataset還進(jìn)行了包括Tungsten優(yōu)化在內(nèi)的很多性能方面的優(yōu)化。

實(shí)際上Dataset是包含了DataFrame的功能的,這樣二者就出現(xiàn)了很大的冗余,故Spark 2.0將二者統(tǒng)一:保留Dataset API,把DataFrame表示為Dataset[Row],即Dataset的子集。

API進(jìn)化

Spark在迅速的發(fā)展,從原始的RDD API,再到DataFrame API,再到Dataset的出現(xiàn),速度可謂驚人,執(zhí)行性能上也有了很大提升。

我們在使用API時,應(yīng)該優(yōu)先選擇DataFrame & Dataset,因?yàn)樗男阅芎芎茫乙院蟮膬?yōu)化它都可以享受到,但是為了兼容早期版本的程序,RDD API也會一直保留著。后續(xù)Spark上層的庫將全部會用 DataFrame & Dataset,比如MLlib、Streaming、Graphx等。

關(guān)于這三種API的更詳細(xì)的討論以及選擇參見:Apache Spark: RDD, DataFrame or Dataset?

入門

起點(diǎn)之SQLContext

要想使用Spark SQL,首先你得創(chuàng)建一個SQLContext對象,在這之前你只需要創(chuàng)建一個SparkContext就行了,如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

另外,你也可以使用HiveContext,它是SQLContext的超集,提供了一些額外的功能:使用HiveQL解析器、訪問Hive用戶定義函數(shù)、從Hive表讀取數(shù)據(jù)。并且,你不需要安裝Hive就可以使用HiveContext。不過將來版本的Spark SQL可能會逐步縮小SQLContext和HiveContext之間的差距。

對于SQLContext,目前只有一個簡單的SQL語法解析器sql,而對于HiveContext,則可以使用hiveql和sql兩個解析器,默認(rèn)是hiveql,我們可以通過如下語句來修改默認(rèn)解析器: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

不過就目前來說,HiveQL解析器更加完善,因此推薦使用HiveQL。

創(chuàng)建并使用DataFrame

通過SQLContext,應(yīng)用程序可以從已經(jīng)存在的RDD、結(jié)構(gòu)化文件、外部數(shù)據(jù)庫以及Hive表中創(chuàng)建出DataFrame。如下代碼從JSON文件(該文件可以從Spark發(fā)行包中找到)創(chuàng)建出一個DataFrame: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

DataFrame提供了一個領(lǐng)域特定語言(DSL)以方便操作結(jié)構(gòu)化數(shù)據(jù)。下面是一些使用示例,更多更全的DataFrame操作參見Spark API文檔中的org.apache.spark.sql.DataFrame: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

?

另外,org.apache.spark.sql.functions單例對象還包含了一些操作DataFrame的函數(shù),主要有這幾個方面:聚集操作、集合操作、字符串處理、排序、日期計(jì)算、通用數(shù)學(xué)函數(shù)、校驗(yàn)碼操作、窗口操作、用戶定義函數(shù)支持等等。

在程序中執(zhí)行SQL查詢

我們可以通過在程序中使用SQLContext.sql()來執(zhí)行SQL查詢,結(jié)果將作為一個DataFrame返回: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

我們還可以將DataFrame注冊為一個臨時表,然后就可以在其上執(zhí)行SQL語句進(jìn)行查詢了,臨時表的生命周期在與之關(guān)聯(lián)的SQLContext結(jié)束生命之后結(jié)束。示例如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

創(chuàng)建并使用Dataset

Dataset跟RDD相似,但是Dataset并沒有使用Java序列化庫和Kryo序列化庫,而是使用特定Encoder來序列化對象。encoder通常由代碼自動產(chǎn)生(在Scala中是通過隱式轉(zhuǎn)換產(chǎn)生),并且其序列化格式直接允許Spark執(zhí)行很多操作,比如:filtering、sorting、hashing,而不需要解序列化。 spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

與RDD互操作

Spark SQL支持兩種不同的方法將RDD轉(zhuǎn)換為DataFrame。第一種方法使用反射去推斷包含特定類型對象的RDD的模式(schema),該方法可以使你的代碼更加精簡,不過要求在你寫Spark程序時已經(jīng)知道模式信息(比如RDD中的對象是自己定義的case class類型)。第二種方法通過一個編程接口,此時你需要構(gòu)造一個模式并將其應(yīng)用到一個已存在的RDD以將其轉(zhuǎn)換為DataFrame,該方法適用于在運(yùn)行時之前還不知道列以及列的類型的情況。

用反射推斷模式

Spark SQL的Scala接口支持將包含case class的RDD自動轉(zhuǎn)換為DataFrame。case class定義了表的模式,case class的參數(shù)名被反射讀取并成為表的列名。case class也可以嵌套或者包含復(fù)雜類型(如序列或者數(shù)組)。示例如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

手動編程指定模式

當(dāng)case class不能提前定義時(比如記錄的結(jié)構(gòu)被編碼為字符串,或者當(dāng)文本數(shù)據(jù)集被解析時不同用戶需要映射不同的字段),可以通過下面三步來將RDD轉(zhuǎn)換為DataFrame:

從原始RDD創(chuàng)建得到一個包含Row對象的RDD。

創(chuàng)建一個與第1步中Row的結(jié)構(gòu)相匹配的StructType,以表示模式信息。

通過SQLContext.createDataFrame()將模式信息應(yīng)用到第1步創(chuàng)建的RDD上。 spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

數(shù)據(jù)源

DataFrame可以當(dāng)作標(biāo)準(zhǔn)RDD進(jìn)行操作,也可以注冊為一個臨時表。將DataFrame注冊為一個臨時表,允許你在其上執(zhí)行SQL查詢。DataFrame接口可以處理多種數(shù)據(jù)源,Spark SQL也內(nèi)建支持了若干種極有用的數(shù)據(jù)源格式(包括json、parquet和jdbc,其中parquet是默認(rèn)格式)。此外,當(dāng)你使用SQL查詢這些數(shù)據(jù)源中的數(shù)據(jù)并且只用到了一部分字段時,Spark SQL可以智能地只掃描這些用到的字段。

通用加載/保存函數(shù)

DataFrameReader和DataFrameWriter中包好一些通用的加載和保存函數(shù),所有這些操作都將parquet格式作為默認(rèn)數(shù)據(jù)格式。示例如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

手動指定選項(xiàng)

你也可以手動指定數(shù)據(jù)源和其他任何想要傳遞給數(shù)據(jù)源的選項(xiàng)。指定數(shù)據(jù)源通常需要使用數(shù)據(jù)源全名(如org.apache.spark.sql.parquet),但對于內(nèi)建數(shù)據(jù)源,你也可以使用它們的短名(json、parquet和jdbc)。并且不同的數(shù)據(jù)源類型之間都可以相互轉(zhuǎn)換。示例如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

用SQL直接查詢文件

也可以不用read API加載一個文件到DataFrame然后查詢它,而是直接使用SQL語句在文件上查詢:

val df = sqlContext.sql(“SELECT * FROM parquet.`examples/src/main/resources/users.parquet`”)

保存到持久表

可以使用DataFrameWriter.saveAsTable()將一個DataFrame保存到一個持久化表,根據(jù)不同的設(shè)置,表可能被保存為Hive兼容格式,也可能被保存為Spark SQL特定格式,關(guān)于這一點(diǎn)請參見API文檔。

我們可以通過SQLContext.table()或DataFrameReader.table()來加載一個表并返回一個DataFrame。

Parquet文件

Parquet格式是被很多其他的數(shù)據(jù)處理系統(tǒng)所支持的列式數(shù)據(jù)存儲格式。它可以高效地存儲具有嵌套字段的記錄,并支持Spark SQL的全部數(shù)據(jù)類型。Spark SQL支持在讀寫Parquet文件時自動地保存原始數(shù)據(jù)的模式信息。出于兼容性考慮,在寫Parquet文件時,所有列將自動轉(zhuǎn)換為nullable。

加載數(shù)據(jù) spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

下面是SQL示例: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

分區(qū)發(fā)現(xiàn)

在很多系統(tǒng)中(如Hive),表分區(qū)是一個通用的優(yōu)化方法。在一個分區(qū)的表中,數(shù)據(jù)通常存儲在不同的目錄中,列名和列值通常被編碼在分區(qū)目錄名中以區(qū)分不同的分區(qū)。Parquet數(shù)據(jù)源能夠自動地發(fā)現(xiàn)和推斷分區(qū)信息。 如下是人口分區(qū)表目錄結(jié)構(gòu),其中g(shù)ender和country是分區(qū)列: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

當(dāng)使用SQLContext.read.parquet 或 SQLContext.read.load讀取path/to/table時,Spark SQL能夠自動從路徑中提取分區(qū)信息,返回的DataFrame的模式信息如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

上述模式中,分區(qū)列的數(shù)據(jù)類型被自動推斷。目前,支持的數(shù)據(jù)類型有數(shù)字類型和字符串類型。如果你不想數(shù)據(jù)類型被自動推斷,可以配置spark.sql.sources.partitionColumnTypeInference.enabled,默認(rèn)為true,如果設(shè)置為false,將禁用自動類型推斷并默認(rèn)使用字符串類型。

從Spark 1.6.0開始,分區(qū)發(fā)現(xiàn)默認(rèn)只發(fā)現(xiàn)給定路徑下的分區(qū)。如果用戶傳遞path/to/table/gender=male作為路徑讀取數(shù)據(jù),gender將不被作為一個分區(qū)列。你可以在數(shù)據(jù)源選項(xiàng)中設(shè)置basePath來指定分區(qū)發(fā)現(xiàn)應(yīng)該開始的基路徑。例如,還是將path/to/table/gender=male作為數(shù)據(jù)路徑,但同時設(shè)置basePath為path/to/table/,gender將被作為分區(qū)列。

模式合并

就像ProtocolBuffer、Avro和Thrift,Parquet也支持模式演化(schema evolution)。這就意味著你可以向一個簡單的模式逐步添加列從而構(gòu)建一個復(fù)雜的模式。這種方式可能導(dǎo)致模式信息分散在不同的Parquet文件中,Parquet數(shù)據(jù)源能夠自動檢測到這種情況并且合并所有這些文件中的模式信息。

但是由于模式合并是相對昂貴的操作,并且絕大多數(shù)情況下不是必須的,因此從Spark 1.5.0開始缺省關(guān)閉模式合并。開啟方式:在讀取Parquet文件時,設(shè)置數(shù)據(jù)源選項(xiàng)mergeSchema為true,或者設(shè)置全局的SQL選項(xiàng)spark.sql.parquet.mergeSchema為true。示例如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

JSON數(shù)據(jù)

SQLContext.read.json()可以將RDD[String]或者JSON文件加載并轉(zhuǎn)換為一個DataFrame。

有一點(diǎn)需要注意的是:這里用的JSON文件并不是隨意的典型的JSON文件,每一行必須是一個有效的JSON對象,如果一個對象跨越多行將導(dǎo)致失敗。對于RDD[String]也是一樣。 spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

下面是SQL示例: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

數(shù)據(jù)庫

Spark SQL也可以使用JDBC從其他數(shù)據(jù)庫中讀取數(shù)據(jù),你應(yīng)該優(yōu)先使用它而不是JdbcRDD,因?yàn)樗鼘⒎祷氐臄?shù)據(jù)作為一個DataFrame,所以很方便操作。但請注意:這和Spark SQL JDBC服務(wù)是不同的,Spark SQL JDBC服務(wù)允許其他應(yīng)用通過JDBC連接到Spark SQL進(jìn)行查詢。

要在Spark SQL中連接到指定數(shù)據(jù)庫,首先需要通過環(huán)境變量SPARK_CLASSPATH設(shè)置你的數(shù)據(jù)庫的JDBC驅(qū)動的路徑。例如在spark-shell中連接MySQL數(shù)據(jù)庫,你可以使用如下命令: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

可以通過SQLContext.read.format(“jdbc”).options(…).load()或SQLContext.read.jdbc(…)從數(shù)據(jù)庫中加載數(shù)據(jù)到DataFrame。如下示例,我們從MySQL數(shù)據(jù)庫中加載數(shù)據(jù): spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

下面是SQL示例: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

JDBC驅(qū)動需要在所有節(jié)點(diǎn)的相同路徑下都存在,因?yàn)槭欠植际教幚砺铮拖馭park核心一樣。

有些數(shù)據(jù)庫需要使用大寫來引用相應(yīng)的名字,好像Oracle中就需要使用大寫的表名。

分布式SQL引擎

Spark SQL也可以扮演一個分布式SQL引擎的角色,你可以使用JDBC/ODBC或者Spark SQL命令行接口連接到它,并直接執(zhí)行交互式SQL查詢。

運(yùn)行Thrift JDBC/ODBC Server

Spark SQL中實(shí)現(xiàn)的Thrift JDBC/ODBC Server跟Hive中的HiveServer2相一致。可以使用如下命令開啟JDBC/ODBC服務(wù),缺省情況下的服務(wù)監(jiān)聽地址為localhost:10000:

spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

這個腳本不僅可以接受spark-submit命令可以接受的所有選項(xiàng),還支持–hiveconf 屬性=值選項(xiàng)來配置Hive屬性。你可以執(zhí)行./sbin/start-thriftserver.sh –help來查看完整的選項(xiàng)列表。

你可以使用beeline連接到上面已經(jīng)開啟的Spark SQL引擎。命令如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

連接到beeline需要輸入用戶名和密碼。在非安全模式下,簡單地輸入你自己的用戶名就可以了,密碼可以為空。對于安全模式,參見beeline文檔。

運(yùn)行Spark SQL CLI

Spark SQL CLI的引入使得在Spark SQL中可方便地通過Hive metastore對Hive進(jìn)行查詢。目前為止還不能使用Spark SQL CLI與Thrift JDBC/ODBC Server進(jìn)行交互。這個腳本主要對本地開發(fā)比較有用,在共享的集群上,你應(yīng)該讓各用戶連接到Thrift JDBC/ODBC Server。

使用Spark SQL CLI前需要注意:

將hive-site.xml配置文件拷貝到$SPARK_HOME/conf目錄下。

需要在./conf/spark-env.sh中的SPARK_CLASSPATH添加jdbc驅(qū)動的jar包。

啟動Spark SQL CLI的命令如下: spark結(jié)構(gòu)化數(shù)據(jù)處理:Spark SQL、DataFrame和Dataset

在啟動spark-sql時,如果不指定master,則以local的方式運(yùn)行,master既可以指定standalone的地址,也可以指定yarn。當(dāng)設(shè)定master為yarn時(spark-sql –master yarn)時,可以通過http://master:8088 頁面監(jiān)控到整個job的執(zhí)行過程。如果在./conf/spark-defaults.conf中配置了spark master的話,那么在啟動spark-sql時就不需要再指定master了。spark-sql啟動之后就可以在其中敲擊SQL語句了。

關(guān)于Spark SQL CLI的可用命令和參數(shù),請敲擊./bin/spark-sql –help以查看。

性能調(diào)優(yōu)

緩存數(shù)據(jù)在內(nèi)存中

通過調(diào)用sqlContext.cacheTable(“tableName”)或dataFrame.cache(),Spark SQL可以將表以列式存儲緩存在內(nèi)存中。這樣的話,Spark SQL就可以只掃描那些使用到的列,并且將自動壓縮數(shù)據(jù)以減少內(nèi)存占用和GC開銷。你可以調(diào)用sqlContext.uncacheTable(“tableName”)將緩存的表從內(nèi)存中移除。另外,你也可以在SQL/HiveQL中使用CACHE tablename和UNCACHE tablename來緩存表和移除已緩存的表。

和緩存RDD時的動機(jī)一樣,如果想在同樣的數(shù)據(jù)上多次運(yùn)行任務(wù)或查詢,就應(yīng)該把這些數(shù)據(jù)表緩存起來。

將數(shù)據(jù)緩存在內(nèi)存中,同樣支持一些選項(xiàng),參見Spark SQL官方文檔性能調(diào)優(yōu)部分。

其他調(diào)優(yōu)相關(guān)參數(shù)

還有其他一些參數(shù)可以用于優(yōu)化查詢性能,參見Spark SQL官方文檔性能調(diào)優(yōu)部分。

via:CSDN

End.

隨意打賞

spark大數(shù)據(jù)非結(jié)構(gòu)化數(shù)據(jù)
提交建議
微信掃一掃,分享給好友吧。
主站蜘蛛池模板: 日韩免费一级毛片欧美一级日韩片 | 九一视频在线 | 精品国产调教最大网站女王 | 欧美国产中文字幕 | 欧洲成人免费高清视频 | 天天干在线影院 | 伊人久久精品一区二区三区 | 亚洲已满18点击进入在线观看 | 色片免费观看 | 国内精品视频 | 99久久免费国产香蕉麻豆 | 毛片大全免费 | 日韩免费成人 | 桃子在线观看 | 国产瑟瑟 | 日日摸日日碰夜夜97 | 亚洲成人网页 | 免费观看一区二区 | 毛片一级毛片 | 91精品日韩| 久久精品国产精品亚洲精品 | 色yeye成人免费视频 | 伊人久久成人成综合网222 | 久久精品亚洲热综合一本奇米 | 老色鬼a∨在线视频在线观看 | 在线播放国产福利视频 | 四虎在线最新地址公告 | 色婷婷免费视频 | 亚洲视频区 | 国内精品久久久久影院蜜芽 | 免费国产午夜在线观看 | 亚洲欧美综合乱码精品成人网 | 欧美一区二区三区视频在线观看 | 亚洲精品国产一区二区三区在 | 伊人操| 精品伦理 | 人人爱操 | 四虎影视永久免费观看 | 狠狠澡夜夜澡人人爽 | 精品视频一区二区三区在线播放 | 国产精品久久精品 |