分布式系統(tǒng)組件原理介紹_第1頁
分布式系統(tǒng)組件原理介紹_第2頁
分布式系統(tǒng)組件原理介紹_第3頁
分布式系統(tǒng)組件原理介紹_第4頁
分布式系統(tǒng)組件原理介紹_第5頁
已閱讀5頁,還剩27頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

分布式系統(tǒng)組件介紹

Hadoop原理

分為HDFS與Yarn兩個(gè)部分。HDFS有Namenode和Datanode兩個(gè)部分。每個(gè)節(jié)點(diǎn)占

用一個(gè)電腦。Datanode定時(shí)向Namenode發(fā)送心跳包,心跳包中包含Datanode的校驗(yàn)等信

息,用來監(jiān)控Datanode。HDFS將數(shù)據(jù)分為塊,默認(rèn)為64M每個(gè)塊信息按照配置的參數(shù)分

別備份在不同的Datanode,而數(shù)據(jù)塊在哪個(gè)節(jié)點(diǎn)上,這些信息都存儲(chǔ)到Narnenode上面cYarn

是MapReduce2,可以集成更多的組件,如spark>mpi等。MapReduce包括Jobtraker與

Tasktrakcr兩個(gè)部分。其中JobTrakcr是在主節(jié)點(diǎn)上,負(fù)責(zé)整體的調(diào)度。Tasktrakcr在slave

節(jié)點(diǎn)上,當(dāng)提交任務(wù)后,將其交給Jobtraker進(jìn)行調(diào)度,調(diào)度到該任務(wù)之后就會(huì)將jar包發(fā)送

到響應(yīng)的Tasktraker,以實(shí)現(xiàn)分布式中移動(dòng)計(jì)算資源而非移動(dòng)數(shù)據(jù)。因?yàn)檫@些任務(wù)都是并發(fā)

執(zhí)行的,所以每個(gè)任務(wù)需要調(diào)用哪個(gè)節(jié)點(diǎn)的數(shù)據(jù)必須非常的明確,通常這一部分是通過

Jobtraker進(jìn)行指揮。

在MapReduce的開始,每個(gè)block作為一個(gè)Map輸入,Map輸入后就進(jìn)入shuffle階段。

Shuffle階段主:要分為Map和Reduce兩個(gè)階段。在MapShuffle階段,Map輸入按用戶的程

序進(jìn)行處理,生成的結(jié)果按key排序,然后進(jìn)入內(nèi)存,溢出后形成一個(gè)spill寫入磁盤,這

樣多個(gè)spill在這個(gè)節(jié)點(diǎn)中進(jìn)行多路歸并排序(勝者樹)形成一個(gè)排序文件寫入磁盤,這期

間那些Map文件交給那些節(jié)點(diǎn)處理由Jobtraker進(jìn)行調(diào)度,最后生成一個(gè)大排序的文件,并

且刪除spilL之后,再將多個(gè)節(jié)點(diǎn)上已經(jīng)排序的文件進(jìn)行行多路歸并排序(一個(gè)大文件N

分到n個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)分為k個(gè)spill,?個(gè)spill長(zhǎng)度s,時(shí)間復(fù)雜度是N(logn(n個(gè)節(jié)點(diǎn)

多路歸并排序)+k)gk(每個(gè)節(jié)點(diǎn)內(nèi)k個(gè)spill排序)-logs(每個(gè)spill內(nèi)部進(jìn)行排序)),

N=nks,所以最后的復(fù)雜度亦是NlogN)。

完成MapShuffle階段后通知Jobtraker進(jìn)入ReduceShuffle階段。在這個(gè)階段,因?yàn)橐?/p>

經(jīng)排序,很容易將用戶的程序直接作用到相同key的數(shù)據(jù)上,這些數(shù)據(jù)作為Reduce的輸入

進(jìn)行處理,最終將輸出的結(jié)果數(shù)據(jù)寫入到HDFS上,并刪除磁盤數(shù)據(jù)。Map一般多,Reduce

