Flume-NG-1.5.0 + Kafka Environment Integration

# For Flume basics, see the earlier documents or blog posts
# For Kafka basics, see the earlier documents or blog posts
# For Storm basics, see the earlier documents or blog posts
# Scenario: Flume watches a given directory; whenever a new log file appears there, the file's data is sent to Kafka, and finally Storm pulls the data out of Kafka and displays it.

# Integrating Flume with Kafka
# Flume configuration: watch the directory /usr/local/yting/flume/tdata/tdir1, then use a custom Sink to push the data into Kafka
[root@rs229 fks]# pwd
[root@rs229 fks]# vi fks001.conf

# fks : yting yousmile flume kafka storm integration
fks.sources=source1
fks.sinks=sink1
fks.channels=channel1

# configure source1
fks.sources.source1.type=spooldir
fks.sources.source1.spoolDir=/usr/local/yting/flume/tdata/tdir1
fks.sources.source1.fileHeader = false

# configure sink1

fks.sinks.sink1.type= # (custom Sink, class name elided in this preview; it forwards the data Flume collects into Kafka)

# configure channel1
fks.channels.channel1.type=file
fks.channels.channel1.checkpointDir=/usr/local/yting/flume/checkpointdir/tcpdir/example_fks_001
fks.channels.channel1.dataDirs=/usr/local/yting/flume/datadirs/tddirs/example_fks_001

# bind source and sink
fks.sources.source1.channels=channel1
fks.sinks.sink1.channel=channel1
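The class name for fks.sinks.sink1.type is not shown in this preview. For orientation only, here is a minimal sketch of what such a custom sink could look like against the Flume 1.5 sink API and the Kafka 0.8 producer API; the package and class name (com.yting.cloud.flume.sink.KafkaSink), the broker list, and the hard-coded topic are assumptions for illustration, not the author's actual code:

package com.yting.cloud.flume.sink; // hypothetical name; the real one is elided above

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaSink extends AbstractSink implements Configurable {

    private Producer<String, String> producer;

    @Override
    public void configure(Context context) {
        // Hard-coded for brevity; a real sink would read these from the Flume context
        Properties props = new Properties();
        props.put("metadata.broker.list", "rs229:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event event = channel.take();
            if (event == null) { // nothing in the channel right now
                tx.commit();
                return Status.BACKOFF;
            }
            // Forward the event body to the topic the consumer below reads from
            producer.send(new KeyedMessage<String, String>("flume-kafka-storm-001",
                    new String(event.getBody(), StandardCharsets.UTF_8)));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();
            throw new EventDeliveryException("Failed to send event to Kafka", e);
        } finally {
            tx.close();
        }
    }
}

Whatever the real class is called, its fully qualified name is what goes on the fks.sinks.sink1.type line, and the jar containing it must be on Flume's classpath (see the packaging step below).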

# Kafka configuration file
[root@rs229 ytconf]# pwd
[root@rs229 ytconf]# vi server.properties
# A comma separated list of directories under which to store log files
# log.dirs=/tmp/kafka-logs
# root directory for all kafka znodes.
zookeeper.connect=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka
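Note the /kafka chroot at the end of zookeeper.connect: every client and every CLI tool must append it to the ZooKeeper connect string, otherwise they will look in the wrong znode tree. If the topic used below does not exist yet, it can be created with the standard topic tool; this command is an assumption (it is not part of the original transcript, and on Kafka 0.8.0 the script is kafka-create-topic.sh instead):

bin/kafka-topics.sh --create --zookeeper rs229:2181/kafka --topic flume-kafka-storm-001 --partitions 2 --replication-factor 1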

# Copy the jars: copy Kafka's jar files into Flume's lib directory, because the custom Sink depends on them; skip this step and you will get exceptions!
[root@rs229 lib]# pwd
[root@rs229 lib]# cp /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/libs/* .
# Use Eclipse to package the custom Sink into a jar and drop it into the $FLUME_HOME/lib directory. (What, you can't do that? Then you'd better come learn pig farming with me.)
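If you would rather not use Eclipse for the packaging step, a plain jar invocation achieves the same thing; the jar name and classes directory here are made up for illustration:

[root@rs229 ~]# jar cf yting-kafka-sink.jar -C /path/to/compiled/classes com
[root@rs229 ~]# cp yting-kafka-sink.jar $FLUME_HOME/lib/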

# Start Kafka
[root@rs229 kafka_2.9.2-]# pwd
[root@rs229 kafka_2.9.2-]# bin/kafka-server-start.sh config/ytconf/server.properties &
[1] 24672
[root@rs229 kafka_2.9.2-]#
[2014-07-14 11:48:24,533] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.dirs is overridden to /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/kafka-logs (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property zookeeper.connect is overridden to rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,590] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-07-14 11:48:24,592] INFO [Kafka Server 0], Connecting to zookeeper on rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.server.KafkaServer)
[2014-07-14 11:48:24,603] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-07-14 11:48:24,610] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.class.path=:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../core/build/dependant-libs-2.8.0/*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../perf/build/libs/kafka-perf_2.8.0*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../clients/build/libs/kafka-clients*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../examples/build/libs/kafka-examples*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../contrib/hadoop-consumer/build/libs/kafka-hadoop-consumer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../contrib/hadoop-producer/build/libs/kafka-hadoop-producer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/jopt-simple-3.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/kafka_2.9.2-.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/kafka_2.9.2--javadoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/kafka_2.9.2--scaladoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/kafka_2.9.2--sources.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/log4j-1.2.15.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/metrics-core-2.2.0.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/scala-library-2.9.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/slf4j-api-1.7.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/snappy-java-1.0.5.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/zkclient-0.3.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../libs/zookeeper-3.3.4.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2- (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,611] INFO Initiating client connection, connectString=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,625] INFO Opening socket connection to server rs198/98:2181 (org.apache.zookeeper.ClientCnxn)
[2014-07-14 11:48:24,631] INFO Socket connection established to rs198/98:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-07-14 11:48:24,642] INFO Session establishment complete on server rs198/98:2181, sessionid = 0xc6472c07f50b0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-07-14 11:48:24,645] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-07-14 11:48:24,892] INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-/kafka-logs' (kafka.log.LogManager)
[2014-07-14 11:48:24,894] INFO Loading log 'flume-kafka-storm-001-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,945] INFO Completed load of log flume-kafka-storm-001-0 with log end offset 18 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-07-14 11:48:24,966] INFO Loading log 'flume-kafka-storm-001-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,969] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 7 (kafka.log.Log)
[2014-07-14 11:48:24,970] INFO Loading log 'test001-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,973] INFO Completed load of log test001-1 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,974] INFO Loading log 'test003-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,976] INFO Completed load of log test003-1 with log end offset 47 (kafka.log.Log)
[2014-07-14 11:48:24,977] INFO Loading log 'test004-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,980] INFO Completed load of log test004-0 with log end offset 51 (kafka.log.Log)
[2014-07-14 11:48:24,981] INFO Loading log 'test004-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,984] INFO Completed load of log test004-1 with log end offset 49 (kafka.log.Log)
[2014-07-14 11:48:24,985] INFO Loading log 'test002-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,987] INFO Completed load of log test002-0 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,987] INFO Loading log 'test001-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,991] INFO Completed load of log test001-0 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,991] INFO Loading log 'test002-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,993] INFO Completed load of log test002-1 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,994] INFO Loading log 'test003-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,997] INFO Completed load of log test003-0 with log end offset 53 (kafka.log.Log)
[2014-07-14 11:48:24,999] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-07-14 11:48:25,003] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-07-14 11:48:25,031] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-07-14 11:48:25,032] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-07-14 11:48:25,143] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-07-14 11:48:25,163] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-07-14 11:48:25,639] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-07-14 11:48:25,645] INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
[2014-07-14 11:48:25,660] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-07-14 11:48:25,942] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test001,0],[test003,1],[test003,0],[flume-kafka-storm-001,1],[flume-kafka-storm-001,0],[test004,1],[test004,0],[test001,1],[test002,0],[test002,1] (kafka.server.ReplicaFetcherManager)
[2014-07-14 11:48:26,045] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test001,0],[test003,1],[test003,0],[flume-kafka-storm-001,1],[flume-kafka-storm-001,0],[test004,1],[test004,0],[test001,1],[test002,0],[test002,1] (kafka.server.ReplicaFetcherManager)

# Start Flume
# (the name passed with -n must be fks, matching the agent name used as the property prefix in fks001.conf)
[root@rs229 apache-flume-1.5.0-bin]# bin/flume-ng agent -n fks -c conf/ -f conf/ytconf/fks/fks001.conf -Dflume.root.logger=INFO,console &

2014-07-14 11:50:13,882 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /usr/local/yting/flume/tdata/tdir1
2014-07-14 11:50:13,912 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2014-07-14 11:50:13,916 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started
2014-07-14 11:50:13,916 (pool-4-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)] Spooling Directory Source runner has shutdown.
2014-07-14 11:50:14,417 (pool-4-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)] Spooling Directory Source runner has shutdown.
Flume is now up; whenever a new log file appears in the watched directory, its contents are shipped to Kafka.

# Create a new file in Flume's watched directory to try it out

[root@rs229 ytconf]# cd /usr/local/yting/flume/tdata/tdir1/
[root@rs229 tdir1]# ll
total 0
[root@rs229 tdir1]# vi yousmile.log
The you smile until forever .
[root@rs229 tdir1]# ll
total 1
-rw-r--r-- 1 root root 50 Jul 14 13:57 yousmile.log.COMPLETED (the .COMPLETED suffix means Flume has already processed the file)
[root@rs229 tdir1]#
# Connect to the servers from Eclipse to check whether Flume's custom Sink actually delivered the data into Kafka
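A quicker sanity check than the Eclipse consumer below is the console consumer that ships with Kafka; this command is assumed (it is not part of the original transcript), and note the /kafka chroot again:

bin/kafka-console-consumer.sh --zookeeper rs229:2181/kafka --topic flume-kafka-storm-001 --from-beginning

If the line "The you smile until forever ." comes back, the Flume-to-Kafka leg works end to end.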

# MySimpleConsumer.java (run it in Eclipse to get the result)

package com.yting.cloud.kafa.consumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * The SimpleConsumer example from the Kafka website, changed slightly so it
 * can run in Eclipse on a local machine against the remote servers.
 *
 * Author 王揚(yáng)庭
 * Time 2014-07-14
 */
public class MySimpleConsumer {
    public static void main(String[] args) {
        MySimpleConsumer example = new MySimpleConsumer();
        // long maxReads = Long.parseLong(args[0]);
        // String topic = args[1];
        // int partition = Integer.parseInt(args[2]);
        // seeds.add(args[3]);
        // int port = Integer.parseInt(args[4]);
        long maxReads = 100;
        // String topic = "yting_page_visits";
        // String topic = "test003";
        String topic = "flume-kafka-storm-001";
        // int partition = 0;
        int partition = 1; // "The you smile until forever ." from the log file went to partition 1 (2 partitions by default)
        List<String> seeds = new ArrayList<String>();
        // seeds.add("rs229");
        seeds.add("rs229");
        seeds.add("rs227");
        seeds.add("rs226");
        seeds.add("rs198");
        seeds.add("rs197");
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public MySimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                // Something went wrong!
                numErrors++;
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                // ... (the document preview ends here)
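The preview cuts off before the helper methods findLeader, findNewLeader, and getLastOffset. They are unchanged from the standard SimpleConsumer example in the Kafka documentation; for reference, getLastOffset there reads as follows (reproduced from the public example, not from the unread pages):

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
            long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

It is called above with kafka.api.OffsetRequest.EarliestTime() so that reading starts from the beginning of the partition.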
