數(shù)據(jù)集成工具:Apache Nifi:使用Nifi進行實時數(shù)據(jù)處理_第1頁
數(shù)據(jù)集成工具:Apache Nifi:使用Nifi進行實時數(shù)據(jù)處理_第2頁
數(shù)據(jù)集成工具:Apache Nifi:使用Nifi進行實時數(shù)據(jù)處理_第3頁
數(shù)據(jù)集成工具:Apache Nifi:使用Nifi進行實時數(shù)據(jù)處理_第4頁
數(shù)據(jù)集成工具:Apache Nifi:使用Nifi進行實時數(shù)據(jù)處理_第5頁
已閱讀5頁,還剩22頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

數(shù)據(jù)集成工具:ApacheNifi:使用Nifi進行實時數(shù)據(jù)處理1數(shù)據(jù)集成工具:ApacheNifi1.1Nifi的歷史和發(fā)展ApacheNifi是一個開源的數(shù)據(jù)處理和分發(fā)系統(tǒng),由美國國家安全局(NSA)開發(fā),并于2014年貢獻給Apache軟件基金會。Nifi的設計初衷是為了自動化數(shù)據(jù)流的管理和處理,提供一個易于使用、功能強大的平臺,用于數(shù)據(jù)的收集、聚合和傳輸。自2014年以來,Nifi經(jīng)歷了多個版本的迭代,不斷引入新功能,優(yōu)化性能,增強安全性,使其成為數(shù)據(jù)集成領域的熱門工具。1.2Nifi的核心功能和優(yōu)勢1.2.1核心功能數(shù)據(jù)收集與傳輸:Nifi能夠從各種數(shù)據(jù)源收集數(shù)據(jù),如文件系統(tǒng)、數(shù)據(jù)庫、消息隊列等,并將數(shù)據(jù)傳輸?shù)街付ǖ哪康牡?,如?shù)據(jù)倉庫、大數(shù)據(jù)平臺等。數(shù)據(jù)處理與轉(zhuǎn)換:通過一系列的處理器,Nifi可以對數(shù)據(jù)進行清洗、轉(zhuǎn)換、富化等操作,滿足數(shù)據(jù)處理的多樣化需求。數(shù)據(jù)路由與分發(fā):Nifi支持基于內(nèi)容的路由,可以根據(jù)數(shù)據(jù)內(nèi)容的不同,將數(shù)據(jù)分發(fā)到不同的下游處理器或輸出通道。監(jiān)控與管理:Nifi提供了豐富的監(jiān)控和管理功能,用戶可以實時查看數(shù)據(jù)流的狀態(tài),調(diào)整數(shù)據(jù)處理的策略,以及管理數(shù)據(jù)流的生命周期。1.2.2優(yōu)勢零編程:Nifi采用圖形化界面,用戶可以通過拖拽處理器和連接線來構建數(shù)據(jù)流,無需編寫代碼,大大降低了使用門檻??蓴U展性:Nifi支持插件機制,用戶可以開發(fā)自定義的處理器、控制器服務等,以滿足特定的數(shù)據(jù)處理需求。安全性:Nifi內(nèi)置了強大的安全機制,支持數(shù)據(jù)加密、身份驗證和授權,確保數(shù)據(jù)在傳輸和處理過程中的安全。高可用性:Nifi支持集群部署,可以實現(xiàn)數(shù)據(jù)流的高可用性和負載均衡,確保數(shù)據(jù)處理的穩(wěn)定性和效率。1.2.3示例:使用Nifi進行數(shù)據(jù)收集與傳輸假設我們有一個日志文件,需要將其數(shù)據(jù)傳輸?shù)紿DFS中。以下是使用Nifi實現(xiàn)這一過程的步驟:創(chuàng)建數(shù)據(jù)源處理器:在Nifi的畫布上,拖拽一個GetFile處理器,配置其輸入目錄為日志文件所在的目錄。創(chuàng)建數(shù)據(jù)目標處理器:拖拽一個PutHDFS處理器,配置其目標HDFS的路徑。連接處理器:使用連接線將GetFile處理器的輸出連接到PutHDFS處理器的輸入。啟動數(shù)據(jù)流:保存并啟動數(shù)據(jù)流,Nifi將自動從指定目錄讀取日志文件,并將其傳輸?shù)紿DFS中。Nifi配置示例在GetFile處理器中,配置InputDirectory為/path/to/log/directory,在PutHDFS處理器中,配置Directory為/hdfs/path/to/destination。數(shù)據(jù)樣例假設日志文件的內(nèi)容如下:2023-04-0112:00:00INFOUserAloggedin.

2023-04-0112:01:00ERRORErroroccurredwhileprocessingdata.

2023-04-0112:02:00DEBUGDataprocessedsuccessfully.代碼示例Nifi本身是一個圖形化工具,不涉及代碼編寫,但以下是一個使用NifiAPI通過Python腳本啟動和停止數(shù)據(jù)流的示例:#導入必要的庫

fromnipyapiimportcanvas,nifi

#獲取Nifi的URL

nifi_url='http://localhost:8080/nifi-api'

#獲取數(shù)據(jù)流的ID

flow_id=canvas.get_flow('MyDataFlow').id

#啟動數(shù)據(jù)流

nifi.start_flow(flow_id)

#停止數(shù)據(jù)流