少,所以通過Hash的方法將Map文件映射到Reduce上,進(jìn)行處理,這個(gè)階段叫做Pa(ition<,

為了避免譬如所有數(shù)據(jù)相加這種操作使得數(shù)據(jù)負(fù)載移動(dòng)的數(shù)量少的Reduce階段,造成效率

低下的結(jié)果,我們可以在在MapShuffle階段加一個(gè)Combine階段,這個(gè)Combine是在每一

臺(tái)節(jié)點(diǎn)上將已經(jīng)排序好的文件進(jìn)行一次Reduce并將結(jié)果作為ReduceShuffle階段的輸入,

這樣可以大大減少數(shù)據(jù)的輸入量。通常Reduce的個(gè)數(shù)通過用戶來指定,通常和CPU個(gè)數(shù)相

適應(yīng)才能使其效率達(dá)到最大。

HBase原理

Hbase是列存儲(chǔ)數(shù)據(jù)庫。其存儲(chǔ)的組織結(jié)構(gòu)就是將相同的列族存儲(chǔ)在?起,因此得名的。

Hbase存儲(chǔ)有行鍵,作為唯一標(biāo)識(shí),列表示為〈列族〉:〈列〉存儲(chǔ)信息,如address:city,address:

provice,然后是時(shí)間戳。

Hbase物理模型中,有一個(gè)總結(jié)點(diǎn)HMaster,通過其自帶的zookeeper與客戶端相連接。

Hbse作為分布式每一個(gè)節(jié)點(diǎn)作為一個(gè)RegionServer,維乎Region的狀態(tài)和管理。Region是

數(shù)據(jù)管理的基本單位。最初只有一-個(gè),通過擴(kuò)充后達(dá)到閾值然后分裂,通過Server控制其

規(guī)模。在RegionServer中,每一個(gè)store作為一個(gè)列族。當(dāng)數(shù)據(jù)插入進(jìn)來,新數(shù)據(jù)成為

Memstore,寫入內(nèi)存,當(dāng)Memstore達(dá)到閾值后,通過Flashcache進(jìn)程將數(shù)據(jù)寫入storeFile,

也就是當(dāng)內(nèi)存數(shù)據(jù)增多后溢出成一個(gè)SloreFile寫入磁盤,這里和Hadoop的spill類似,這

據(jù)后一并寫入。當(dāng)StoreFile數(shù)量過多時(shí),進(jìn)行合并,將形成一個(gè)大的StoreFile并且刪

除掉原來的StoreFile。再當(dāng)SloreFile大小超過一定閾值后,分裂成Region。

HBase有一個(gè)ROOT表和META表。META表記錄用戶Region的信息,但是隨著數(shù)據(jù)

增多,META也會(huì)增大,進(jìn)而分裂成多個(gè)Region,那我們用ROOT表記錄下META的信

息,是一個(gè)二級(jí)表,而zookeeper中記錄ROOT表的locationo當(dāng)我們需找找到一條信息時(shí),

先去zookccpcr看找ROOT,從ROOT中查找META找到META位置,在進(jìn)入META表中

尋找該數(shù)據(jù)所在Region,再讀取該Region的信息。HBase適合大量插入乂同時(shí)讀的情況,

其瓶頸是硬盤的傳輸速度而不再是像Oracle一樣瓶頸在硬盤的尋道速度。

Zookeeper原理

Zookeepcr是一個(gè)資源管理庫,對(duì)節(jié)點(diǎn)進(jìn)行協(xié)調(diào)、通信、失敗處理、節(jié)點(diǎn)損壞的處理等,

是一個(gè)無中心設(shè)計(jì),主節(jié)點(diǎn)通過選舉產(chǎn)生。Zookeeper的節(jié)點(diǎn)是Znode。每一個(gè)節(jié)點(diǎn)可以存

放1M的數(shù)據(jù),client訪問服務(wù)器時(shí)創(chuàng)建一個(gè)Znode,可以是短暫的Znode,其上可以放.上觀

察Waicher對(duì)node進(jìn)行監(jiān)控。Zookeeper有高可用性,每個(gè)機(jī)器復(fù)制一份數(shù)據(jù),只要有一般

以上的機(jī)器可以正常的運(yùn)行,整個(gè)集群就可以工作。比如6臺(tái)的集群容忍2臺(tái)斷開,超過兩

臺(tái)達(dá)到一般的數(shù)最就不可以,因此集群通常都是奇數(shù)來節(jié)約資源。

Zookeeper使用zab協(xié)議,是一個(gè)無中心協(xié)議,通過選舉的方式產(chǎn)生leader,通過每臺(tái)

機(jī)器的信息擴(kuò)散選舉最閑的資源利用較少的節(jié)點(diǎn)作為主控。同時(shí)當(dāng)配置數(shù)據(jù)有更改更新時(shí),

在每個(gè)節(jié)點(diǎn)上有配置watcher并觸發(fā)讀取更改,。因此能?哆保證一致性。每個(gè)節(jié)點(diǎn)通過leader

廣播的方式,使所有follower同步。

Zookeeper可以實(shí)現(xiàn)分布式鎖機(jī)制。通過waicher監(jiān)控,對(duì)每個(gè)Znode的鎖都有一個(gè)獨(dú)

一的編號(hào),按照序號(hào)的大小比較,來分配鎖。當(dāng)一個(gè)暫時(shí)Znode完結(jié)后刪除本節(jié)點(diǎn),通知

leader完結(jié),之后下一個(gè)Znode獲取鎖進(jìn)行操作。

Kafka原理

Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由Linkedln公司開發(fā),之后成為Apache項(xiàng)

目的一部分。Kafka是一種快速、可擴(kuò)展的、設(shè)計(jì)內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提

交日志服務(wù)。它被設(shè)計(jì)為一個(gè)分布式系統(tǒng),易于向外擴(kuò)展;它何時(shí)為發(fā)布和訂閱提供高吞吐

量;它支持多訂閱者,當(dāng)失敗時(shí)能自動(dòng)平衡消費(fèi)者;它將消息持久化到磁盤,因此可用于批

量消費(fèi),例如ETL,以及實(shí)時(shí)應(yīng)用程序。broker和生產(chǎn)者、消費(fèi)者各自都是集群,集群中的

各個(gè)實(shí)例他們之間是對(duì)等的,集群擴(kuò)充節(jié)點(diǎn)很方便。

Kafka的基本概念包括話題、生產(chǎn)者、消費(fèi)者、代理或者kafka集群。話題是特定類型

的消息流。消息是字節(jié)的有效負(fù)載,話題是消息的分類名或種子名。生產(chǎn)者是能夠發(fā)布消息

到話題的任何對(duì)象。已發(fā)布的消息保存在?組服務(wù)器中,它們被稱為代理或Kafka集群。消

費(fèi)者可以訂閱一個(gè)或多個(gè)話題,并從Brokei?拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息、。

Kafka的存儲(chǔ)布局非常簡(jiǎn)單。話題的每個(gè)分區(qū)對(duì)應(yīng)一個(gè)邏輯日志。物理上,一個(gè)日志為

相同大小的一組分段文件,每次生產(chǎn)者發(fā)布消息到一個(gè)分區(qū),代理就將消息追加到最后一個(gè)

段文件中。當(dāng)發(fā)布的消息數(shù)量達(dá)到設(shè)定值或者經(jīng)過一定的時(shí)間后,段文件真正寫入磁盤中。

寫入完成后,消息公開給消費(fèi)者。段文件機(jī)制和Hadoop中spill類似。消費(fèi)者始終從特定分

區(qū)順序地獲取消息,如果消費(fèi)者知道特定消息的偏移量,也就說明消費(fèi)者已經(jīng)消費(fèi)了之前的

所有消息。消費(fèi)者向代理發(fā)出異步拉請(qǐng)求,準(zhǔn)備字節(jié)緩沖區(qū)用于消費(fèi)。每個(gè)異步拉請(qǐng)求都包

含要消費(fèi)的消息偏移量與其它消息系統(tǒng)不同,Kafka代理是無狀態(tài)的。這意味著消費(fèi)者必須

維護(hù)已消費(fèi)的狀態(tài)信息。這些信息由消費(fèi)者自己維護(hù),代理完全不管。消費(fèi)者可以故意倒回

到老的偏移量再次消費(fèi)數(shù)據(jù)。這違反了隊(duì)列的常見約定,但被證明是許多消費(fèi)者的基本特征。

kaflca的broker在配置文件中可以配置最多保存多少小時(shí)的數(shù)據(jù)和分區(qū)最大的空間占

用,過期的和超量的數(shù)據(jù)會(huì)被broker自動(dòng)清除掉。

Kafka會(huì)記錄offset到zk,同時(shí)又在內(nèi)存中維護(hù)offset,允許快速的checkpoint,如果

consumer比partition多是浪費(fèi),因?yàn)閗afka不允許partition上并行consumer讀取。同時(shí),

