




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
分布式系統(tǒng)組件介紹
Hadoop原理
分為HDFS與Yarn兩個(gè)部分。HDFS有Namenode和Datanode兩個(gè)部分。每個(gè)節(jié)點(diǎn)占
用一個(gè)電腦。Datanode定時(shí)向Namenode發(fā)送心跳包,心跳包中包含Datanode的校驗(yàn)等信
息,用來監(jiān)控Datanode。HDFS將數(shù)據(jù)分為塊,默認(rèn)為64M每個(gè)塊信息按照配置的參數(shù)分
別備份在不同的Datanode,而數(shù)據(jù)塊在哪個(gè)節(jié)點(diǎn)上,這些信息都存儲(chǔ)到Narnenode上面cYarn
是MapReduce2,可以集成更多的組件,如spark>mpi等。MapReduce包括Jobtraker與
Tasktrakcr兩個(gè)部分。其中JobTrakcr是在主節(jié)點(diǎn)上,負(fù)責(zé)整體的調(diào)度。Tasktrakcr在slave
節(jié)點(diǎn)上,當(dāng)提交任務(wù)后,將其交給Jobtraker進(jìn)行調(diào)度,調(diào)度到該任務(wù)之后就會(huì)將jar包發(fā)送
到響應(yīng)的Tasktraker,以實(shí)現(xiàn)分布式中移動(dòng)計(jì)算資源而非移動(dòng)數(shù)據(jù)。因?yàn)檫@些任務(wù)都是并發(fā)
執(zhí)行的,所以每個(gè)任務(wù)需要調(diào)用哪個(gè)節(jié)點(diǎn)的數(shù)據(jù)必須非常的明確,通常這一部分是通過
Jobtraker進(jìn)行指揮。
在MapReduce的開始,每個(gè)block作為一個(gè)Map輸入,Map輸入后就進(jìn)入shuffle階段。
Shuffle階段主:要分為Map和Reduce兩個(gè)階段。在MapShuffle階段,Map輸入按用戶的程
序進(jìn)行處理,生成的結(jié)果按key排序,然后進(jìn)入內(nèi)存,溢出后形成一個(gè)spill寫入磁盤,這
樣多個(gè)spill在這個(gè)節(jié)點(diǎn)中進(jìn)行多路歸并排序(勝者樹)形成一個(gè)排序文件寫入磁盤,這期
間那些Map文件交給那些節(jié)點(diǎn)處理由Jobtraker進(jìn)行調(diào)度,最后生成一個(gè)大排序的文件,并
且刪除spilL之后,再將多個(gè)節(jié)點(diǎn)上已經(jīng)排序的文件進(jìn)行行多路歸并排序(一個(gè)大文件N
分到n個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)分為k個(gè)spill,?個(gè)spill長(zhǎng)度s,時(shí)間復(fù)雜度是N(logn(n個(gè)節(jié)點(diǎn)
多路歸并排序)+k)gk(每個(gè)節(jié)點(diǎn)內(nèi)k個(gè)spill排序)-logs(每個(gè)spill內(nèi)部進(jìn)行排序)),
N=nks,所以最后的復(fù)雜度亦是NlogN)。
完成MapShuffle階段后通知Jobtraker進(jìn)入ReduceShuffle階段。在這個(gè)階段,因?yàn)橐?/p>
經(jīng)排序,很容易將用戶的程序直接作用到相同key的數(shù)據(jù)上,這些數(shù)據(jù)作為Reduce的輸入
進(jìn)行處理,最終將輸出的結(jié)果數(shù)據(jù)寫入到HDFS上,并刪除磁盤數(shù)據(jù)。Map一般多,Reduce
少,所以通過Hash的方法將Map文件映射到Reduce上,進(jìn)行處理,這個(gè)階段叫做Pa(ition<,
為了避免譬如所有數(shù)據(jù)相加這種操作使得數(shù)據(jù)負(fù)載移動(dòng)的數(shù)量少的Reduce階段,造成效率
低下的結(jié)果,我們可以在在MapShuffle階段加一個(gè)Combine階段,這個(gè)Combine是在每一
臺(tái)節(jié)點(diǎn)上將已經(jīng)排序好的文件進(jìn)行一次Reduce并將結(jié)果作為ReduceShuffle階段的輸入,
這樣可以大大減少數(shù)據(jù)的輸入量。通常Reduce的個(gè)數(shù)通過用戶來指定,通常和CPU個(gè)數(shù)相
適應(yīng)才能使其效率達(dá)到最大。
HBase原理
Hbase是列存儲(chǔ)數(shù)據(jù)庫。其存儲(chǔ)的組織結(jié)構(gòu)就是將相同的列族存儲(chǔ)在?起,因此得名的。
Hbase存儲(chǔ)有行鍵,作為唯一標(biāo)識(shí),列表示為〈列族〉:〈列〉存儲(chǔ)信息,如address:city,address:
provice,然后是時(shí)間戳。
Hbase物理模型中,有一個(gè)總結(jié)點(diǎn)HMaster,通過其自帶的zookeeper與客戶端相連接。
Hbse作為分布式每一個(gè)節(jié)點(diǎn)作為一個(gè)RegionServer,維乎Region的狀態(tài)和管理。Region是
數(shù)據(jù)管理的基本單位。最初只有一-個(gè),通過擴(kuò)充后達(dá)到閾值然后分裂,通過Server控制其
規(guī)模。在RegionServer中,每一個(gè)store作為一個(gè)列族。當(dāng)數(shù)據(jù)插入進(jìn)來,新數(shù)據(jù)成為
Memstore,寫入內(nèi)存,當(dāng)Memstore達(dá)到閾值后,通過Flashcache進(jìn)程將數(shù)據(jù)寫入storeFile,
也就是當(dāng)內(nèi)存數(shù)據(jù)增多后溢出成一個(gè)SloreFile寫入磁盤,這里和Hadoop的spill類似,這
據(jù)后一并寫入。當(dāng)StoreFile數(shù)量過多時(shí),進(jìn)行合并,將形成一個(gè)大的StoreFile并且刪
除掉原來的StoreFile。再當(dāng)SloreFile大小超過一定閾值后,分裂成Region。
HBase有一個(gè)ROOT表和META表。META表記錄用戶Region的信息,但是隨著數(shù)據(jù)
增多,META也會(huì)增大,進(jìn)而分裂成多個(gè)Region,那我們用ROOT表記錄下META的信
息,是一個(gè)二級(jí)表,而zookeeper中記錄ROOT表的locationo當(dāng)我們需找找到一條信息時(shí),
先去zookccpcr看找ROOT,從ROOT中查找META找到META位置,在進(jìn)入META表中
尋找該數(shù)據(jù)所在Region,再讀取該Region的信息。HBase適合大量插入乂同時(shí)讀的情況,
其瓶頸是硬盤的傳輸速度而不再是像Oracle一樣瓶頸在硬盤的尋道速度。
Zookeeper原理
Zookeepcr是一個(gè)資源管理庫,對(duì)節(jié)點(diǎn)進(jìn)行協(xié)調(diào)、通信、失敗處理、節(jié)點(diǎn)損壞的處理等,
是一個(gè)無中心設(shè)計(jì),主節(jié)點(diǎn)通過選舉產(chǎn)生。Zookeeper的節(jié)點(diǎn)是Znode。每一個(gè)節(jié)點(diǎn)可以存
放1M的數(shù)據(jù),client訪問服務(wù)器時(shí)創(chuàng)建一個(gè)Znode,可以是短暫的Znode,其上可以放.上觀
察Waicher對(duì)node進(jìn)行監(jiān)控。Zookeeper有高可用性,每個(gè)機(jī)器復(fù)制一份數(shù)據(jù),只要有一般
以上的機(jī)器可以正常的運(yùn)行,整個(gè)集群就可以工作。比如6臺(tái)的集群容忍2臺(tái)斷開,超過兩
臺(tái)達(dá)到一般的數(shù)最就不可以,因此集群通常都是奇數(shù)來節(jié)約資源。
Zookeeper使用zab協(xié)議,是一個(gè)無中心協(xié)議,通過選舉的方式產(chǎn)生leader,通過每臺(tái)
機(jī)器的信息擴(kuò)散選舉最閑的資源利用較少的節(jié)點(diǎn)作為主控。同時(shí)當(dāng)配置數(shù)據(jù)有更改更新時(shí),
在每個(gè)節(jié)點(diǎn)上有配置watcher并觸發(fā)讀取更改,。因此能?哆保證一致性。每個(gè)節(jié)點(diǎn)通過leader
廣播的方式,使所有follower同步。
Zookeeper可以實(shí)現(xiàn)分布式鎖機(jī)制。通過waicher監(jiān)控,對(duì)每個(gè)Znode的鎖都有一個(gè)獨(dú)
一的編號(hào),按照序號(hào)的大小比較,來分配鎖。當(dāng)一個(gè)暫時(shí)Znode完結(jié)后刪除本節(jié)點(diǎn),通知
leader完結(jié),之后下一個(gè)Znode獲取鎖進(jìn)行操作。
Kafka原理
Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由Linkedln公司開發(fā),之后成為Apache項(xiàng)
目的一部分。Kafka是一種快速、可擴(kuò)展的、設(shè)計(jì)內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提
交日志服務(wù)。它被設(shè)計(jì)為一個(gè)分布式系統(tǒng),易于向外擴(kuò)展;它何時(shí)為發(fā)布和訂閱提供高吞吐
量;它支持多訂閱者,當(dāng)失敗時(shí)能自動(dòng)平衡消費(fèi)者;它將消息持久化到磁盤,因此可用于批
量消費(fèi),例如ETL,以及實(shí)時(shí)應(yīng)用程序。broker和生產(chǎn)者、消費(fèi)者各自都是集群,集群中的
各個(gè)實(shí)例他們之間是對(duì)等的,集群擴(kuò)充節(jié)點(diǎn)很方便。
Kafka的基本概念包括話題、生產(chǎn)者、消費(fèi)者、代理或者kafka集群。話題是特定類型
的消息流。消息是字節(jié)的有效負(fù)載,話題是消息的分類名或種子名。生產(chǎn)者是能夠發(fā)布消息
到話題的任何對(duì)象。已發(fā)布的消息保存在?組服務(wù)器中,它們被稱為代理或Kafka集群。消
費(fèi)者可以訂閱一個(gè)或多個(gè)話題,并從Brokei?拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息、。
Kafka的存儲(chǔ)布局非常簡(jiǎn)單。話題的每個(gè)分區(qū)對(duì)應(yīng)一個(gè)邏輯日志。物理上,一個(gè)日志為
相同大小的一組分段文件,每次生產(chǎn)者發(fā)布消息到一個(gè)分區(qū),代理就將消息追加到最后一個(gè)
段文件中。當(dāng)發(fā)布的消息數(shù)量達(dá)到設(shè)定值或者經(jīng)過一定的時(shí)間后,段文件真正寫入磁盤中。
寫入完成后,消息公開給消費(fèi)者。段文件機(jī)制和Hadoop中spill類似。消費(fèi)者始終從特定分
區(qū)順序地獲取消息,如果消費(fèi)者知道特定消息的偏移量,也就說明消費(fèi)者已經(jīng)消費(fèi)了之前的
所有消息。消費(fèi)者向代理發(fā)出異步拉請(qǐng)求,準(zhǔn)備字節(jié)緩沖區(qū)用于消費(fèi)。每個(gè)異步拉請(qǐng)求都包
含要消費(fèi)的消息偏移量與其它消息系統(tǒng)不同,Kafka代理是無狀態(tài)的。這意味著消費(fèi)者必須
維護(hù)已消費(fèi)的狀態(tài)信息。這些信息由消費(fèi)者自己維護(hù),代理完全不管。消費(fèi)者可以故意倒回
到老的偏移量再次消費(fèi)數(shù)據(jù)。這違反了隊(duì)列的常見約定,但被證明是許多消費(fèi)者的基本特征。
kaflca的broker在配置文件中可以配置最多保存多少小時(shí)的數(shù)據(jù)和分區(qū)最大的空間占
用,過期的和超量的數(shù)據(jù)會(huì)被broker自動(dòng)清除掉。
Kafka會(huì)記錄offset到zk,同時(shí)又在內(nèi)存中維護(hù)offset,允許快速的checkpoint,如果
consumer比partition多是浪費(fèi),因?yàn)閗afka不允許partition上并行consumer讀取。同時(shí),
consumer比partition少,一個(gè)consumer會(huì)對(duì)應(yīng)多個(gè)partition,有可能導(dǎo)致partition中數(shù)據(jù)的
讀取不均勻,也不能保證數(shù)據(jù)間的順序性,kafka只有在一個(gè)partition讀取的時(shí)候才能保證
時(shí)間上是有順序的。增加partition或者consumer或者broker會(huì)導(dǎo)致rebalance,所以rebalance
后consumer對(duì)應(yīng)的partition會(huì)發(fā)生變化。
Spark原理
spark可以很容易和yam結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。
配置很容易。spark發(fā)展迅猛,框架比hadoop更加靈活實(shí)用。減少了延時(shí)處理,提高性能效
率實(shí)用靈活性。也可以與hadoop切實(shí)相互結(jié)合。
spark核心部分分為RDD。SparkSQL、SparkStreamingsMLlib、GraphX、SparkR等
核心組件解決了很多的大數(shù)據(jù)問題,其完美的框架日受歡迎。其相應(yīng)的生態(tài)環(huán)境包括zepplin
等可視化方面,正日益壯大。大型公司爭(zhēng)相實(shí)用spark來代替原有hadoop上相應(yīng)的功能模
塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基于內(nèi)存,因此速度很快。另外DAG
作業(yè)調(diào)度系統(tǒng)的寬窄依賴讓Spark速度提高。
RDD是彈性分布式數(shù)據(jù)也是spark的核心,完全彈性的,如果數(shù)據(jù)丟失一部分還可以重
建。有自動(dòng)容錯(cuò)、位置感知調(diào)度和可伸縮性,通過數(shù)據(jù)檢杳點(diǎn)和記錄數(shù)據(jù)更新金象容錯(cuò)性檢
查。通過SparkConlexl.texlFile。加載文件變成RDD,然后通過Iransformalion構(gòu)建新的RDD,
通過action將RDD存儲(chǔ)到外部系統(tǒng)。
RDD使用延遲加載,也就是懶加載,只有當(dāng)用到的時(shí)候才加載數(shù)據(jù)。如果加載存儲(chǔ)所
有的中間過程會(huì)浪費(fèi)空間,因此要延遲加載。一旦spark看到整個(gè)變換鏈,他可以計(jì)算僅需
的結(jié)果數(shù)據(jù),如果下面的函數(shù)不需要數(shù)據(jù)那么數(shù)據(jù)也不會(huì)再加載。轉(zhuǎn)換RDD是惰性的,只
有在動(dòng)作中才可以使用它們。
Spark分為driver和executor,driver提交作業(yè),executor是application早worknodc上的
進(jìn)程,運(yùn)行task,driver對(duì)應(yīng)為sparkcontext。Spark的RDD操作有iransformalion、action
Transformation對(duì)RDD進(jìn)行依賴包裝,RDD所對(duì)應(yīng)的依賴都進(jìn)行DAG的構(gòu)建并保存,在
worknode掛掉之后除了通過備份恢復(fù)還可以通過元數(shù)據(jù)對(duì)其保存的依賴再計(jì)算一次得到。
當(dāng)作業(yè)提交也就是調(diào)用runJob時(shí),spark會(huì)根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,
這個(gè)DAGSchcduler是在SparkContext創(chuàng)建時(shí)一同初始化的,他會(huì)對(duì)作業(yè)進(jìn)行調(diào)度處理。當(dāng)
依賴圖構(gòu)建好以后,從action開始進(jìn)行解析,每一個(gè)操作作為一個(gè)task,每遇到shuffle就
切割成為一個(gè)taskSet,并把數(shù)據(jù)輸出到磁盤,如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲(chǔ)。就這
樣再往前推進(jìn),直到?jīng)]有算子,然后運(yùn)行從前面開始,如果沒有action的算子在這里不會(huì)執(zhí)
行,直到遇到action為止才開始運(yùn)行,這就形成了spark的懶加載,taskset提交給TaskSheduler
生成TaskSetManager并且提交給Executor運(yùn)行,運(yùn)行結(jié)束后反饋給DAGScheduler完成一個(gè)
taskSet,之后再提交下一個(gè),當(dāng)TaskSet運(yùn)行失敗時(shí)就返回DAGScheduler并重新再次創(chuàng)建。
一個(gè)job里面可能有多個(gè)TaskSet,一個(gè)application可能包含多個(gè)job。
HDFS介紹
HDFS(HadoopDistributedFileSystem,Hadoop分布式文件系統(tǒng)),它是一個(gè)高度容錯(cuò)性的
系統(tǒng),適合部署在廉價(jià)的磯器上。HDFS能提供高吞吐量的數(shù)據(jù)訪問,適合那些有著超大數(shù)
據(jù)集(largedataset)的應(yīng)用程序。
HDFS的設(shè)計(jì)特點(diǎn)是:
1、大數(shù)據(jù)文件。
2、文件分塊存儲(chǔ),HDFS會(huì)將一個(gè)完整的大文件平均分塊存儲(chǔ)到不同計(jì)算器上,讀寫效率
高。
3、流式數(shù)據(jù)討問,一次寫入多次讀寫,不支持修改(hadoop0.21之前支持Append),
4、廉價(jià)硬件,HDFS可以應(yīng)用在普通PC機(jī)上。
5、硬件故障,HDFS認(rèn)為所有計(jì)算機(jī)都可能會(huì)出問題,通過多個(gè)副本解決。
RacklRack2
HDFS讀寫文件流程-讀
2:獷bkxklocations
1:openDistributedNameNode
HDFSFHeSystem
dient3:read
FSOatanamenode
Inputstream
clientJVM
dientnode
4:read5:read
▼
DataNodeDataNodeDataNode
datanodedatanodedatanode
HDFS讀寫文件流程-寫
HDFS常見操作:
hadoopfs-Is/data/查看所有文件
hadoopfs-du-h/data/列出匹配pattern的指定的文件系統(tǒng)空間總量(單位bytes),等
價(jià)于unix下的針對(duì)目錄的du
hadoopfs-chmod[-R]777/data/修改文件的權(quán)限,-上標(biāo)記遞歸修改。
hadoopfs-mkdir/data/在指定位置創(chuàng)建目錄。
hadoopfs-putxxx.log/data/從本地系統(tǒng)拷貝文件到DFS。
hadoopfs-rmrZdala/xxx.log歸刪掉所有的文件和目錄,等價(jià)于unix下的rm-rf<src>o
hadoopfs-get/data/xxx.log從DFS拷貝文件到本地文彳匕系統(tǒng)
hadoopfs-getmerge/data/*Jogdata.log顧名思義,從DFS拷貝多個(gè)文件、合并排序?yàn)橐粋€(gè)
文件到本地文件系統(tǒng)。
hadoopfs-mv<src><dsl>:將制定格式的文件move到指定的目標(biāo)位置。當(dāng)src為多個(gè)文
件時(shí),dst必須是個(gè)目錄
HDFS實(shí)踐?HDFSjavaAPI讀操作
Configurationconf=newConfiguration();
PathuriPath=newPath(uri);
try(
hdfs=FileSystem.get(|uriPath|.toUri(),conf);
ycatch(lOExceptxone)<
e.printStackTrace();
hdfs=null;
y
if(hdfs=null)return;
List<String>configPaths=newLinkedList<String>();
SDatalnputStreamdis=null;
BufferedReaderin=null;
try<_________
if(hdfs.exists(uriPath)){
d±s=hdfs.open(uriPath);
in=newBufferedReader(newInputStreamReader(dis,"LFTF-8"));
Stringline=null;
while((line=in.readLine())!=null){
line=line.trim();
configPaths.addCline):
?
}catch(lOExceptxone)<
e.printStackTrace();
>finally<
if(in!=nulLX
try{
±n.close();
}catch(TOExceptione){
e.printStackTrace();
y
y
_____Hd±s!=_______________________________________________________________
HDFS實(shí)踐?HDFSjavaAPI寫操作
//寫入一個(gè)文件到HDFS
Conf±gurationconf=newConf±guration();
FileSystemfileS=FileSystem.get(conf);
FileSystemlocaLFile=FileSystem.getLocal(conf);
Pathinput=newPath("/User/ryan/1.txt");
Pathout=newPath("/dara/l.txt");
try<
-FileStatus(]inpirtFiLe=LocalFile.TistStatus(input);
FSDataOutputStreamoutStream=fileS.create(out);
for(inti=0;i<inputFile.length;{
System.out.printLn(inputFile[i]-getPath().getName());
FSDa±aInputS±reamin-locatFilc.opcn(inpu±Filc(i].gc±Pa±h());
bytebuffer[]=newbyte[1024];
intbytesRead=0;
//邊讀邊寫
while((bytesRead=in.read(buffer))=>0){
outStream.write(buffer,0,bytesRead);
in.cTose();
>catch(Exceptione){
e.printStackTrace();
HDFS數(shù)據(jù)采用壓縮,減少磁盤空間占,和傳輸時(shí)候的帶寬占用,用(LZO,SNAPPY,GZIP
等),對(duì)于可以拆分的數(shù)據(jù)塊大小>=blocksize,不支持分拆文件采用接近blocksize大小
(Mapreduce)<,數(shù)據(jù)分布在各個(gè)節(jié)點(diǎn)需要均勻,避免熱點(diǎn)(讀寫性能)
MapReduce經(jīng)驗(yàn)與教訓(xùn)
數(shù)據(jù)傾斜問題
背景:H前消息統(tǒng)計(jì)采用以下流程messageid維度二>taskid_叩pid維度=>1)taskid維度2)
appid維度
問題:由于部分taskid或者部分appid推送量很大,導(dǎo)致以taskid,appid為key的mapreduce
出現(xiàn)部分reducer待處理的數(shù)據(jù)量很多,造成reducer之間處理數(shù)據(jù)量嚴(yán)重不平衡,任務(wù)在最
后99%時(shí)候卡在個(gè)別reducertask上。
解決:依據(jù)數(shù)據(jù)特性在以taskid,appid為key的mapreduce中添加combiner,讓數(shù)據(jù)在map
完成之后直接在本地機(jī)器上進(jìn)行一次合并,減少數(shù)據(jù)本身數(shù)量級(jí),進(jìn)而降低部分reducer負(fù)
載過高問題
數(shù)據(jù)大量重復(fù)在reduce中部分日志出現(xiàn)大量重復(fù),但是一個(gè)reducer進(jìn)行行處理的時(shí)候出現(xiàn)
內(nèi)存使用開銷很大或者部分reducer出現(xiàn)數(shù)據(jù)傾斜,解決:采用combiner在本地做一次數(shù)據(jù)
去重?zé)?/p>
shuffle階段出現(xiàn)內(nèi)存不足
例如任務(wù)提示:Error:java.lang.OutOfMcmoryError:Javaheapspaceat
org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOulpu(Copier.shuffleInMemory(Red
uceTask.java:1640)
解決://設(shè)置sort完成后reduce計(jì)算階段用來緩存數(shù)據(jù)的百分比
conf.setFloat("mapred.job.shuffle.input.buffer.percent",0.30f);
conf.sctFloat("maprcducc.job.shufflc.input.buffcr.pcrccnt",0.30f);
一次計(jì)算很多天數(shù)據(jù)占用集群大部分資源并且無法保證任務(wù)成功完成
背景:
標(biāo)注LBS數(shù)據(jù)時(shí),一次性將I個(gè)月行為數(shù)據(jù)放入在一個(gè)MRJob中執(zhí)行,導(dǎo)致占用掉Hado。
集群大部分資源,和其他任務(wù)搶占資源,執(zhí)行3個(gè)小時(shí)最后IO超時(shí)異常。
解決:
避免蠻力計(jì)算,將超大規(guī)模數(shù)據(jù)分拆多次計(jì)算,得到降低數(shù)量級(jí)后的中間結(jié)果,然后再合并
計(jì)算。好處:1、延長(zhǎng)時(shí)叵,保證程序穩(wěn)定。2、程序即使異常數(shù)據(jù)也不用從頭開始計(jì)算,可
以利用中間結(jié)果。
問題:
輸入的數(shù)據(jù)中部分文件不可分割,導(dǎo)致單個(gè)reducer負(fù)載過高,task卡住不進(jìn)行下去。
解決:
I、一個(gè)是數(shù)據(jù)在導(dǎo)入之前分割好來
2、轉(zhuǎn)化為可以分割的格式。
Hive簡(jiǎn)介
Hive是基于Hadoop的一個(gè)數(shù)據(jù)倉庫工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,
并提供簡(jiǎn)單的sql查詢功能,可以將sql語句轉(zhuǎn)換為MapReduce任務(wù)進(jìn)行運(yùn)行。
優(yōu)點(diǎn)是學(xué)習(xí)成本低,可以通過類SQL語句快速實(shí)現(xiàn)簡(jiǎn)單的MapReduce統(tǒng)計(jì),不必開發(fā)專門
的M叩Reduce應(yīng)用,十分適合數(shù)據(jù)倉庫的統(tǒng)計(jì)分析。
H2c體系結(jié)構(gòu)
Hive實(shí)踐
分區(qū)添加
altertableuscr_qucry_poi_qunaraddpartition(day=20141125)location
7data_resul(/lbs/user_query_poi_qunar/20141125,;
統(tǒng)計(jì)
useuser;
droptableuser_qu
ROWFORMATDELIMITED
FIELDSTERMINATEDBYT
LOCATION7data_result/lbs/usery_poi_qunar_airport_cid_count;
CREATEEXTERNALTABLEuser_query_poi_qunar_airport_cid_coun((namesiring,count
bigint)
er_query_poi_qunar_airpon_cid_count';
insertoverwritetableuser_queiy_poi_qunar_airport_cid_countselect
count(distinct(cid)),concat(poi_name,"\t",city)fromuser_query_poi_qunarwhereday=20141125
andtag='bt_airport'groupbyconcat(poi_name,"\t",city);
UDF
例如我們有個(gè)自定義jar支持md5方法,udf.jar
1、添加jar
hive>addjar/app/yuankai/hadoopjars/gexin_hive_udf.jar;
2、關(guān)聯(lián)內(nèi)置函數(shù)hive>createtemporaryfunctionmd5ascom.igexin.hive.udf.Md5,;
3、執(zhí)行內(nèi)置函數(shù)hivc>sclcctmd5('tcst')fromtestlimit1;
其它內(nèi)置函數(shù)類實(shí)現(xiàn)方式:
publicclassXXXUDFextendsUDF{
publicTextevahiate(Object...args){〃實(shí)現(xiàn)操作returnnewText("yourresult");}}
comigexinhiveudf
orgapache.hadoophiveqlexec.UDF
orgapachehadoopioText
bl■cSelectlntsalledAppUDF{
Textevaluate(Objectargs){
(args.length4){
//com.tencent.pao;116;20141119;20141119;20141119#com.tencent.qq;112;20141118
String[]apps((String)args[0])spliti"#")
(appslength0)0
Stringpackageld(String)args[1]
startDayLongparseLongt(String)args[2])
endDayLongparseLongt(String)args[3])
(Stringappapps){
//com.tencent.pao;116;20141119;20141119;20141119
String[]arrappsplit。';"1)
(arr.length4){
(packageld.equals(arr[0])){
installed_dayLongparseLongi(String)arr[2])
(installed_daystartDayinstall.ed_dayendDay){
returnneuText(app);
}
)
}
}
}
returnnewTextC,H);
}
}
Hive實(shí)踐-遠(yuǎn)程調(diào)用
javasqlSQLException
HiveJdbcClient(
privat*finalStringdriverName"org.apache.hive.jdbc.HiveDriver"
privatefinalst?iStringjdbcUrl-jdbc:hive2://192.168.2.249:19999/defauXf
privatefinalStringjdbcUrl_user"user"
privatefinalstaticStringjdbcurl_pwd"yuanKai”
pu()li'.sf.:i.voidnain(String[]args)thrczSQLException{
tr(
Class.forA/ame(driverName)
)(ClassNotFoundExceptione){
Systemoutprintln(HClassNotFoundException:"egetMessage())
eprintStackTrace()
System.exit(l);
)
ConnectionconDriverManagergetConnection(jdbcUrIjdbcUrl_userjdbcllrl_pwd)
StatementstmtconcreateStatement()
Stringsqlnull;
ResultSet-null;|
stmtexecuteC'usereport_ods_mdp")
StringsubTaskidargs(0]
StringstartActionargs(1)
StringendActionargs(2)
StringstartDayargs(3]
StringendDayargs(4]
sql“selectdistinct(token_md5)fromreport_ods_mdp.msg_aswheretask_id=
subTaskidandaction_id>='*startAction"andaction_id<="endAction
?'andday>="startDay"andday<="endDay
resstmt.executeOuery(sql);
whili(res.next()){
Systemoutprintln(resgetString(l))
〃保存獲取到Cd到對(duì)應(yīng)表格中
//...
用戶是不是在某個(gè)時(shí)間內(nèi)安裝了的內(nèi)置函數(shù)
\addjar/app/yuankai/hadoopjars/hive-udf.jar;
createtemporaryfunctiongx_selectinstallas'com.igexin.hive.udf.SelectlntsallcdApp';
使用舉例:
select*fromuser_app_infowhere
gx.selectinstalKinstalled:'com.tencent.pao\'^OMIII9";'2O141119")!=nulllimiti;
褊一部分:產(chǎn)生背景
產(chǎn)生背景
?為了滿足客戶個(gè)性化的需求,Hive被設(shè)計(jì)成一個(gè)很開放的系統(tǒng),很多內(nèi)容都支持用戶定制,包括:
?文件格式:TextFile.SequenceFile
?內(nèi)存中的數(shù)據(jù)格式:JavaInteger/String,HadoopIntWritable/Text
?用戶提供的map/reduce腳本:不管什么語言,利用stdin/stdout傳輸數(shù)據(jù)
?用戶自定義函數(shù)
自定義函數(shù)
?雖然Hive提供了很多函數(shù),但是有些還是難以滿足我們的需求。因此Hive提供了自定義函數(shù)開發(fā)
?自定義函數(shù)包括三種UDF、JADF、UDTF
?UDF(User-Defined-Functon)
?UDAF(User-DefinedAggregationFuncation)
?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入一行輸出多行(On-to-many
maping)的需求。
HIVE中使用定義的函數(shù)的三種方式
?在HIVE會(huì)話中add自定義函數(shù)的jar文件,然后創(chuàng)建function,繼而使用函數(shù)
?在進(jìn)入HIVE會(huì)話之前先自動(dòng)執(zhí)行創(chuàng)建function,不用用戶手工創(chuàng)建
?把自定義的函數(shù)寫到系統(tǒng)函數(shù)中,使之成為HIVE的一個(gè)默認(rèn)函數(shù),這樣就不需要createtemporary
function
第二部分:UDF
UDF用法
?UDF(User-Defined-Function)
?UDF函數(shù)可以直接應(yīng)用于select語句,對(duì)查詢結(jié)構(gòu)做格式化處理后,再輸出內(nèi)容
?編寫UDF函數(shù)的時(shí)候需要注意一下幾點(diǎn)
?自定義UDF需要繼承org.apache.hadoop.hive.ql.UDF
?需要實(shí)現(xiàn)evaluate函數(shù)
?evaluate函數(shù)支持重載
?UDF只能實(shí)現(xiàn)一進(jìn)一出的操作,如果需要實(shí)現(xiàn)多進(jìn)一出,則需要實(shí)現(xiàn)UDAF
UDF用法代碼示例
importorg.apache.Hadoop.hive.ql.exec.UDF
publicclassHellowordextendsUDF(
publicStringevaluate(){
return"helloworld!";
}
publicStringevaluate(Stringstr){
return"helloworld:"+str;
}
)
開發(fā)步驟
?開發(fā)代碼
?把程序打包放到目標(biāo)機(jī)器上去
?進(jìn)入hive客戶端
?添加jar包:hive>addjar/run/jar/udf_test.jar;
?創(chuàng)建臨時(shí)函數(shù):hive〉CREATETEMPORARYFUNCTIONmy_addAS'com.hive.udf.Add'
?查詢HQL語句:
?SELECTmy_add(8,9)FROMscores;
?SELECTmy_add(scores.math,scores.art)FROMscores;
?銷毀臨時(shí)函數(shù):hive>DROPTEMPORARYFUNCTIONmy_add;
?細(xì)節(jié)
?在使用UDF的時(shí)候,會(huì)自動(dòng)進(jìn)行類型轉(zhuǎn)換,例如:
SELECTmy_add(8,9.1)FROMscores;
?結(jié)果是17.1,UDF將類型為Int的參數(shù)轉(zhuǎn)化成double。類型的飲食轉(zhuǎn)換是通過UDFResolver來進(jìn)行
控制的
第三部分:UDAF
UDAF
?Hive查詢數(shù)據(jù)時(shí),有些聚類函數(shù)在HQL沒有自帶,需要用戶自定義實(shí)現(xiàn)
?用戶自定義聚合函數(shù):Sum,Averagen-1
?UDAF(User-DefinedAggregationFuncation)
用法
??下兩個(gè)包是必須的importorg.apache.hadoop.hive.ql.exec.UDAF和
org.apache,hadoop.hive.ql.exec.UDAFEvaluator
開發(fā)步驟
?函數(shù)類需要繼承UDAF類,為部類Evaluator實(shí)UDAFEvaluator接口
?Evaluator"需要實(shí)現(xiàn)init、iterate、terminatePartiaKmerge>terminate這幾個(gè)函數(shù)
a)init函數(shù)實(shí)現(xiàn)接口UDAFEvaluator的init函數(shù)。
b)iterate接收傳入的參數(shù),并進(jìn)行內(nèi)部的輪轉(zhuǎn)。其返回類型為boolean。
c)terminatePartial無參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回輪轉(zhuǎn)數(shù)據(jù),terminatePartial類似于
hadoop的Combiner,
d)merge接收terminatePartial的返回結(jié)果,進(jìn)行數(shù)據(jù)merge操作,其返回類型為boolean。
e)terminate返回最終的聚親函數(shù)結(jié)果。
執(zhí)行步驟
?執(zhí)行求平均數(shù)函數(shù)的步驟
a)將java文件編譯成Avg_test.jar。
b)進(jìn)入hive客戶端添加jar包:
hive>addjar/run/jar/Avg_test.jaro
c)創(chuàng)建臨時(shí)函數(shù):
hive>createtemporaryfunctionavg_test'hive.udaf.Avg';
d)查詢語句:
hivoselectavg_test(scoies.math)fromscores;
e)銷毀臨時(shí)函數(shù):
hive>droptemporaryfunctionavg_test;
UDAF代碼示例
publicclassMyAvgextendsUDAF{
publicstaticclassAvgEvauatorimplementsUDAFEvalustor(
}
publicvoidinit()
publicbooleaniterate(Doubleo){}
publicAvgStateterminatePartial(){}
publicbooleanterminateFartial(Doubleo){}
publicDoubleterminate(){}
)
第四部分:UDTF
UDTF
?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入?行輸出多行(On-to-many
maping)的需求。
開發(fā)步驟
?UDTF步驟:
?必須繼承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF
?實(shí)現(xiàn)initialize,process,close三個(gè)方法
?UDTF首先會(huì)
?調(diào)用initialize方法,此方法返回UDTF的返回行的信息(返回個(gè)數(shù),類型)
初始化完成后,會(huì)調(diào)用process方法,對(duì)傳入的參數(shù)進(jìn)行處理,可以通過forword。方法把結(jié)果返回
?最后close。方法調(diào)用,對(duì)辭要清理的方法進(jìn)行清理
使用方法
?UDTF有兩種使用方法,一種直接放到select后面,一種和lateralview一起使用
?直接select中使用:selectexplode_map(properties)as(coll,col2)fromsrc;
?不可以添加其他字段使用:selecta,explode_map(properties)as(coll,col2)fromsrc
?不可以嵌套調(diào)用:selectexplode_map(explode_map(properties))fromsrc
?不可以和groupby/clusterby/distributeby/sortby一起使用:selectexplode_map(properties)
as(collzcol2)fromsrcgroupbycoll,col2
?和lateralview一起使用:selectsrc.id,mytable.coll,mytable.col2fromsrclateralview
explode_map(properties)mytableascoll,col2;
此方法史為方便H常使用。執(zhí)行過程相當(dāng)于單獨(dú)執(zhí)行了兩次抽取,然后union到一個(gè)表里。
lateralview
?LateralView語法
?lateralview:LATERALVIEWudtf(expression)tableAliasAScolumnAliascolumnAlias)*
fromClause:FROMbaseTable(lateralview)*
?LateralView用于UDTF(user-definedtablegeneratingfunctions)中將行轉(zhuǎn)成列,例如explode().
?目前LateralView不支持有上而下的優(yōu)化。如果使用Where子句,查詢可能將不被編譯。解決方法見:
此時(shí),在查詢之前執(zhí)行sethive.optimize.ppd=false;
?例子
?pageAdy。它有兩個(gè)列
stringpageidArray<int>adidlist
"front_page"[1,2,3]
"contact_page"[3,4,5]
?SELECTpageid,adidFROMpageAdsLATERALVIEWexplode(adidjist)adTableASadid;
?將輸出如下結(jié)果
stringpageidintadid
"front_page"1
Mcontact_page"3
代碼示例
publicclassMyUDTFextendsGenericUDTF{
publicStructObjectlnspectorinitialize(ObjectInspector[]args){}
publicvoidprocess(Object[]args)throwsHiveException(}
}
實(shí)現(xiàn):切分"key:value;key:value;”這種字符串,返回結(jié)果為key,value兩個(gè)字段
Hive使用經(jīng)驗(yàn)
/結(jié)果顯示設(shè)置列名
sethive.cli.print.header=true;
〃壓縮設(shè)置
seterniediate=true;
setprcss.output=false;
〃設(shè)置并行開啟和數(shù)量
sethive.exec.parallel=true;
sethive.exec.parallel.thread.number=10;
1)使用外表,避免drop的時(shí)候?qū)⒃磾?shù)據(jù)刪除。
2)對(duì)于較大兩個(gè)表格進(jìn)行關(guān)聯(lián)查詢的時(shí)候,盡可能將數(shù)據(jù)預(yù)先處理
(例如篩選到單獨(dú)表格),然后再join。
3)在數(shù)據(jù)入庫的時(shí)候,依照時(shí)間或者某個(gè)屬性進(jìn)行分區(qū),方便查詢
時(shí)能夠在一個(gè)分區(qū)內(nèi)檢索,提高效率。
4)復(fù)雜字段使用自定義內(nèi)置函數(shù),避免超級(jí)復(fù)雜的sql。
分桶
CREATETABLEbucketeduser(idINT)nameSTRING)
CLUSTEREDBY(id)INTO4BUCKETS;
對(duì)于每一個(gè)表(table)或者分區(qū),Hive可以進(jìn)一步組織成桶,也就是說桶是更為細(xì)粒度的
數(shù)據(jù)范圍劃分。Hive也是針對(duì)某一列進(jìn)行桶的組織。Hive采用對(duì)列值哈希,然后除以桶的
個(gè)數(shù)求余的方式?jīng)Q定該條記錄存放在哪個(gè)桶當(dāng)中。
把表(或者分區(qū))組織成桶(Bucket)有兩個(gè)理由:
(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時(shí)能利用
這個(gè)結(jié)構(gòu)。具體而言,連接兩個(gè)在(包含連接列的)相同列上劃分了桶的表,可以使用M叩端
連接(Map-sidejoin)高效的實(shí)現(xiàn)。比如JOIN操作。對(duì)于JOIN操作兩個(gè)表有一個(gè)相同的
列,如果對(duì)這兩個(gè)表都進(jìn)行了桶操作。那么將保存相同列值的桶進(jìn)行JOIN操作就可以,可
以大大較少JOIN的數(shù)據(jù)量。
(2)使取樣(sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時(shí),在開發(fā)和修改查詢的階段,如
果能在數(shù)據(jù)集的一小部分?jǐn)?shù)據(jù)上試運(yùn)行查詢,會(huì)帶來很多方便。
分區(qū)
分區(qū)是以字段的形式在表結(jié)構(gòu)中存在,通過describetable命令可以查看到字段存在,但是
該字段不存放實(shí)際的數(shù)據(jù)內(nèi)容,僅僅是分區(qū)的表示(偽列)。
(1)靜態(tài)分區(qū)
createtableifnotexistssopdm.wyp2(idstring,telstring)
partitionedby(ageint)
rowformatdelimited
fieldsterminatedby
storedastextfile;
-overwrite是覆蓋,into是追加
insertintotablesopdm.w/p2
partition(age='25')
selectid,name,telfromsopdm.wyp;
(2)動(dòng)態(tài)分區(qū)
--設(shè)置為true表示開啟動(dòng)態(tài)分區(qū)功能(默認(rèn)為false)
sethive.exec.dynamic.partition=true;
-設(shè)置為nonstrict,表示允許所有分區(qū)都是動(dòng)態(tài)的(默認(rèn)為strict)
sethive.exec.dynamic.partition.mode=nonstrict;
-insertoverwrite是覆蓋,insertinto是追加
sethive.exec.dynamic.partition.mode=nonstrict;
insertoverwritetablesopdm.wyp2
partition(age)
selectid,name,tel,agefromsopdm.wyp;
Hive服務(wù):
hive命令行模式,直接輸入#/hive/bin/hive的執(zhí)行程序,或者輸
入#hive-servicecli
2、hiveweb界面的(端口號(hào)9999)啟動(dòng)方式
#hive-servicehwi&
用于通過瀏覽器來訪問hive
http://hadoopO:9999/hwi/
3、hive遠(yuǎn)程服務(wù)(端口號(hào)10000)啟動(dòng)方式(java訪問必須打開)
#hive一servicehiveserver&
與數(shù)據(jù)庫中的Table在概念上是類似
每一個(gè)Table在Hive中都有一個(gè)相應(yīng)的目錄存儲(chǔ)數(shù)據(jù)。例如,一個(gè)表test,它
在HDFS中的路徑為:/warehouse/testowarehouse是在hive-site.xml中
由${hive.metastore,warehouse,dir}
指定的數(shù)據(jù)倉庫的目錄所有的Table數(shù)據(jù)(不包括ExternalTable)都保存在這個(gè)目
錄中。刪除表時(shí),元數(shù)據(jù)與數(shù)據(jù)都會(huì)被刪除
內(nèi)部表和外部表的區(qū)別:
內(nèi)部表:的創(chuàng)建過程和數(shù)據(jù)加載過程(這兩個(gè)過程可以在同一個(gè)語句中完成),在加載數(shù)據(jù)
的過程中,實(shí)際數(shù)據(jù)會(huì)被移動(dòng)到數(shù)據(jù)倉庫目錄中;之后對(duì)數(shù)據(jù)對(duì)訪問將會(huì)直接在數(shù)據(jù)倉庫目
錄中完成。刪除表時(shí),表中的數(shù)據(jù)和元數(shù)據(jù)將會(huì)被同時(shí)刪
除。
外部表:只有一個(gè)過程,加載數(shù)據(jù)和創(chuàng)建表同時(shí)完成,并不會(huì)移動(dòng)到數(shù)據(jù)倉庫H錄中,只是
與外部數(shù)據(jù)建立一個(gè)鏈接,當(dāng)刪除一個(gè)外部表時(shí),僅刪除該鏈接
HiveSql:
創(chuàng)建內(nèi)部表createtableinner_table(keystring);〃如果不指定分隔符,默
認(rèn)分隔符為
createtableinner_table(keystring)rowformatdelimitedfieldsterm
inatedby〃指定分隔符
createtableinner_table(keystring)rowformatdelimitedfieldsterm
inatedbystoredasSEQUENCEFILE;〃用哪種方式存儲(chǔ)數(shù)據(jù),SEQUENCEFILE
是hadoop自帶的文件壓縮格式
查看表結(jié)構(gòu)
describeinner_table;或者descinner_table;
加載數(shù)據(jù)
loaddatalocalinpath*/root/test,txt'intotableinnertable;
loaddatalocalinpath1/root/test.txt*overwi'iterintotableinnertab
le;〃數(shù)據(jù)有誤,重新加載數(shù)據(jù)
查看數(shù)據(jù)select*frominner_table
selectcount(*)frominnertable
刪除表droptableinrer_table
重命名表altertableinnertablerenametonew_table_name;
修改字段altertableinnertablechangekeykey1;
添加字段altertableinner_tableaddcolumns(valuestring);
分區(qū)相關(guān):
創(chuàng)建分區(qū)表
createtablepartition_tab1e(namestring,salaryfloat,genderstring,level
string)partitionedby(dtstring,depstring)rowformatdelimitedfie
Idsterminatedbystoredastextfile;
查看表有哪些分區(qū)SHOWPARTITIONSpartition.table;
查看表結(jié)構(gòu)describepartitiontable;或
者dosepartitiontable;
加載數(shù)據(jù)到分區(qū)
loaddatalocalinp
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 皮影文化課題申報(bào)書
- 智能農(nóng)場(chǎng)研究課題申報(bào)書
- 課題項(xiàng)目申報(bào)書研究?jī)?nèi)容
- 教師課題申報(bào)書講座視頻
- 課題立項(xiàng)申報(bào)書如何上傳
- 怎么寫科研課題申報(bào)書
- 教育學(xué) 課題申報(bào)書
- 怎樣查課題申報(bào)書
- 課題申報(bào)評(píng)審書注意事項(xiàng)
- 課題申報(bào)書選題
- (正式版)JBT 14660-2024 額定電壓6kV到30kV地下掘進(jìn)設(shè)備用橡皮絕緣軟電纜
- 本科院校-基礎(chǔ)醫(yī)學(xué)-醫(yī)學(xué)細(xì)胞生物學(xué)-第二章 細(xì)胞的概念與分子基礎(chǔ)
- iso37001-2016反賄賂管理手冊(cè)程序文件表單一整套
- 新蘇教版科學(xué)六年級(jí)下冊(cè)全冊(cè)教案(含反思)
- 火災(zāi)自動(dòng)報(bào)警系統(tǒng)檢查表
- 高速公路橋頭跳車判別和處治
- 骨髓細(xì)胞圖譜
- 建筑工程分部分項(xiàng)工程劃分表(新版)
- 勃利縣大四站鎮(zhèn)侵蝕溝治理工程施工組織設(shè)計(jì)
- 公路瀝青路面設(shè)計(jì)標(biāo)準(zhǔn)規(guī)范
- 普通高中歷史課程標(biāo)準(zhǔn)(2022年版2023年修訂)解讀
評(píng)論
0/150
提交評(píng)論