nifi.stop_flow(flow_id)1.2.4結論ApacheNifi以其強大的數(shù)據(jù)集成能力、易用性、可擴展性和安全性,成為實時數(shù)據(jù)處理領域的有力工具。通過Nifi,用戶可以構建復雜的數(shù)據(jù)流,實現(xiàn)數(shù)據(jù)的自動化處理和分發(fā),極大地提高了數(shù)據(jù)處理的效率和靈活性。2安裝和配置Nifi2.1在Windows上安裝Nifi2.1.1前提條件確保你的Windows系統(tǒng)上已經(jīng)安裝了Java8或更高版本。你擁有管理員權限,以便進行軟件安裝。2.1.2安裝步驟下載Nifi安裝包:訪問ApacheNifi的官方網(wǎng)站,下載最新版本的Nifi安裝包。通常,下載的是一個.zip文件。解壓縮文件:將下載的.zip文件解壓縮到你選擇的目錄中。例如,你可以解壓縮到C:\ProgramFiles\ApacheNifi目錄下。配置環(huán)境變量:打開“系統(tǒng)屬性”->“高級”->“環(huán)境變量”。在“系統(tǒng)變量”區(qū)域,找到并雙擊Path變量。添加Nifi的bin目錄到Path變量中,例如C:\ProgramFiles\ApacheNifi\bin。啟動Nifi:打開命令提示符。輸入nifi.shstart(如果使用的是Unixshell)或nifi.batstart(對于Windows),然后按Enter鍵。Nifi將在后臺啟動,你可以通過訪問http://localhost:8080/nifi在瀏覽器中查看Nifi界面。2.1.3注意事項確保防火墻設置允許Nifi服務的運行。如果遇到權限問題,嘗試以管理員身份運行命令提示符。2.2在Linux上安裝Nifi2.2.1前提條件Linux系統(tǒng)上已經(jīng)安裝了Java8或更高版本。你擁有sudo權限。2.2.2安裝步驟下載Nifi安裝包:使用wget或curl命令從ApacheNifi的官方網(wǎng)站下載最新版本的Nifi安裝包。wget/nifi/1.16.0/nifi-1.16.0-bin.zip解壓縮文件:使用unzip命令解壓縮下載的文件到你選擇的目錄中。unzipnifi-1.16.0-bin.zip-d/opt/配置環(huán)境變量:編輯/etc/environment文件,添加Nifi的bin目錄到PATH中。echo'PATH="/opt/nifi-1.16.0/bin:$PATH"'>>/etc/environment然后,更新環(huán)境變量。source/etc/environment啟動Nifi:使用以下命令啟動Nifi。sudo/opt/nifi-1.16.0/bin/nifi.shstart你可以在瀏覽器中通過訪問http://localhost:8080/nifi來查看Nifi界面。2.2.3注意事項確保你的Linux系統(tǒng)防火墻允許Nifi服務的端口。使用sudo命令時,確保你有正確的權限。2.3Nifi的初始配置2.3.1配置NpertiesNifi的配置文件perties位于conf目錄下。這個文件包含了Nifi運行的基本配置,包括日志級別、線程數(shù)量、數(shù)據(jù)存儲位置等。修改日志級別例如,如果你想將日志級別從默認的INFO改為DEBUG,可以在perties中找到以下行,并修改它。#LogLevel

nifi.log.level=DEBUG設置數(shù)據(jù)存儲位置Nifi的數(shù)據(jù)存儲位置默認是在安裝目錄下的content-repository目錄。你可以將其更改為其他位置,例如:#ContentRepository

nifi.content.repository.directory=/data/nifi/content-repository2.3.2配置bootstrap.confbootstrap.conf文件同樣位于conf目錄下,它包含了Nifi運行的高級配置,如JVM參數(shù)、系統(tǒng)屬性等。調(diào)整JVM堆內(nèi)存例如,你可以調(diào)整JVM的堆內(nèi)存大小,以適應你的數(shù)據(jù)處理需求。#JVMHeapSize

-Xms1g

-Xmx2g2.3.3啟動Nifi并驗證配置在修改了配置文件后,重新啟動Nifi服務,并通過Nifi的管理界面檢查配置是否生效。重啟Nifi在Windows上使用nifi.batstop和nifi.batstart,在Linux上使用sudo/opt/nifi-1.16.0/bin/nifi.shstop和sudo/opt/nifi-1.16.0/bin/nifi.shstart。驗證配置登錄到Nifi的管理界面,檢查日志和系統(tǒng)狀態(tài),確保配置更改已經(jīng)生效。通過以上步驟,你可以在Windows或Linux系統(tǒng)上成功安裝并配置ApacheNifi,為實時數(shù)據(jù)處理做好準備。3理解Nifi架構3.1Nifi的組件:處理器、控制器服務、連接、流程文件3.1.1處理器(Processors)在ApacheNiFi中,處理器是執(zhí)行數(shù)據(jù)流操作的基本單元。它們可以讀取、寫入、轉(zhuǎn)換、路由、過濾數(shù)據(jù),或者執(zhí)行更復雜的操作,如執(zhí)行外部系統(tǒng)調(diào)用。每個處理器都有特定的輸入和輸出,以及一組可配置的屬性,用于控制其行為。示例:PutFile處理器<processorid="12345678-1234-1234-1234-1234567890ab"type="cessors.standard.PutFile">

<name>PutFileExample</name>

<properties>

<property>

<name>FileName</name>

<value>${filename}</value>

</property>

<property>

<name>Directory</name>

<value>/path/to/directory</value>

</property>

</properties>

<scheduling>

<type>EVENT_DRIVEN</type>

<interval>1sec</interval>

<penalizationPeriod>30sec</penalizationPeriod>

<yieldPeriod>1sec</yieldPeriod>

</scheduling>

<inputPorts>

<inputPortid="12345678-1234-1234-1234-1234567890ac"name="input"position="100,100"/>

</inputPorts>

<outputPorts>

<outputPortid="12345678-1234-1234-1234-1234567890ad"name="success"position="200,100"/>

<outputPortid="12345678-1234-1234-1234-1234567890ae"name="failure"position="200,150"/>

</outputPorts>

</processor>此示例中的PutFile處理器將接收到的流程文件寫入指定的目錄。FileName屬性使用表達式語言${filename}來動態(tài)設置文件名,而Directory屬性指定了文件的保存位置。3.1.2控制器服務(ControllerServices)控制器服務提供對NiFi處理器和報告任務的共享和重用服務。它們可以是認證服務、加密服務、數(shù)據(jù)庫連接服務等,用于支持NiFi中的其他組件。示例:StandardJDBCDatabaseClient控制器服務<controllerServiceid="12345678-1234-1234-1234-1234567890af"type="org.apache.nifi.services.jdbc.StandardJDBCDatabaseClient"name="JDBCDatabaseClient">

<properties>

<property>

<name>ConnectionURL</name>

<value>jdbc:mysql://localhost:3306/mydatabase</value>

</property>

<property>

<name>Username</name>

<value>myuser</value>

</property>

<property>

<name>Password</name>