consumer比partition少,一個(gè)consumer會(huì)對(duì)應(yīng)多個(gè)partition,有可能導(dǎo)致partition中數(shù)據(jù)的

讀取不均勻,也不能保證數(shù)據(jù)間的順序性,kafka只有在一個(gè)partition讀取的時(shí)候才能保證

時(shí)間上是有順序的。增加partition或者consumer或者broker會(huì)導(dǎo)致rebalance,所以rebalance

后consumer對(duì)應(yīng)的partition會(huì)發(fā)生變化。

Spark原理

spark可以很容易和yam結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。

配置很容易。spark發(fā)展迅猛,框架比hadoop更加靈活實(shí)用。減少了延時(shí)處理,提高性能效

率實(shí)用靈活性。也可以與hadoop切實(shí)相互結(jié)合。

spark核心部分分為RDD。SparkSQL、SparkStreamingsMLlib、GraphX、SparkR等

核心組件解決了很多的大數(shù)據(jù)問題,其完美的框架日受歡迎。其相應(yīng)的生態(tài)環(huán)境包括zepplin

等可視化方面,正日益壯大。大型公司爭(zhēng)相實(shí)用spark來代替原有hadoop上相應(yīng)的功能模

塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基于內(nèi)存,因此速度很快。另外DAG

作業(yè)調(diào)度系統(tǒng)的寬窄依賴讓Spark速度提高。

RDD是彈性分布式數(shù)據(jù)也是spark的核心,完全彈性的,如果數(shù)據(jù)丟失一部分還可以重

建。有自動(dòng)容錯(cuò)、位置感知調(diào)度和可伸縮性,通過數(shù)據(jù)檢杳點(diǎn)和記錄數(shù)據(jù)更新金象容錯(cuò)性檢

查。通過SparkConlexl.texlFile。加載文件變成RDD,然后通過Iransformalion構(gòu)建新的RDD,

通過action將RDD存儲(chǔ)到外部系統(tǒng)。

RDD使用延遲加載,也就是懶加載,只有當(dāng)用到的時(shí)候才加載數(shù)據(jù)。如果加載存儲(chǔ)所

有的中間過程會(huì)浪費(fèi)空間,因此要延遲加載。一旦spark看到整個(gè)變換鏈,他可以計(jì)算僅需

的結(jié)果數(shù)據(jù),如果下面的函數(shù)不需要數(shù)據(jù)那么數(shù)據(jù)也不會(huì)再加載。轉(zhuǎn)換RDD是惰性的,只

有在動(dòng)作中才可以使用它們。

Spark分為driver和executor,driver提交作業(yè),executor是application早worknodc上的

進(jìn)程,運(yùn)行task,driver對(duì)應(yīng)為sparkcontext。Spark的RDD操作有iransformalion、action

Transformation對(duì)RDD進(jìn)行依賴包裝,RDD所對(duì)應(yīng)的依賴都進(jìn)行DAG的構(gòu)建并保存,在

worknode掛掉之后除了通過備份恢復(fù)還可以通過元數(shù)據(jù)對(duì)其保存的依賴再計(jì)算一次得到。

當(dāng)作業(yè)提交也就是調(diào)用runJob時(shí),spark會(huì)根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,

這個(gè)DAGSchcduler是在SparkContext創(chuàng)建時(shí)一同初始化的,他會(huì)對(duì)作業(yè)進(jìn)行調(diào)度處理。當(dāng)

依賴圖構(gòu)建好以后,從action開始進(jìn)行解析,每一個(gè)操作作為一個(gè)task,每遇到shuffle就

切割成為一個(gè)taskSet,并把數(shù)據(jù)輸出到磁盤,如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲(chǔ)。就這

樣再往前推進(jìn),直到?jīng)]有算子,然后運(yùn)行從前面開始,如果沒有action的算子在這里不會(huì)執(zhí)

行,直到遇到action為止才開始運(yùn)行,這就形成了spark的懶加載,taskset提交給TaskSheduler

生成TaskSetManager并且提交給Executor運(yùn)行,運(yùn)行結(jié)束后反饋給DAGScheduler完成一個(gè)

taskSet,之后再提交下一個(gè),當(dāng)TaskSet運(yùn)行失敗時(shí)就返回DAGScheduler并重新再次創(chuàng)建。

一個(gè)job里面可能有多個(gè)TaskSet,一個(gè)application可能包含多個(gè)job。

HDFS介紹

HDFS(HadoopDistributedFileSystem,Hadoop分布式文件系統(tǒng)),它是一個(gè)高度容錯(cuò)性的

