曰本美女∴一区二区特级A级黄色大片, 国产亚洲精品美女久久久久久2025, 页岩实心砖-高密市宏伟建材有限公司, 午夜小视频在线观看欧美日韩手机在线,国产人妻奶水一区二区,国产玉足,妺妺窝人体色WWW网站孕妇,色综合天天综合网中文伊,成人在线麻豆网观看

您好,歡迎來電子發(fā)燒友網(wǎng)! ,新用戶?[免費注冊]

您的位置:電子發(fā)燒友網(wǎng)>源碼下載>數(shù)值算法/人工智能>

如何使用Apache Spark中的DataSource API以實現(xiàn)數(shù)據(jù)源混合計算的實踐

大?。?/span>0.6 MB 人氣: 2017-10-10 需要積分:1

  本文主要介紹如何使用Apache Spark中的DataSource API以實現(xiàn)多個數(shù)據(jù)源混合計算的實踐,那么這么做的意義何在,其主要歸結(jié)于3個方面:

  首先,我們身邊存在大量的數(shù)據(jù),結(jié)構(gòu)化、非結(jié)構(gòu)化,各種各樣的數(shù)據(jù)結(jié)構(gòu)、格局格式,這種數(shù)據(jù)的多樣性本身即是大數(shù)據(jù)的特性之一,從而也決定了一種存儲方式不可能通吃所有。因此,數(shù)據(jù)本身決定了多種數(shù)據(jù)源存在的必然性。 其次:從業(yè)務(wù)需求來看,因為每天會開發(fā)各種各樣的應(yīng)用系統(tǒng),應(yīng)用系統(tǒng)中所遇到的業(yè)務(wù)場景是互不相同的,各種各樣的需求決定了目前市面上不可能有一種軟件架構(gòu)同時能夠解決這么多種業(yè)務(wù)場景,所以在數(shù)據(jù)存儲包括數(shù)據(jù)查詢、計算這一塊也不可能只有一種技術(shù)就能解決所有問題。最后,從軟件的發(fā)展來看,現(xiàn)在市面上出現(xiàn)了越來越多面對某一個細分領(lǐng)域的軟件技術(shù),比如像數(shù)據(jù)存儲、查詢搜索引擎,MPP數(shù)據(jù)庫,以及各種各樣的查詢引擎。這么多不同的軟件中,每一個軟件都相對擅長處理某一個領(lǐng)域的業(yè)務(wù)場景,只是涉及的領(lǐng)域大小不相同。因此,越來越多軟件的產(chǎn)生也決定了我們所接受的數(shù)據(jù)會存儲到越來越多不同的數(shù)據(jù)源。

  Apache Spark的多數(shù)據(jù)源方案

  傳統(tǒng)方案中,實現(xiàn)多數(shù)據(jù)源通常有兩種方案:冗余存儲,一份業(yè)務(wù)數(shù)據(jù)有多個存儲,或者內(nèi)部互相引用;集中的計算,不同的數(shù)據(jù)使用不同存儲,但是會在統(tǒng)一的地方集中計算,算的時候把這些數(shù)據(jù)從不同位置讀取出來。下面一起討論這兩種解決方案中存在的問題:

  如何使用Apache Spark中的DataSource API以實現(xiàn)數(shù)據(jù)源混合計算的實踐

  圖1 多數(shù)據(jù)源方案

  第一種方案中存在的一個問題是數(shù)據(jù)一致性,一樣的數(shù)據(jù)放在不同的存儲里面或多或少會有格式上的不兼容,或者查詢的差異,從而導(dǎo)致從不同位置查詢的數(shù)據(jù)可能出現(xiàn)不一致。比如有兩個報表相同的指標,但是因為是放在不同存儲里查出來的結(jié)果對不上,這點非常致命。第二個問題是存儲的成本,隨著存儲成本越來越低,這點倒是容易解決。

  第二種方案也存在兩個問題,其一是不同存儲出來的數(shù)據(jù)類型不同,從而在計算時需求相互轉(zhuǎn)換,因此如何轉(zhuǎn)換至關(guān)重要。第二個問題是讀取效率,需要高性能的數(shù)據(jù)抽取機制,盡量避免從遠端讀取不必要的數(shù)據(jù),并且需要保證一定的并發(fā)性。

  Spark在1.2.0版本首次發(fā)布了一個新的DataSourceAPI,這個API提供了非常靈活的方案,讓Spark可以通過一個標準的接口訪問各種外部數(shù)據(jù)源,目標是讓Spark各個組件以非常方便的通過SparkSQL訪問外部數(shù)據(jù)源。很顯然,Spark的DataSourceAPI其采用的是方案二,那么它是如何解決其中那個的問題的呢?

  如何使用Apache Spark中的DataSource API以實現(xiàn)數(shù)據(jù)源混合計算的實踐

  圖2 External Datasource API

  首先,數(shù)據(jù)類型轉(zhuǎn)換,Spark中定義了一個統(tǒng)一的數(shù)據(jù)類型標準,不同的數(shù)據(jù)源自己定義數(shù)據(jù)類型的轉(zhuǎn)換方法,這樣解決數(shù)據(jù)源之間相互類型轉(zhuǎn)換的問題;

  關(guān)于數(shù)據(jù)處理效率的問題,Spark定義了一個比較簡單的API的接口,主要有3個方式:

  1./* 全量數(shù)據(jù)抽取 */

  2.trait TableScan {

  3.def buildScan(): RDD[Row]

  4.}

  5.

  6./* 列剪枝數(shù)據(jù)抽取 */

  7.trait PrunedScan {

  8.def buildScan(requiredColumns: Array[String]): RDD[Row]

  9.}

  10.

  11./* 列剪枝+行過濾數(shù)據(jù)抽取 */

  12.trait PrunedFilteredScan {

  13.def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

  14.}

  TableScan。這種方式需要將1TB的數(shù)據(jù)從數(shù)據(jù)抽取,再把這些數(shù)據(jù)傳到Spark中。在把這1TB的數(shù)據(jù)穿過網(wǎng)絡(luò)IO傳給Spark端之后,Spark還要逐行的進行過濾,從而消耗大量的計算資源,這是目前最低效的方式。

  PrunedScan。這個方式有一個好處是數(shù)據(jù)源只需要從磁盤讀取1TB的數(shù)據(jù),并只返回一些列的數(shù)據(jù),Spark不需要計算就可以使用1GB的數(shù)據(jù),這個過程中節(jié)省了大量的網(wǎng)絡(luò)IO。

  PrunedFilteredScan。它需要數(shù)據(jù)源既支持列過濾也支持行過濾,其好處是在磁盤IO這一層進行數(shù)據(jù)過濾,因此如果需要1GB數(shù)據(jù),可能只抽出2GB大小,經(jīng)過列過濾的規(guī)則再抽出1GB的數(shù)據(jù),隨后傳給Spark,因此這種數(shù)據(jù)源接口最高效,這也是目前市面上實現(xiàn)的最高效的數(shù)據(jù)接口。

  可直接使用的DataSource實現(xiàn)

  目前市面上可以找到的Spark DataSource實現(xiàn)代碼有三大類:Spark自帶;Spark Packages(http://Spark-packages.org/)網(wǎng)站中存放的第三方軟件包;跟隨其他項目一同發(fā)布的內(nèi)置的Spark的實現(xiàn)。這里介紹其中幾個:

  1.JDBCRelation

  1.private[sql] case class JDBCRelation(

  2.url: String,

  3.table: String,

  4.parts: Array[Partition],

  5.properties: Properties = new Properties())(@transient val sqlContext: SQLContext)

  6.extends BaseRelation

  7.with PrunedFilteredScan

  8.with InsertableRelation {

  9…。

  10.}

  以JDBC方式連接外部數(shù)據(jù)源在國內(nèi)十分流行,Spark也內(nèi)置了最高效的PrunedFilteredScan接口,同時還實現(xiàn)了數(shù)據(jù)插入的接口,使用起來非常方便,可以方便地把數(shù)據(jù)庫中的表用到Spark。以Postgres為例:

  1.sqlContext.read.jdbc(

  2.“jdbc:postgresql://testhost:7531/testdb”,

  3.“testTable”,

  4.“idField”, ——-索引列

  5.10000, ——-起始index

  6.1000000, ——-結(jié)束index

  7.10, ——-partition數(shù)量

  8.new Properties

  9.).registerTempTable(“testTable”)

  實現(xiàn)機制:默認使用單個Task從遠端數(shù)據(jù)庫讀取數(shù)據(jù),如果設(shè)定了partitionColumn、lowerBound、upperBound、numPartitions這4個參數(shù),那么還可以控制Spark把針對這個數(shù)據(jù)源的訪問任務(wù)進行拆分,得到numPartitions個任務(wù),每個Executor收到任務(wù)之后會并發(fā)的去連接數(shù)據(jù)庫的Server讀取數(shù)據(jù)。

  具體類型:PostgreSQL, MySQL。

  問題:在實際使用中需要注意一個問題,所有的Spark都會并發(fā)連接一個Server,并發(fā)過高時可能會對數(shù)據(jù)庫造成較大的沖擊(對于MPP等新型的關(guān)系型數(shù)據(jù)庫還好)。

  建議:個人感覺,JDBC的數(shù)據(jù)源適合從MPP等分布式數(shù)據(jù)庫中讀取數(shù)據(jù),對于傳統(tǒng)意義上單機的數(shù)據(jù)庫建議只處理一些相對較小的數(shù)據(jù)。

  2.HadoopFsRelation

  第二個在Spark內(nèi)置的數(shù)據(jù)源實現(xiàn),HadoopFs,也是實現(xiàn)中最高效的PrunedFilteredScan接口,使用起來相對來說比JDBC更方便。

  1.sqlContext

  2..read

  3..parquet(“hdfs://testFS/testPath”)

  4..registerTempTable(“test”)

  實現(xiàn)機制:執(zhí)行的時候Spark在Driver端會直接獲取列表,根據(jù)文件的格式類型和壓縮方式生成多個TASK,再把這些TASK分配下去。Executor端會根據(jù)文件列表訪問,這種方式訪問HDFS不會出現(xiàn)IO集中的地方,所以具備很好的擴展性,可以處理相當大規(guī)模的數(shù)據(jù)。

  具體類型:ORC,Parquet,JSon。

  問題:在實時場景下如果使用HDFS作為數(shù)據(jù)輸出的數(shù)據(jù)源,在寫數(shù)據(jù)就會產(chǎn)生非常大量零散的數(shù)據(jù),在HDFS上積累大量的零碎文件,就會帶來很大的壓力,后續(xù)處理這些小文件的時候也非常頭疼。

  建議:這種方式適合離線數(shù)據(jù)處理程序輸入和輸出數(shù)據(jù),還有一些數(shù)據(jù)處理Pipeline中的臨時數(shù)據(jù),數(shù)據(jù)量比較大,可以臨時放在HDFS。實時場景下不推薦使用HDFS作為數(shù)據(jù)輸出。

  3.ElasticSearch

  越來越多的互聯(lián)網(wǎng)公司開始使用ELK(ElasticSearch+LogStash+Kibana)作為基礎(chǔ)數(shù)據(jù)分析查詢的工具,但是有多少人知道其實ElasticSearch也支持在Spark中掛載為一個DataSource進行查詢呢?

  1.EsSparkSQL

  2..esDF(hc,indexName,esQuery)

  3..registerTempTable(”testTable”)

  實現(xiàn)機制:ES DataSource的實現(xiàn)機制是通過對esQuery進行解析,將實際要發(fā)往多個ES Nodes的請求分為多個Task,在每個Executor上并行執(zhí)行。

非常好我支持^.^

(0) 0%

不好我反對

(0) 0%

      發(fā)表評論

      用戶評論
      評價:好評中評差評

      發(fā)表評論,獲取積分! 請遵守相關(guān)規(guī)定!

      ?