<value>mypass</value>

</property>

</properties>

</controllerService>此示例中的StandardJDBCDatabaseClient控制器服務配置了與MySQL數(shù)據(jù)庫的連接,用于執(zhí)行SQL查詢或更新。3.1.3連接(Connections)連接是NiFi中處理器之間的數(shù)據(jù)傳輸通道。它們定義了數(shù)據(jù)從一個處理器到另一個處理器的流動路徑。示例:連接處理器A到處理器B在NiFi的畫布上,通過拖拽從處理器A的輸出端口到處理器B的輸入端口,可以創(chuàng)建一個連接。連接可以配置為只傳遞成功的關系,或者也可以傳遞失敗的關系。3.1.4流程文件(FlowFiles)流程文件是NiFi中數(shù)據(jù)的基本單位。它們包含數(shù)據(jù)內(nèi)容和一組屬性,這些屬性可以被處理器讀取和修改,用于路由和處理數(shù)據(jù)。示例:流程文件屬性<flowFileid="12345678-1234-1234-1234-1234567890ab">

<attributes>

<attribute>

<name>filename</name>

<value>data.csv</value>

</attribute>

<attribute>

<name>content-type</name>

<value>text/csv</value>

</attribute>

</attributes>

<content>

"id","name","age"

"1","JohnDoe","30"

"2","JaneDoe","25"

</content>

</flowFile>此示例中的流程文件包含了CSV格式的數(shù)據(jù),以及filename和content-type兩個屬性,用于描述數(shù)據(jù)的元信息。3.2Nifi的流程設計和執(zhí)行在設計NiFi流程時,需要考慮數(shù)據(jù)的來源、處理邏輯和目標。流程通常由多個處理器組成,通過連接將它們鏈接起來,形成數(shù)據(jù)流。3.2.1設計流程設計流程時,首先確定數(shù)據(jù)的來源,然后選擇適當?shù)奶幚砥鱽硖幚頂?shù)據(jù),最后確定數(shù)據(jù)的目標。例如,從一個文件系統(tǒng)讀取數(shù)據(jù),使用GetFile處理器,然后使用ConvertRecord處理器將數(shù)據(jù)轉(zhuǎn)換為特定格式,最后使用PutFile處理器將處理后的數(shù)據(jù)寫入另一個文件系統(tǒng)。3.2.2執(zhí)行流程執(zhí)行流程時,NiFi會根據(jù)處理器的配置和連接的設置,自動處理數(shù)據(jù)流。每個處理器在接收到數(shù)據(jù)后,會根據(jù)其配置執(zhí)行相應的操作,并將結果傳遞給下一個處理器。示例:簡單的數(shù)據(jù)處理流程<processGroupid="12345678-1234-1234-1234-1234567890ab"name="SimpleDataProcessing">

<processorid="12345678-1234-1234-1234-1234567890ac"type="cessors.standard.GetFile">

<name>GetFile</name>

<properties>

<property>

<name>InputDirectory</name>

<value>/path/to/input/directory</value>

</property>

</properties>

</processor>

<processorid="12345678-1234-1234-1234-1234567890ad"type="cessors.standard.ConvertRecord">

<name>ConvertRecord</name>

<properties>

<property>

<name>RecordReader</name>

<value>CSVReader</value>

</property>

<property>

<name>RecordWriter</name>

<value>JSONWriter</value>

</property>

</properties>

</processor>

<processorid="12345678-1234-1234-1234-1234567890ae"type="cessors.standard.PutFile">

<name>PutFile</name>

<properties>

<property>

<name>Directory</name>

<value>/path/to/output/directory</value>

</property>

</properties>

</processor>

<connectionid="12345678-1234-1234-1234-1234567890af"source="12345678-1234-1234-1234-1234567890ac"sourcePort="success"destination="12345678-1234-1234-1234-1234567890ad"destinationPort="input">

<name>GettoConvert</name>

</connection>

<connectionid="12345678-1234-1234-1234-1234567890ag"source="12345678-1234-1234-1234-1234567890ad"sourcePort="success"destination="12345678-1234-1234-1234-1234567890ae"destinationPort="input">

<name>ConverttoPut</name>

</connection>

</processGroup>此示例中的流程首先使用GetFile處理器從指定的輸入目錄讀取文件,然后使用ConvertRecord處理器將CSV格式的數(shù)據(jù)轉(zhuǎn)換為JSON格式,最后使用PutFile處理器將處理后的數(shù)據(jù)寫入輸出目錄。通過理解這些組件和如何設計執(zhí)行流程,可以有效地使用ApacheNiFi進行實時數(shù)據(jù)處理和集成。4實時數(shù)據(jù)處理基礎4.1數(shù)據(jù)流的概念數(shù)據(jù)流(DataStream)是實時數(shù)據(jù)處理的核心概念,它指的是數(shù)據(jù)在時間上連續(xù)、快速、大量地到達,通常無法預知其大小和到達時間。在數(shù)據(jù)集成和實時處理場景中,數(shù)據(jù)流可以來自各種數(shù)據(jù)源,如傳感器、社交媒體、交易系統(tǒng)等。數(shù)據(jù)流處理的目標是在數(shù)據(jù)到達時立即進行處理,以提供即時的洞察和響應。4.1.1特點連續(xù)性:數(shù)據(jù)流是連續(xù)的,不像批處理數(shù)據(jù)那樣有明確的開始和結束。實時性:數(shù)據(jù)流處理需要在數(shù)據(jù)到達后立即處理,以實現(xiàn)低延遲的響應。無限性:數(shù)據(jù)流的大小通常是未知的,可能無限大。高吞吐量:數(shù)據(jù)流處理系統(tǒng)需要能夠處理高吞吐量的數(shù)據(jù),即每秒處理大量數(shù)據(jù)的能力。4.1.2應用場景實時監(jiān)控:如網(wǎng)絡流量監(jiān)控、設備狀態(tài)監(jiān)控等。實時分析:如社交媒體情緒分析、實時交易分析等。實時響應:如基于用戶行為的實時推薦系統(tǒng)、實時警報系統(tǒng)等。4.2使用Nifi進行數(shù)據(jù)采集ApacheNifi是一個強大的數(shù)據(jù)流處理和集成系統(tǒng),它提供了豐富的處理器(Processor)來采集、處理和分發(fā)數(shù)據(jù)。數(shù)據(jù)采集是實時數(shù)據(jù)處理的第一步,Nifi通過其處理器可以連接到各種數(shù)據(jù)源,如文件系統(tǒng)、數(shù)據(jù)庫、消息隊列等,以獲取數(shù)據(jù)。4.2.1常用處理器GetFile:從文件系統(tǒng)中讀取數(shù)據(jù)。JDBCInput:從數(shù)據(jù)庫中讀取數(shù)據(jù)。ConsumeKafka:從Kafka消息隊列中讀取數(shù)據(jù)。4.2.2示例假設我們需要從一個文件系統(tǒng)中實時采集日志數(shù)據(jù),可以使用GetFile處理器。首先,創(chuàng)建一個Nifi流程,在流程中添加GetFile處理器,并配置其輸入目錄為日志文件所在的目錄。<!--Nifi配置示例-->