系統(tǒng),適合部署在廉價(jià)的磯器上。HDFS能提供高吞吐量的數(shù)據(jù)訪問,適合那些有著超大數(shù)

據(jù)集(largedataset)的應(yīng)用程序。

HDFS的設(shè)計(jì)特點(diǎn)是:

1、大數(shù)據(jù)文件。

2、文件分塊存儲(chǔ),HDFS會(huì)將一個(gè)完整的大文件平均分塊存儲(chǔ)到不同計(jì)算器上,讀寫效率

高。

3、流式數(shù)據(jù)討問,一次寫入多次讀寫,不支持修改(hadoop0.21之前支持Append),

4、廉價(jià)硬件,HDFS可以應(yīng)用在普通PC機(jī)上。

5、硬件故障,HDFS認(rèn)為所有計(jì)算機(jī)都可能會(huì)出問題,通過多個(gè)副本解決。

RacklRack2

HDFS讀寫文件流程-讀

2:獷bkxklocations

1:openDistributedNameNode

HDFSFHeSystem

dient3:read

FSOatanamenode

Inputstream

clientJVM

dientnode

4:read5:read

DataNodeDataNodeDataNode

datanodedatanodedatanode

HDFS讀寫文件流程-寫

HDFS常見操作:

hadoopfs-Is/data/查看所有文件

hadoopfs-du-h/data/列出匹配pattern的指定的文件系統(tǒng)空間總量(單位bytes),等

價(jià)于unix下的針對(duì)目錄的du

hadoopfs-chmod[-R]777/data/修改文件的權(quán)限,-上標(biāo)記遞歸修改。

hadoopfs-mkdir/data/在指定位置創(chuàng)建目錄。

hadoopfs-putxxx.log/data/從本地系統(tǒng)拷貝文件到DFS。

hadoopfs-rmrZdala/xxx.log歸刪掉所有的文件和目錄,等價(jià)于unix下的rm-rf<src>o

hadoopfs-get/data/xxx.log從DFS拷貝文件到本地文彳匕系統(tǒng)