<processorid="12345678-1234-1234-1234-1234567890ab"type="GetFile">

<name>GetLogFiles</name>

<properties>

<propertyname="InputDirectory"value="/path/to/log/directory"/>

<propertyname="FileFilter"value="*.log"/>

</properties>

<scheduling>

<schedulingPeriod>0sec</schedulingPeriod>

<schedulingStrategy>EVENT_DRIVEN</schedulingStrategy>

</scheduling>

</processor>4.3使用Nifi進行數(shù)據(jù)清洗數(shù)據(jù)清洗是數(shù)據(jù)處理中的關鍵步驟,它涉及識別和糾正數(shù)據(jù)中的錯誤、不一致和缺失值。Nifi提供了多種處理器來執(zhí)行數(shù)據(jù)清洗任務,如ExtractText、ReplaceText、SplitText等,這些處理器可以用于解析、轉(zhuǎn)換和過濾數(shù)據(jù)。4.3.1示例假設我們從日志文件中采集的數(shù)據(jù)包含一些不需要的字段和格式錯誤,可以使用ReplaceText處理器來清洗數(shù)據(jù)。例如,刪除日志中的時間戳,只保留實際的日志消息。<!--Nifi配置示例-->

<processorid="87654321-1234-1234-1234-1234567890ab"type="ReplaceText">

<name>RemoveTimestamp</name>

<properties>

<propertyname="SearchValue"value="^\d{4}-\d{2}-\d{2}\d{2}:\d{2}:\d{2},\d{3}"/>

<propertyname="ReplacementValue"value=""/>

<propertyname="MatchType"value="REGEX"/>

</properties>

<scheduling>

<schedulingPeriod>0sec</schedulingPeriod>

<schedulingStrategy>EVENT_DRIVEN</schedulingStrategy>

</scheduling>

</processor>在這個示例中,ReplaceText處理器被配置為使用正則表達式(REGEX)來匹配并刪除日志中的時間戳。時間戳的格式被定義為^\d{4}-\d{2}-\d{2}\d{2}:\d{2}:\d{2},\d{3},這表示一個以四位數(shù)字(年份)開始,包含日期、時間、毫秒的字符串。通過上述配置,Nifi可以實時地從文件系統(tǒng)中采集日志數(shù)據(jù),并立即清洗數(shù)據(jù),去除時間戳,只保留日志消息,從而為后續(xù)的數(shù)據(jù)處理和分析提供更干凈、更一致的數(shù)據(jù)。5高級數(shù)據(jù)處理技術5.1數(shù)據(jù)富化和關聯(lián)在數(shù)據(jù)集成領域,數(shù)據(jù)富化(DataEnrichment)和關聯(lián)(DataCorrelation)是提升數(shù)據(jù)價值的關鍵步驟。ApacheNiFi提供了強大的功能來實現(xiàn)這些操作,通過連接不同的數(shù)據(jù)源,將數(shù)據(jù)進行合并、清洗和轉(zhuǎn)換,從而生成更完整、更準確的數(shù)據(jù)集。5.1.1數(shù)據(jù)富化數(shù)據(jù)富化是指在原始數(shù)據(jù)上添加額外信息的過程,這些信息可以來自其他數(shù)據(jù)源,如數(shù)據(jù)庫、API或文件。例如,假設我們有一個包含用戶ID的流數(shù)據(jù),我們可能需要從數(shù)據(jù)庫中獲取每個用戶的詳細信息,如姓名、年齡和位置,以豐富原始數(shù)據(jù)。示例:使用LookupProcessor進行數(shù)據(jù)富化在NiFi中,Lookup處理器可以用于數(shù)據(jù)富化。假設我們有以下流數(shù)據(jù):{"userId":"123"}

{"userId":"456"}我們需要從一個外部數(shù)據(jù)庫中獲取每個用戶的詳細信息。我們可以使用JDBCLookup處理器來實現(xiàn)這一目標。首先,配置JDBCLookup處理器連接到數(shù)據(jù)庫,然后設置SQL查詢語句,如:SELECTname,age,locationFROMusersWHEREuserId=?在JDBCLookup處理器中,將userId作為查詢參數(shù),處理器將返回與每個用戶ID匹配的詳細信息。最終,原始數(shù)據(jù)將被富化,如下所示:{"userId":"123","name":"張三","age":28,"location":"北京"}

{"userId":"456","name":"李四","age":32,"location":"上海"}5.1.2數(shù)據(jù)關聯(lián)數(shù)據(jù)關聯(lián)是指將來自不同源的數(shù)據(jù)集合并到一起,通?;谝粋€共同的鍵。例如,我們可能需要將用戶活動數(shù)據(jù)與用戶詳細信息數(shù)據(jù)關聯(lián)起來,以分析用戶行為。示例:使用JoinProcessor進行數(shù)據(jù)關聯(lián)在NiFi中,Join處理器可以用于數(shù)據(jù)關聯(lián)。假設我們有兩個數(shù)據(jù)流,一個包含用戶活動信息,另一個包含用戶詳細信息:用戶活動數(shù)據(jù)流:{"userId":"123","activity":"購買"}

{"userId":"456","activity":"瀏覽"}用戶詳細信息數(shù)據(jù)流:{"userId":"123","name":"張三","age":28}

{"userId":"456","name":"李四","age":32}我們可以使用Join處理器,將兩個流基于userId進行關聯(lián)。配置Join處理器時,選擇userId作為關聯(lián)鍵,并設置適當?shù)年P聯(lián)策略。最終,兩個數(shù)據(jù)流將被關聯(lián),如下所示:{"userId":"123","activity":"購買","name":"張三","age":28}

{"userId":"456","activity":"瀏覽","name":"李四","age":32}5.2數(shù)據(jù)路由和分發(fā)數(shù)據(jù)路由和分發(fā)是數(shù)據(jù)處理流程中的重要環(huán)節(jié),它們決定了數(shù)據(jù)如何在系統(tǒng)中流動以及最終如何被分發(fā)到不同的目的地。ApacheNiFi提供了靈活的數(shù)據(jù)路由和分發(fā)機制,允許用戶根據(jù)數(shù)據(jù)內(nèi)容、屬性或狀態(tài),將數(shù)據(jù)發(fā)送到不同的下游處理器或輸出目的地。5.2.1數(shù)據(jù)路由數(shù)據(jù)路由是指根據(jù)數(shù)據(jù)的某些屬性或內(nèi)容,決定數(shù)據(jù)的流向。例如,我們可以根據(jù)數(shù)據(jù)的類型或質(zhì)量,將數(shù)據(jù)發(fā)送到不同的處理器進行處理。示例:使用RouteOnAttributeProcessor進行數(shù)據(jù)路由假設我們有以下流數(shù)據(jù),其中包含不同類型的事件:{"type":"error","message":"系統(tǒng)異常"}

{"type":"info","message":"用戶登錄成功"}我們可以使用RouteOnAttribute處理器,根據(jù)type屬性將數(shù)據(jù)路由到不同的處理器。例如,將所有error類型的事件發(fā)送到一個處理器進行錯誤處理,將所有info類型的事件發(fā)送到另一個處理器進行日志記錄。5.2.2數(shù)據(jù)分發(fā)數(shù)據(jù)分發(fā)是指將數(shù)據(jù)發(fā)送到一個或多個目的地的過程。例如,我們可以將處理后的數(shù)據(jù)分發(fā)到數(shù)據(jù)庫、文件系統(tǒng)或消息隊列。示例:使用PublishKafkaProcessor進行數(shù)據(jù)分發(fā)假設我們處理后的數(shù)據(jù)如下:{"userId":"123","activity":"購買","name":"張三","age":28}我們可以使用PublishKafka處理器,將數(shù)據(jù)分發(fā)到Kafka消息隊列。首先,配置PublishKafka處理器連接到Kafka集群,然后設置主題名稱和數(shù)據(jù)格式。最終,數(shù)據(jù)將被發(fā)送到Kafka,供其他系統(tǒng)或應用程序消費。5.3使用Nifi進行數(shù)據(jù)轉(zhuǎn)換數(shù)據(jù)轉(zhuǎn)換是數(shù)據(jù)集成過程中的核心步驟,它涉及到數(shù)據(jù)格式的轉(zhuǎn)換、數(shù)據(jù)類型的轉(zhuǎn)換以及數(shù)據(jù)內(nèi)容的修改。ApacheNiFi提供了多種處理器和功能,可以輕松地進行數(shù)據(jù)轉(zhuǎn)換。5.3.1數(shù)據(jù)格式轉(zhuǎn)換數(shù)據(jù)格式轉(zhuǎn)換是指將數(shù)據(jù)從一種格式轉(zhuǎn)換為另一種格式。例如,將JSON數(shù)據(jù)轉(zhuǎn)換為CSV格式,或?qū)ML數(shù)據(jù)轉(zhuǎn)換為JSON格式。示例:使用ConvertRecordProcessor進行數(shù)據(jù)格式轉(zhuǎn)換假設我們有以下JSON格式的流數(shù)據(jù):{"userId":"123","activity":"購買","name":"張三","age":28}我們可以使用ConvertRecord處理器,將數(shù)據(jù)轉(zhuǎn)換為CSV格式。首先,配置ConvertRecord處理器,選擇適當?shù)霓D(zhuǎn)換策略,如CSVtoJSON。然后,設置CSV格式的列名和數(shù)據(jù)類型。最終,數(shù)據(jù)將被轉(zhuǎn)換為CSV格式,如下所示:userId,activity,name,age