hadoopfs-getmerge/data/*Jogdata.log顧名思義,從DFS拷貝多個(gè)文件、合并排序?yàn)橐粋€(gè)

文件到本地文件系統(tǒng)。

hadoopfs-mv<src><dsl>:將制定格式的文件move到指定的目標(biāo)位置。當(dāng)src為多個(gè)文

件時(shí),dst必須是個(gè)目錄

HDFS實(shí)踐?HDFSjavaAPI讀操作

Configurationconf=newConfiguration();

PathuriPath=newPath(uri);

try(

hdfs=FileSystem.get(|uriPath|.toUri(),conf);

ycatch(lOExceptxone)<

e.printStackTrace();

hdfs=null;

y

if(hdfs=null)return;

List<String>configPaths=newLinkedList<String>();

SDatalnputStreamdis=null;

BufferedReaderin=null;

try<_________

if(hdfs.exists(uriPath)){

d±s=hdfs.open(uriPath);

in=newBufferedReader(newInputStreamReader(dis,"LFTF-8"));

Stringline=null;

while((line=in.readLine())!=null){

line=line.trim();

configPaths.addCline):

?

}catch(lOExceptxone)<

e.printStackTrace();

>finally<

if(in!=nulLX

try{

±n.close();

}catch(TOExceptione){

e.printStackTrace();

y

y

_____Hd±s!=_______________________________________________________________

HDFS實(shí)踐?HDFSjavaAPI寫操作

//寫入一個(gè)文件到HDFS

Conf±gurationconf=newConf±guration();

FileSystemfileS=FileSystem.get(conf);

FileSystemlocaLFile=FileSystem.getLocal(conf);

Pathinput=newPath("/User/ryan/1.txt");

Pathout=newPath("/dara/l.txt");

try<

-FileStatus(]inpirtFiLe=LocalFile.TistStatus(input);

FSDataOutputStreamoutStream=fileS.create(out);

for(inti=0;i<inputFile.length;{

System.out.printLn(inputFile[i]-getPath().getName());

FSDa±aInputS±reamin-locatFilc.opcn(inpu±Filc(i].gc±Pa±h());

bytebuffer[]=newbyte[1024];

intbytesRead=0;

//邊讀邊寫

while((bytesRead=in.read(buffer))=>0){

outStream.write(buffer,0,bytesRead);

in.cTose();

>catch(Exceptione){

e.printStackTrace();

HDFS數(shù)據(jù)采用壓縮,減少磁盤空間占,和傳輸時(shí)候的帶寬占用,用(LZO,SNAPPY,GZIP

等),對(duì)于可以拆分的數(shù)據(jù)塊大小>=blocksize,不支持分拆文件采用接近blocksize大小

(Mapreduce)<,數(shù)據(jù)分布在各個(gè)節(jié)點(diǎn)需要均勻,避免熱點(diǎn)(讀寫性能)

MapReduce經(jīng)驗(yàn)與教訓(xùn)

數(shù)據(jù)傾斜問題

背景:H前消息統(tǒng)計(jì)采用以下流程messageid維度二>taskid_叩pid維度=>1)taskid維度2)

appid維度

問題:由于部分taskid或者部分appid推送量很大,導(dǎo)致以taskid,appid為key的mapreduce

出現(xiàn)部分reducer待處理的數(shù)據(jù)量很多,造成reducer之間處理數(shù)據(jù)量嚴(yán)重不平衡,任務(wù)在最

后99%時(shí)候卡在個(gè)別reducertask上。

解決:依據(jù)數(shù)據(jù)特性在以taskid,appid為key的mapreduce中添加combiner,讓數(shù)據(jù)在map

完成之后直接在本地機(jī)器上進(jìn)行一次合并,減少數(shù)據(jù)本身數(shù)量級(jí),進(jìn)而降低部分reducer負(fù)

載過高問題

數(shù)據(jù)大量重復(fù)在reduce中部分日志出現(xiàn)大量重復(fù),但是一個(gè)reducer進(jìn)行行處理的時(shí)候出現(xiàn)

內(nèi)存使用開銷很大或者部分reducer出現(xiàn)數(shù)據(jù)傾斜,解決:采用combiner在本地做一次數(shù)據(jù)

去重?zé)?/p>

shuffle階段出現(xiàn)內(nèi)存不足

例如任務(wù)提示:Error:java.lang.OutOfMcmoryError:Javaheapspaceat

org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOulpu(Copier.shuffleInMemory(Red

uceTask.java:1640)

解決://設(shè)置sort完成后reduce計(jì)算階段用來緩存數(shù)據(jù)的百分比

conf.setFloat("mapred.job.shuffle.input.buffer.percent",0.30f);

conf.sctFloat("maprcducc.job.shufflc.input.buffcr.pcrccnt",0.30f);

一次計(jì)算很多天數(shù)據(jù)占用集群大部分資源并且無法保證任務(wù)成功完成

背景:

標(biāo)注LBS數(shù)據(jù)時(shí),一次性將I個(gè)月行為數(shù)據(jù)放入在一個(gè)MRJob中執(zhí)行,導(dǎo)致占用掉Hado。

集群大部分資源,和其他任務(wù)搶占資源,執(zhí)行3個(gè)小時(shí)最后IO超時(shí)異常。

解決:

避免蠻力計(jì)算,將超大規(guī)模數(shù)據(jù)分拆多次計(jì)算,得到降低數(shù)量級(jí)后的中間結(jié)果,然后再合并

計(jì)算。好處:1、延長(zhǎng)時(shí)叵,保證程序穩(wěn)定。2、程序即使異常數(shù)據(jù)也不用從頭開始計(jì)算,可

以利用中間結(jié)果。

問題:

輸入的數(shù)據(jù)中部分文件不可分割,導(dǎo)致單個(gè)reducer負(fù)載過高,task卡住不進(jìn)行下去。

解決:

I、一個(gè)是數(shù)據(jù)在導(dǎo)入之前分割好來

2、轉(zhuǎn)化為可以分割的格式。

Hive簡(jiǎn)介

Hive是基于Hadoop的一個(gè)數(shù)據(jù)倉庫工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,

并提供簡(jiǎn)單的sql查詢功能,可以將sql語句轉(zhuǎn)換為MapReduce任務(wù)進(jìn)行運(yùn)行。

優(yōu)點(diǎn)是學(xué)習(xí)成本低,可以通過類SQL語句快速實(shí)現(xiàn)簡(jiǎn)單的MapReduce統(tǒng)計(jì),不必開發(fā)專門

的M叩Reduce應(yīng)用,十分適合數(shù)據(jù)倉庫的統(tǒng)計(jì)分析。

H2c體系結(jié)構(gòu)

Hive實(shí)踐

分區(qū)添加

altertableuscr_qucry_poi_qunaraddpartition(day=20141125)location

7data_resul(/lbs/user_query_poi_qunar/20141125,;

統(tǒng)計(jì)

useuser;

droptableuser_qu

ROWFORMATDELIMITED

FIELDSTERMINATEDBYT

LOCATION7data_result/lbs/usery_poi_qunar_airport_cid_count;

CREATEEXTERNALTABLEuser_query_poi_qunar_airport_cid_coun((namesiring,count

bigint)

er_query_poi_qunar_airpon_cid_count';

insertoverwritetableuser_queiy_poi_qunar_airport_cid_countselect

count(distinct(cid)),concat(poi_name,"\t",city)fromuser_query_poi_qunarwhereday=20141125

andtag='bt_airport'groupbyconcat(poi_name,"\t",city);

UDF

例如我們有個(gè)自定義jar支持md5方法,udf.jar

1、添加jar

hive>addjar/app/yuankai/hadoopjars/gexin_hive_udf.jar;

2、關(guān)聯(lián)內(nèi)置函數(shù)hive>createtemporaryfunctionmd5ascom.igexin.hive.udf.Md5,;

3、執(zhí)行內(nèi)置函數(shù)hivc>sclcctmd5('tcst')fromtestlimit1;

其它內(nèi)置函數(shù)類實(shí)現(xiàn)方式:

publicclassXXXUDFextendsUDF{

publicTextevahiate(Object...args){〃實(shí)現(xiàn)操作returnnewText("yourresult");}}

comigexinhiveudf

orgapache.hadoophiveqlexec.UDF

orgapachehadoopioText

bl■cSelectlntsalledAppUDF{

Textevaluate(Objectargs){

(args.length4){

//com.tencent.pao;116;20141119;20141119;20141119#com.tencent.qq;112;20141118

String[]apps((String)args[0])spliti"#")

(appslength0)0

Stringpackageld(String)args[1]

startDayLongparseLongt(String)args[2])

endDayLongparseLongt(String)args[3])

(Stringappapps){

//com.tencent.pao;116;20141119;20141119;20141119

String[]arrappsplit。';"1)

(arr.length4){

(packageld.equals(arr[0])){

installed_dayLongparseLongi(String)arr[2])

(installed_daystartDayinstall.ed_dayendDay){

returnneuText(app);

}

)

}

}

}

returnnewTextC,H);

}

}

Hive實(shí)踐-遠(yuǎn)程調(diào)用

javasqlSQLException

HiveJdbcClient(

privat*finalStringdriverName"org.apache.hive.jdbc.HiveDriver"

privatefinalst?iStringjdbcUrl-jdbc:hive2://192.168.2.249:19999/defauXf

privatefinalStringjdbcUrl_user"user"

privatefinalstaticStringjdbcurl_pwd"yuanKai”

pu()li'.sf.:i.voidnain(String[]args)thrczSQLException{

tr(

Class.forA/ame(driverName)

)(ClassNotFoundExceptione){

Systemoutprintln(HClassNotFoundException:"egetMessage())

eprintStackTrace()

System.exit(l);

)

ConnectionconDriverManagergetConnection(jdbcUrIjdbcUrl_userjdbcllrl_pwd)

StatementstmtconcreateStatement()

Stringsqlnull;

ResultSet-null;|

stmtexecuteC'usereport_ods_mdp")

StringsubTaskidargs(0]

StringstartActionargs(1)

StringendActionargs(2)

StringstartDayargs(3]

StringendDayargs(4]

sql“selectdistinct(token_md5)fromreport_ods_mdp.msg_aswheretask_id=

subTaskidandaction_id>='*startAction"andaction_id<="endAction

?'andday>="startDay"andday<="endDay

resstmt.executeOuery(sql);

whili(res.next()){

Systemoutprintln(resgetString(l))

〃保存獲取到Cd到對(duì)應(yīng)表格中

//...

用戶是不是在某個(gè)時(shí)間內(nèi)安裝了的內(nèi)置函數(shù)

\addjar/app/yuankai/hadoopjars/hive-udf.jar;

createtemporaryfunctiongx_selectinstallas'com.igexin.hive.udf.SelectlntsallcdApp';

使用舉例:

select*fromuser_app_infowhere

gx.selectinstalKinstalled:'com.tencent.pao\'^OMIII9";'2O141119")!=nulllimiti;

褊一部分:產(chǎn)生背景

產(chǎn)生背景

?為了滿足客戶個(gè)性化的需求,Hive被設(shè)計(jì)成一個(gè)很開放的系統(tǒng),很多內(nèi)容都支持用戶定制,包括:

?文件格式:TextFile.SequenceFile

?內(nèi)存中的數(shù)據(jù)格式:JavaInteger/String,HadoopIntWritable/Text

?用戶提供的map/reduce腳本:不管什么語言,利用stdin/stdout傳輸數(shù)據(jù)

?用戶自定義函數(shù)

自定義函數(shù)

?雖然Hive提供了很多函數(shù),但是有些還是難以滿足我們的需求。因此Hive提供了自定義函數(shù)開發(fā)

?自定義函數(shù)包括三種UDF、JADF、UDTF

?UDF(User-Defined-Functon)

?UDAF(User-DefinedAggregationFuncation)

?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入一行輸出多行(On-to-many

maping)的需求。

HIVE中使用定義的函數(shù)的三種方式

?在HIVE會(huì)話中add自定義函數(shù)的jar文件,然后創(chuàng)建function,繼而使用函數(shù)

?在進(jìn)入HIVE會(huì)話之前先自動(dòng)執(zhí)行創(chuàng)建function,不用用戶手工創(chuàng)建

?把自定義的函數(shù)寫到系統(tǒng)函數(shù)中,使之成為HIVE的一個(gè)默認(rèn)函數(shù),這樣就不需要createtemporary

function

第二部分:UDF

UDF用法

?UDF(User-Defined-Function)

?UDF函數(shù)可以直接應(yīng)用于select語句,對(duì)查詢結(jié)構(gòu)做格式化處理后,再輸出內(nèi)容

?編寫UDF函數(shù)的時(shí)候需要注意一下幾點(diǎn)

?自定義UDF需要繼承org.apache.hadoop.hive.ql.UDF

?需要實(shí)現(xiàn)evaluate函數(shù)

?evaluate函數(shù)支持重載

?UDF只能實(shí)現(xiàn)一進(jìn)一出的操作,如果需要實(shí)現(xiàn)多進(jìn)一出,則需要實(shí)現(xiàn)UDAF

UDF用法代碼示例

importorg.apache.Hadoop.hive.ql.exec.UDF

publicclassHellowordextendsUDF(

publicStringevaluate(){

return"helloworld!";

}

publicStringevaluate(Stringstr){

return"helloworld:"+str;

}

)

開發(fā)步驟

?開發(fā)代碼

?把程序打包放到目標(biāo)機(jī)器上去

?進(jìn)入hive客戶端

?添加jar包:hive>addjar/run/jar/udf_test.jar;

?創(chuàng)建臨時(shí)函數(shù):hive〉CREATETEMPORARYFUNCTIONmy_addAS'com.hive.udf.Add'

?查詢HQL語句:

?SELECTmy_add(8,9)FROMscores;

?SELECTmy_add(scores.math,scores.art)FROMscores;

?銷毀臨時(shí)函數(shù):hive>DROPTEMPORARYFUNCTIONmy_add;

?細(xì)節(jié)

?在使用UDF的時(shí)候,會(huì)自動(dòng)進(jìn)行類型轉(zhuǎn)換,例如:

SELECTmy_add(8,9.1)FROMscores;

?結(jié)果是17.1,UDF將類型為Int的參數(shù)轉(zhuǎn)化成double。類型的飲食轉(zhuǎn)換是通過UDFResolver來進(jìn)行

控制的

第三部分:UDAF

UDAF

?Hive查詢數(shù)據(jù)時(shí),有些聚類函數(shù)在HQL沒有自帶,需要用戶自定義實(shí)現(xiàn)

?用戶自定義聚合函數(shù):Sum,Averagen-1

?UDAF(User-DefinedAggregationFuncation)

用法

??下兩個(gè)包是必須的importorg.apache.hadoop.hive.ql.exec.UDAF和

org.apache,hadoop.hive.ql.exec.UDAFEvaluator

開發(fā)步驟

?函數(shù)類需要繼承UDAF類,為部類Evaluator實(shí)UDAFEvaluator接口

?Evaluator"需要實(shí)現(xiàn)init、iterate、terminatePartiaKmerge>terminate這幾個(gè)函數(shù)

a)init函數(shù)實(shí)現(xiàn)接口UDAFEvaluator的init函數(shù)。

b)iterate接收傳入的參數(shù),并進(jìn)行內(nèi)部的輪轉(zhuǎn)。其返回類型為boolean。

c)terminatePartial無參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回輪轉(zhuǎn)數(shù)據(jù),terminatePartial類似于

hadoop的Combiner,

d)merge接收terminatePartial的返回結(jié)果,進(jìn)行數(shù)據(jù)merge操作,其返回類型為boolean。

e)terminate返回最終的聚親函數(shù)結(jié)果。

執(zhí)行步驟

?執(zhí)行求平均數(shù)函數(shù)的步驟

a)將java文件編譯成Avg_test.jar。

b)進(jìn)入hive客戶端添加jar包:

hive>addjar/run/jar/Avg_test.jaro

c)創(chuàng)建臨時(shí)函數(shù):

hive>createtemporaryfunctionavg_test'hive.udaf.Avg';

d)查詢語句:

hivoselectavg_test(scoies.math)fromscores;

e)銷毀臨時(shí)函數(shù):

hive>droptemporaryfunctionavg_test;

UDAF代碼示例

publicclassMyAvgextendsUDAF{

publicstaticclassAvgEvauatorimplementsUDAFEvalustor(

}

publicvoidinit()

publicbooleaniterate(Doubleo){}

publicAvgStateterminatePartial(){}

publicbooleanterminateFartial(Doubleo){}

publicDoubleterminate(){}

)

第四部分:UDTF

UDTF

?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入?行輸出多行(On-to-many

maping)的需求。

開發(fā)步驟

?UDTF步驟:

?必須繼承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF

?實(shí)現(xiàn)initialize,process,close三個(gè)方法

?UDTF首先會(huì)

?調(diào)用initialize方法,此方法返回UDTF的返回行的信息(返回個(gè)數(shù),類型)

初始化完成后,會(huì)調(diào)用process方法,對(duì)傳入的參數(shù)進(jìn)行處理,可以通過forword。方法把結(jié)果返回

?最后close。方法調(diào)用,對(duì)辭要清理的方法進(jìn)行清理

使用方法

?UDTF有兩種使用方法,一種直接放到select后面,一種和lateralview一起使用

?直接select中使用:selectexplode_map(properties)as(coll,col2)fromsrc;

?不可以添加其他字段使用:selecta,explode_map(properties)as(coll,col2)fromsrc

?不可以嵌套調(diào)用:selectexplode_map(explode_map(properties))fromsrc

?不可以和groupby/clusterby/distributeby/sortby一起使用:selectexplode_map(properties)

as(collzcol2)fromsrcgroupbycoll,col2

?和lateralview一起使用:selectsrc.id,mytable.coll,mytable.col2fromsrclateralview

explode_map(properties)mytableascoll,col2;

此方法史為方便H常使用。執(zhí)行過程相當(dāng)于單獨(dú)執(zhí)行了兩次抽取,然后union到一個(gè)表里。

lateralview

?LateralView語法

?lateralview:LATERALVIEWudtf(expression)tableAliasAScolumnAliascolumnAlias)*

fromClause:FROMbaseTable(lateralview)*

?LateralView用于UDTF(user-definedtablegeneratingfunctions)中將行轉(zhuǎn)成列,例如explode().

?目前LateralView不支持有上而下的優(yōu)化。如果使用Where子句,查詢可能將不被編譯。解決方法見:

此時(shí),在查詢之前執(zhí)行sethive.optimize.ppd=false;

?例子

?pageAdy。它有兩個(gè)列

stringpageidArray<int>adidlist

"front_page"[1,2,3]

"contact_page"[3,4,5]

?SELECTpageid,adidFROMpageAdsLATERALVIEWexplode(adidjist)adTableASadid;

?將輸出如下結(jié)果

stringpageidintadid

"front_page"1

Mcontact_page"3

代碼示例

publicclassMyUDTFextendsGenericUDTF{

publicStructObjectlnspectorinitialize(ObjectInspector[]args){}

publicvoidprocess(Object[]args)throwsHiveException(}

}

實(shí)現(xiàn):切分"key:value;key:value;”這種字符串,返回結(jié)果為key,value兩個(gè)字段

Hive使用經(jīng)驗(yàn)

/結(jié)果顯示設(shè)置列名

sethive.cli.print.header=true;

〃壓縮設(shè)置

seterniediate=true;

setprcss.output=false;

〃設(shè)置并行開啟和數(shù)量

sethive.exec.parallel=true;

sethive.exec.parallel.thread.number=10;

1)使用外表,避免drop的時(shí)候?qū)⒃磾?shù)據(jù)刪除。

2)對(duì)于較大兩個(gè)表格進(jìn)行關(guān)聯(lián)查詢的時(shí)候,盡可能將數(shù)據(jù)預(yù)先處理

(例如篩選到單獨(dú)表格),然后再join。

3)在數(shù)據(jù)入庫的時(shí)候,依照時(shí)間或者某個(gè)屬性進(jìn)行分區(qū),方便查詢

時(shí)能夠在一個(gè)分區(qū)內(nèi)檢索,提高效率。

4)復(fù)雜字段使用自定義內(nèi)置函數(shù),避免超級(jí)復(fù)雜的sql。

分桶

CREATETABLEbucketeduser(idINT)nameSTRING)

CLUSTEREDBY(id)INTO4BUCKETS;

對(duì)于每一個(gè)表(table)或者分區(qū),Hive可以進(jìn)一步組織成桶,也就是說桶是更為細(xì)粒度的

數(shù)據(jù)范圍劃分。Hive也是針對(duì)某一列進(jìn)行桶的組織。Hive采用對(duì)列值哈希,然后除以桶的

個(gè)數(shù)求余的方式?jīng)Q定該條記錄存放在哪個(gè)桶當(dāng)中。

把表(或者分區(qū))組織成桶(Bucket)有兩個(gè)理由:

(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時(shí)能利用

這個(gè)結(jié)構(gòu)。具體而言,連接兩個(gè)在(包含連接列的)相同列上劃分了桶的表,可以使用M叩端

連接(Map-sidejoin)高效的實(shí)現(xiàn)。比如JOIN操作。對(duì)于JOIN操作兩個(gè)表有一個(gè)相同的

列,如果對(duì)這兩個(gè)表都進(jìn)行了桶操作。那么將保存相同列值的桶進(jìn)行JOIN操作就可以,可

以大大較少JOIN的數(shù)據(jù)量。

(2)使取樣(sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時(shí),在開發(fā)和修改查詢的階段,如

果能在數(shù)據(jù)集的一小部分?jǐn)?shù)據(jù)上試運(yùn)行查詢,會(huì)帶來很多方便。

分區(qū)

分區(qū)是以字段的形式在表結(jié)構(gòu)中存在,通過describetable命令可以查看到字段存在,但是

該字段不存放實(shí)際的數(shù)據(jù)內(nèi)容,僅僅是分區(qū)的表示(偽列)。

(1)靜態(tài)分區(qū)

createtableifnotexistssopdm.wyp2(idstring,telstring)

partitionedby(ageint)

rowformatdelimited

fieldsterminatedby

storedastextfile;

-overwrite是覆蓋,into是追加

insertintotablesopdm.w/p2

partition(age='25')

selectid,name,telfromsopdm.wyp;

(2)動(dòng)態(tài)分區(qū)

--設(shè)置為true表示開啟動(dòng)態(tài)分區(qū)功能(默認(rèn)為false)

sethive.exec.dynamic.partition=true;

-設(shè)置為nonstrict,表示允許所有分區(qū)都是動(dòng)態(tài)的(默認(rèn)為strict)

sethive.exec.dynamic.partition.mode=nonstrict;

-insertoverwrite是覆蓋,insertinto是追加

sethive.exec.dynamic.partition.mode=nonstrict;

insertoverwritetablesopdm.wyp2

partition(age)

selectid,name,tel,agefromsopdm.wyp;

Hive服務(wù):

hive命令行模式,直接輸入#/hive/bin/hive的執(zhí)行程序,或者輸

入#hive-servicecli

2、hiveweb界面的(端口號(hào)9999)啟動(dòng)方式

#hive-servicehwi&

用于通過瀏覽器來訪問hive

http://hadoopO:9999/hwi/

3、hive遠(yuǎn)程服務(wù)(端口號(hào)10000)啟動(dòng)方式(java訪問必須打開)

#hive一servicehiveserver&

與數(shù)據(jù)庫中的Table在概念上是類似

每一個(gè)Table在Hive中都有一個(gè)相應(yīng)的目錄存儲(chǔ)數(shù)據(jù)。例如,一個(gè)表test,它

在HDFS中的路徑為:/warehouse/testowarehouse是在hive-site.xml中

由${hive.metastore,warehouse,dir}

指定的數(shù)據(jù)倉庫的目錄所有的Table數(shù)據(jù)(不包括ExternalTable)都保存在這個(gè)目

錄中。刪除表時(shí),元數(shù)據(jù)與數(shù)據(jù)都會(huì)被刪除

內(nèi)部表和外部表的區(qū)別:

內(nèi)部表:的創(chuàng)建過程和數(shù)據(jù)加載過程(這兩個(gè)過程可以在同一個(gè)語句中完成),在加載數(shù)據(jù)

的過程中,實(shí)際數(shù)據(jù)會(huì)被移動(dòng)到數(shù)據(jù)倉庫目錄中;之后對(duì)數(shù)據(jù)對(duì)訪問將會(huì)直接在數(shù)據(jù)倉庫目

錄中完成。刪除表時(shí),表中的數(shù)據(jù)和元數(shù)據(jù)將會(huì)被同時(shí)刪

除。

外部表:只有一個(gè)過程,加載數(shù)據(jù)和創(chuàng)建表同時(shí)完成,并不會(huì)移動(dòng)到數(shù)據(jù)倉庫H錄中,只是

與外部數(shù)據(jù)建立一個(gè)鏈接,當(dāng)刪除一個(gè)外部表時(shí),僅刪除該鏈接

HiveSql:

創(chuàng)建內(nèi)部表createtableinner_table(keystring);〃如果不指定分隔符,默

認(rèn)分隔符為

createtableinner_table(keystring)rowformatdelimitedfieldsterm

inatedby〃指定分隔符

createtableinner_table(keystring)rowformatdelimitedfieldsterm

inatedbystoredasSEQUENCEFILE;〃用哪種方式存儲(chǔ)數(shù)據(jù),SEQUENCEFILE

是hadoop自帶的文件壓縮格式

查看表結(jié)構(gòu)

describeinner_table;或者descinner_table;

加載數(shù)據(jù)

loaddatalocalinpath*/root/test,txt'intotableinnertable;

loaddatalocalinpath1/root/test.txt*overwi'iterintotableinnertab

le;〃數(shù)據(jù)有誤,重新加載數(shù)據(jù)

查看數(shù)據(jù)select*frominner_table

selectcount(*)frominnertable

刪除表droptableinrer_table

重命名表altertableinnertablerenametonew_table_name;

修改字段altertableinnertablechangekeykey1;

添加字段altertableinner_tableaddcolumns(valuestring);

分區(qū)相關(guān):

創(chuàng)建分區(qū)表

createtablepartition_tab1e(namestring,salaryfloat,genderstring,level

string)partitionedby(dtstring,depstring)rowformatdelimitedfie

Idsterminatedbystoredastextfile;

查看表有哪些分區(qū)SHOWPARTITIONSpartition.table;

查看表結(jié)構(gòu)describepartitiontable;或

者dosepartitiontable;

加載數(shù)據(jù)到分區(qū)

loaddatalocalinp

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論