123,購買,張三,285.3.2數(shù)據(jù)類型轉(zhuǎn)換數(shù)據(jù)類型轉(zhuǎn)換是指將數(shù)據(jù)從一種類型轉(zhuǎn)換為另一種類型。例如,將字符串類型的日期轉(zhuǎn)換為日期類型,或?qū)⒄麛?shù)類型的數(shù)據(jù)轉(zhuǎn)換為浮點類型。示例:使用UpdateRecordProcessor進行數(shù)據(jù)類型轉(zhuǎn)換假設我們有以下流數(shù)據(jù),其中包含字符串類型的日期:{"userId":"123","activity":"購買","name":"張三","age":28,"date":"2023-01-01"}我們可以使用UpdateRecord處理器,將date字段從字符串類型轉(zhuǎn)換為日期類型。首先,配置UpdateRecord處理器,使用RecordReader和RecordWriter來讀取和寫入數(shù)據(jù)。然后,使用ExpressionEvaluator來修改date字段的類型。最終,數(shù)據(jù)將被轉(zhuǎn)換,如下所示:{"userId":"123","activity":"購買","name":"張三","age":28,"date":"2023-01-01T00:00:00.000Z"}5.3.3數(shù)據(jù)內(nèi)容修改數(shù)據(jù)內(nèi)容修改是指對數(shù)據(jù)進行清洗、過濾或修改,以滿足特定的業(yè)務需求或數(shù)據(jù)質(zhì)量標準。示例:使用UpdateAttributeProcessor進行數(shù)據(jù)內(nèi)容修改假設我們有以下流數(shù)據(jù),其中包含用戶活動信息:{"userId":"123","activity":"購買","name":"張三","age":28,"date":"2023-01-01T00:00:00.000Z"}我們可以使用UpdateAttribute處理器,添加或修改數(shù)據(jù)的屬性。例如,我們可以添加一個屬性eventTime,表示事件發(fā)生的時間戳。配置UpdateAttribute處理器時,使用以下表達式:${date:format("${flowFile:date(date)}","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")}最終,數(shù)據(jù)將被修改,如下所示:{"userId":"123","activity":"購買","name":"張三","age":28,"date":"2023-01-01T00:00:00.000Z"}屬性eventTime將被添加到數(shù)據(jù)中,表示事件發(fā)生的時間戳。通過上述示例,我們可以看到ApacheNiFi在數(shù)據(jù)富化、關聯(lián)、路由、分發(fā)以及數(shù)據(jù)轉(zhuǎn)換方面的強大功能。這些技術的應用可以極大地提高數(shù)據(jù)處理的效率和準確性,為數(shù)據(jù)分析和決策提供更高質(zhì)量的數(shù)據(jù)支持。6Nifi與大數(shù)據(jù)生態(tài)6.1Nifi與Hadoop的集成6.1.1原理ApacheNifi是一個易于使用、功能強大的數(shù)據(jù)處理和分發(fā)系統(tǒng),它支持強大的數(shù)據(jù)路由、轉(zhuǎn)換和系統(tǒng)中介邏輯。Nifi與Hadoop的集成主要體現(xiàn)在Nifi能夠讀取和寫入Hadoop分布式文件系統(tǒng)(HDFS)中的數(shù)據(jù),以及與Hadoop生態(tài)系統(tǒng)中的其他組件如Hive、HBase、Pig等進行交互。這種集成使得Nifi成為Hadoop數(shù)據(jù)湖和數(shù)據(jù)倉庫的優(yōu)秀數(shù)據(jù)管道,能夠處理實時和批量數(shù)據(jù)流。6.1.2內(nèi)容讀取HDFS數(shù)據(jù)Nifi提供了HDFSRead處理器,用于從HDFS中讀取數(shù)據(jù)。例如,假設我們有一個存儲在HDFS中的日志文件,我們可以使用以下配置來讀取它:-處理器類型:HDFSRead

-HDFSURI:hdfs://localhost:9000

-輸入目錄:/logs

-文件過濾器:*.log寫入HDFS數(shù)據(jù)同樣,HDFSPut處理器可以用于將數(shù)據(jù)寫入HDFS。例如,如果我們想要將處理后的數(shù)據(jù)存儲回HDFS,可以配置如下:-處理器類型:HDFSPut

-HDFSURI:hdfs://localhost:9000

-輸出目錄:/processed_data與Hive的集成Nifi可以通過HiveMetastore處理器與Hive進行集成,用于查詢Hive元數(shù)據(jù),以及通過HiveQuery處理器執(zhí)行HiveQL查詢。例如,我們可以使用以下配置來查詢Hive表中的數(shù)據(jù):-處理器類型:HiveQuery

-HiveURI:thrift://localhost:9083

-HiveQuery:SELECT*FROMlogsWHEREdate='2023-01-01'6.2Nifi與Kafka的集成6.2.1原理Nifi與ApacheKafka的集成允許Nifi作為Kafka的生產(chǎn)者和消費者,處理實時數(shù)據(jù)流。Kafka是一個分布式流處理平臺,用于構建實時數(shù)據(jù)管道和流應用。Nifi通過Kafka處理器可以訂閱Kafka主題中的數(shù)據(jù),也可以將數(shù)據(jù)發(fā)布到Kafka主題。6.2.2內(nèi)容從Kafka消費數(shù)據(jù)使用ConsumeKafka處理器,Nifi可以訂閱Kafka主題中的數(shù)據(jù)。例如,訂閱名為logs的主題:-處理器類型:ConsumeKafka

-KafkaURI:localhost:9092

-主題:logs

-消費者組:nifi-consumer-group向Kafka發(fā)布數(shù)據(jù)PublishKafka處理器用于將數(shù)據(jù)發(fā)布到Kafka主題。例如,將處理后的數(shù)據(jù)發(fā)布到名為processed_logs的主題:-處理器類型:PublishKafka

-KafkaURI:localhost:9092

-主題:processed_logs6.3Nifi與Spark的集成6.3.1原理Nifi與ApacheSpark的集成主要通過Nifi的ExecuteStreamCommand處理器實現(xiàn),該處理器可以執(zhí)行SparkStreaming任務,處理實時數(shù)據(jù)流。此外,Nifi也可以通過ExecuteScript處理器調(diào)用Spark的批處理任務,處理批量數(shù)據(jù)。6.3.2內(nèi)容執(zhí)行SparkStreaming任務使用ExecuteStreamCommand處理器,Nifi可以執(zhí)行SparkStreaming任務。例如,執(zhí)行一個SparkStreaming任務,該任務從Kafka主題讀取數(shù)據(jù)并進行實時處理:-處理器類型:ExecuteStreamCommand

-SparkMasterURI:spark://localhost:7077

-主程序:/path/to/spark-streaming-job.jar

-參數(shù):--kafka-brokerlocalhost:9092--kafka-topiclogs--output-topicprocessed_logs執(zhí)行Spark批處理任務ExecuteScript處理器可以調(diào)用Spark的批處理任務。例如,執(zhí)行一個Spark任務,該任務從HDFS讀取數(shù)據(jù)并進行批處理:-處理器類型:ExecuteScript

-腳本引擎:Spark

-腳本路徑:/path/to/spark-batch-job.py

-參數(shù):--input-path/logs--output-path/processed_data通過上述集成,Nifi成為了大數(shù)據(jù)生態(tài)系統(tǒng)中一個靈活且強大的數(shù)據(jù)處理和分發(fā)工具,能夠無縫地與Hadoop、Kafka和Spark等組件協(xié)同工作,處理實時和批量數(shù)據(jù)流。7監(jiān)控和管理Nifi7.1Nifi的監(jiān)控儀表板在ApacheNifi中,監(jiān)控儀表板是管理實時數(shù)據(jù)處理流程的關鍵工具。它提供了對Nifi實例的全面視圖,包括處理器狀態(tài)、連接、隊列、系統(tǒng)資源使用情況等。通過監(jiān)控儀表板,可以實時監(jiān)控數(shù)據(jù)流的性能,確保數(shù)據(jù)處理的效率和穩(wěn)定性。7.1.1訪問監(jiān)控儀表板打開Nifi的WebUI。轉(zhuǎn)到“監(jiān)控”選項卡,這里會顯示各種監(jiān)控信息。7.1.2監(jiān)控指標處理器狀態(tài):顯示每個處理器的運行狀態(tài),如運行、停止、失敗等。連接狀態(tài):監(jiān)控數(shù)據(jù)在不同組件之間的流動情況,包括數(shù)據(jù)量、傳輸速率等。隊列狀態(tài):查看數(shù)據(jù)隊列的大小和數(shù)據(jù)量,幫助理解數(shù)據(jù)處理的瓶頸。系統(tǒng)資源:監(jiān)控CPU、內(nèi)存、磁盤使用情況,確保Nifi運行在最佳狀態(tài)。7.2性能調(diào)優(yōu)和故障排除性能調(diào)優(yōu)是確保Nifi高效處理數(shù)據(jù)的關鍵。故障排除則是在遇到問題時,快速定位并解決問題的必要技能。7.2.1性能調(diào)優(yōu)調(diào)整線程數(shù):根據(jù)系統(tǒng)資源和數(shù)據(jù)處理需求,調(diào)整處理器的線程數(shù)。優(yōu)化隊列策略:合理設置隊列策略,如優(yōu)先級隊列,可以提高數(shù)據(jù)處理的效率。使用緩存:對于頻繁訪問的數(shù)據(jù),使用緩存可以減少I/O操作,提高性能。7.2.2故障排除檢查日志文件:Nifi的日志文件包含了運行時的詳細信息,是故障排除的第一步。使用診斷工具:Nifi提供了診斷工具,如“診斷”選項卡,可以幫助快速定位問題。監(jiān)控儀表板:監(jiān)控儀表板顯示的實時數(shù)據(jù)流狀態(tài),是故障排除的重要參考。7.3安全管理Nifi安全管理是保護數(shù)據(jù)和Nifi實例免受未授權訪問和攻擊的重要措施。7.3.1配置身份驗證Nifi支持多種身份驗證機制,包括Kerberos、LDAP、ActiveDirectory等。例如,使用Kerberos進行身份驗證的配置如下:<!--在perties中配置Kerberos-->

vider=KerberosLoginIdentityProvider

vider.KerberosLoginIdentityProvider.principal=niuser@EXAMPLE.COM

vider.KerberosLoginIdentityProvider.keytab=/etc/security/keytabs/niuser.keytab7.3.2設置訪問控制通過設置訪問控制,可以限制用戶對Nifi實例的訪問。例如,只允許特定用戶組訪問Nifi的WebUI:<!--在perties中配置訪問控制-->

nifi.security.user.groups=niadmin,niuser

nifi.security.user.niadmin=niadmin

nifi.security.user.niuser=niuser

vider=StandardGroupsProvider7.3.3加密數(shù)據(jù)傳輸為了保護數(shù)據(jù)在傳輸過程中的安全,Nifi支持SSL/TLS加密。配置SSL/TLS的示例如下:<!--在perties中配置SSL/TLS-->

nifi.security.keystore=file:/etc/ssl/nifi.jks

nifi.security.keystore.type=JKS

nifi.security.keystore.password=nifipassword

nifi.security.truststore=file:/etc/ssl/nifi-truststore.jks

nifi.security.truststore.type=JKS

nifi.security.truststore.password=nifipassword通過以上配置,可以確保Nifi的數(shù)據(jù)處理流程在安全的環(huán)境中運行,同時通過監(jiān)控和性能調(diào)優(yōu),保證數(shù)據(jù)處理的效率和穩(wěn)定性。8案例研究與實踐8.1實時日志處理在實時日志處理場景中,ApacheNiFi是一個強大的工具,它能夠自動、可靠地處理和路由數(shù)據(jù)流。下面,我們將通過一個具體的案例來展示如何使用NiFi進行實時日志數(shù)據(jù)的收集、處理和分析。8.1.1案例背景假設我們正在運營一個大型的在線服務,需要實時監(jiān)控和分析用戶活動日志,以快速響應任何異常情況。日志數(shù)據(jù)從不同的服務器產(chǎn)生,需要被集中收集,清洗,然后發(fā)送到數(shù)據(jù)分析平臺進行實時分析。8.1.2NiFi配置步驟創(chuàng)建數(shù)據(jù)收集流程:在NiFi的Canvas上,使用GetFile處理器來監(jiān)聽日志文件目錄。配置GetFile處理器,設置監(jiān)聽的目錄和文件過濾規(guī)則。數(shù)據(jù)清洗:使用ExecuteScript處理器,可以使用Groovy或Python腳本來清洗數(shù)據(jù),例如去除日志中的無用信息,提取關鍵字段。示例Groovy腳本://GroovyScriptfordatacleaning

deflogContent=flowFile.getContent().toString()

defcleanedContent=logContent.replaceAll(/.*INFO.*$$(\d{4}-\d{2}-\d{2}\d{2}:\d{2}:\d{2})$$.*/,'$1')

session.write(flowFile,newStringWriter(cleanedContent))數(shù)據(jù)路由:使用RouteOnAttribute處理器,根據(jù)數(shù)據(jù)的屬性(如日志級別)來決定數(shù)據(jù)的流向。例如,將所有INFO級別的日志發(fā)送到一個目的地,而ERROR級別的日志發(fā)送到另一個目的地。數(shù)據(jù)發(fā)送:使用PublishKafka處理器將清洗后的日志數(shù)據(jù)發(fā)送到Kafka集群,供實時分析系統(tǒng)使用。配置PublishKafka處理器,設置Kafka的Broker列表和目標Topic。8.1.3實踐要點性能優(yōu)化:確保GetFile處理器的輪詢間隔和線程數(shù)量設置合理,以避免資源浪費或數(shù)據(jù)積壓。錯誤處理:在數(shù)據(jù)清洗和路由過程中,設置適當?shù)腻e誤處理策略,如重試或發(fā)送到錯誤隊列。8.2社交媒體數(shù)據(jù)流分析社交媒體數(shù)據(jù)流分析是另一個NiFi可以大展身手的領域。通過實時收集和分析社交媒體上的數(shù)據(jù),企業(yè)可以快速了解公眾情緒,進行市場趨勢分析。8.2.1案例背景假設我們需要從Twitter收集實時的推文數(shù)據(jù),然后分析其中的情感傾向,以了解用戶對某個新產(chǎn)品的反饋。8.2.2NiFi配置步驟數(shù)據(jù)收集:使用ConsumeTwitter處理器來收集Twitter上的實時數(shù)據(jù)。配置ConsumeTwitter處理器,設置Twitter的API密鑰和訪問令牌,以及要監(jiān)聽的關鍵詞。數(shù)據(jù)處理:使用ExecuteScript處理器,可以使用Python的TextBlob庫來分析推文的情感傾向。示例Python腳本:#PythonScriptforsentimentanalysis

fromtextblobimportTextBlob

tweet=flowFile.read().decode('utf-8')

sentiment=TextBlob(tweet).sentiment.polarity

ifsentiment>0:

flowFile.addAttribute('sentiment','positive')

elifsentiment<0:

flowFile.addAttribute('sentiment','negative')

else:

flowFile.addAttribute('sentiment','neutral')

session.transfer(flowFile,REL_SUCCESS)數(shù)據(jù)存儲與分析:使用PutHDFS或PublishKafka處理器將處理后的數(shù)據(jù)存儲或發(fā)送到實時分析系統(tǒng)。8.2.3實踐要點數(shù)據(jù)過濾:在收集階段,可以設置過濾規(guī)則,只收集與特定關鍵詞相關的推文,以減少數(shù)據(jù)量。情感分析的準確性:情感分析可能受到語言、文化差異的影響,需要定期校準和優(yōu)化分析模型。8.3物聯(lián)網(wǎng)數(shù)據(jù)集成物聯(lián)網(wǎng)(IoT)數(shù)據(jù)集成是NiFi的另一個關鍵應用領域。NiFi能夠處理來自各種IoT設備的大量數(shù)據(jù),進行實時分析和決策。8.3.1案例背景假設我們正在管理一個智能農(nóng)業(yè)系統(tǒng),需要實時收集土壤濕度、溫度等數(shù)據(jù),以優(yōu)化灌溉和施肥策略。8.3.2NiFi配置步驟數(shù)據(jù)收集:使用ConsumeKafka處理器來收集來自IoT設備的數(shù)據(jù)。配置ConsumeKafka處理器,設置Kafka的Broker列表和源Topic。數(shù)據(jù)轉(zhuǎn)換:使用ExecuteScript處理器,可以使用Python或Groovy腳本來轉(zhuǎn)換數(shù)據(jù)格式,例如將JSON格式轉(zhuǎn)換為CSV格式。示例Groovy腳本://GroovyScriptfordatatransformation

defjsonContent=flowFile.getContent().toString()

defdata=JSON.parseText(jsonContent)

defcsvContent="${data['device_id']},${data['timestamp']},${data['humidity']},${data['temperature']}"

session.write(flowFile,newStringWriter(csvContent))數(shù)據(jù)存儲與分析:使用PutHDFS處理器將轉(zhuǎn)換后的數(shù)據(jù)存儲到Hadoop的HDFS中,供后續(xù)的批處理分析使用?;蛘呤褂肞ublishKafka處理器將數(shù)據(jù)發(fā)送到實時分析系統(tǒng)。8.3.3實踐要點數(shù)據(jù)安全:確保在傳輸和存儲IoT數(shù)據(jù)時采取適當?shù)陌踩胧?,如?shù)據(jù)加密和訪問控制。數(shù)據(jù)質(zhì)量:定期檢查和清洗數(shù)據(jù),以確保分析結果的準確性。通過以上案例,我們可以看到ApacheNiFi在實時數(shù)據(jù)處理中的強大功能和靈活性。無論是日志處理、社交媒體分析還是IoT數(shù)據(jù)集成,NiFi都能提供一個可靠、可擴展的解決方案。9Nifi的擴展和自定義9.1開發(fā)自定義處理器9.1.1原理ApacheNifi允許用戶通過開發(fā)自定義處理器來擴展其功能,滿足特定的數(shù)據(jù)處理需求。自定義處理器是通過實現(xiàn)Nifi的Processor接口來創(chuàng)建的,可以使用Java進行開發(fā)。處理器可以執(zhí)行各種操作,如數(shù)據(jù)轉(zhuǎn)換、數(shù)據(jù)路由、數(shù)據(jù)存儲等。9.1.2內(nèi)容要開發(fā)自定義處理器,首先需要創(chuàng)建一個Java類,該類實現(xiàn)cessor.Processor接口。以下是一個簡單的自定義處理器示例,該處理器將接收到的FlowFile中的文本轉(zhuǎn)換為大寫。importcessor.Processor;

importcessor.ProcessContext;

importcessor.ProcessSession;

importcessor.Relationship;

importorg.apache.nifi.flowfile.FlowFile;

importorg.apache.nifi.annotation.documentation.CapabilityDescription;

importorg.apache.nifi.annotation.documentation.Tags;

importorg.apache.nifi.annotation.lifecycle.OnScheduled;

importjava.util.HashSet;

importjava.util.Set;

@Tags({"uppercase","text"})

@CapabilityDescription("將接收到的文本轉(zhuǎn)換為大寫")

publicclassToUpperCaseProcessorimplementsProcessor{

publicstaticfinalRelationshipREL_SUCCESS=newRelationship.Builder()

.name("success")

.description("成功處理的流文件")

.build();

privateSet<Relationship>relationships;

@Override

publicvoidonScheduled(ProcessContextcontext){

relationships=newHashSet<>();

relationships.add(REL_SUCCESS);

}

@Override

publicSet<Relationship>getRelationships(){

returnrelationships;

}

@Override

publicvoidinitialize(){

//初始化處理器

}

@Override

publicvoidprocess(ProcessSessionsession){

FlowFileflowFile=session.get();

if(flowFile!=null){

Stringcontent=newString(session.read(flowFile).array());

StringupperCaseContent=content.toUpperCase();

FlowFileupdatedFlowFile=

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論