色彩时光 | Recording the bits and pieces of a programmer's world

Demo: collecting logs with Flume and storing them in Hadoop


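Collecting logs with Flume and storing them in Hadoop involves a client and a server, which are connected through Avro. The client tails the log file and sends the data to the server; the server receives the data and stores it in Hadoop (HDFS).
Client configuration: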


############################################################## COMPONENTS
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = k1
############################################################## SOURCES
# For each one of the sources, the type is defined
# Exec Source For Flume agent on Win XP(UnxUtils).
agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.command = E:/tool/UnxUtils/usr/local/wbin/tail.exe --follow=name --retry E:/errorlog.txt
agent.sources.seqGenSrc.restart = true
agent.sources.seqGenSrc.restartThrottle = 1000
agent.sources.seqGenSrc.batchSize = 100
#agent.sources.seqGenSrc.charset = GBK
############################################################## SINKS
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = 192.168.63.193
agent.sinks.k1.port = 4545
############################################################## CHANNELS
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000
############################################################## RELATIONS
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
#Specify the channel the sink should use
agent.sinks.k1.channel = memoryChannel

agent.sources.seqGenSrc.interceptors = i1
agent.sources.seqGenSrc.interceptors.i1.type = static
agent.sources.seqGenSrc.interceptors.i1.key = fileName
agent.sources.seqGenSrc.interceptors.i1.value = box1234
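
The client configuration above wires one source and one sink to a single channel by name, so a typo in a channel name only shows up when the agent starts. The following is a minimal sketch of a pre-flight check, not part of Flume itself: `parse_properties` and `check_wiring` are hypothetical helper names, and the parser only handles simple `key = value` lines like the ones in this post.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values such as
        # 'tail.exe --follow=name ...' stay intact.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="agent"):
    """Return a list of problems found in the source/sink channel references."""
    channels = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for name in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{name}.channels", "").split()
        if not used:
            problems.append(f"source {name} has no channels")
        for channel in used:
            if channel not in channels:
                problems.append(f"source {name} uses undeclared channel {channel!r}")
    for name in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{name}.channel", "")
        if channel not in channels:
            problems.append(f"sink {name} uses undeclared channel {channel!r}")
    return problems
```

Running `check_wiring(parse_properties(open("client.conf").read()))` (the file name is an assumption) should return an empty list for the configuration shown here.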

Server configuration:

############################################################## COMPONENTS
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
agent.sources = r1 http1
agent.channels = memoryChannel
agent.sinks = h
############################################################## SOURCES
agent.sources.r1.type = avro
agent.sources.r1.bind = 192.168.63.193
agent.sources.r1.port = 4545
############################################################## SOURCES
agent.sources.http1.type = http
agent.sources.http1.port = 5140
agent.sources.http1.bind = 192.168.63.193
agent.sources.http1.handler = org.apache.flume.source.http.JSONHandler
############################################################## SINKS
agent.sinks.h.type = hdfs
agent.sinks.h.hdfs.path = hdfs://192.168.63.193:9000/logs/box/source/%Y%m%d
agent.sinks.h.hdfs.filePrefix = %{fileName}
agent.sinks.h.hdfs.fileType = DataStream
agent.sinks.h.hdfs.useLocalTimeStamp = true
agent.sinks.h.hdfs.rollInterval = 0
agent.sinks.h.hdfs.rollSize = 60000
agent.sinks.h.hdfs.rollCount = 0
agent.sinks.h.hdfs.writeFormat = Text
#agent.sinks.h.hdfs.fileSuffix=%Y%m%d%H%M%S
############################################################## CHANNELS
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000
############################################################## RELATIONS
# The channel can be defined as follows.
agent.sources.r1.channels = memoryChannel
agent.sources.http1.channels = memoryChannel
#Specify the channel the sink should use
agent.sinks.h.channel = memoryChannel
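
In the HDFS sink settings above, `%Y%m%d` in `hdfs.path` is a date escape and `%{fileName}` in `hdfs.filePrefix` refers to an event header. Because `hdfs.useLocalTimeStamp` is true, the date comes from the server's local clock rather than from a timestamp header. The sketch below only imitates that substitution to show what the resulting directory and prefix look like; it is not Flume's code, `resolve_escapes` is a hypothetical name, only the three date escapes used in this post are handled, and treating a missing header as an empty string is an assumption.

```python
import re
from datetime import datetime

# Matches either a header reference such as %{fileName}
# or one of the date escapes %Y, %m, %d.
_ESCAPE = re.compile(r"%\{([^}]+)\}|%([Ymd])")


def resolve_escapes(template, headers, when):
    """Substitute header references and date escapes in a sink path template."""
    date_parts = {
        "Y": f"{when.year:04d}",
        "m": f"{when.month:02d}",
        "d": f"{when.day:02d}",
    }

    def substitute(match):
        header_name = match.group(1)
        if header_name is not None:
            # Assumption: a header that is not set becomes an empty string.
            return headers.get(header_name, "")
        return date_parts[match.group(2)]

    return _ESCAPE.sub(substitute, template)


# An event from the client carries the header added by its static interceptor.
headers = {"fileName": "box1234"}
now = datetime.now()  # local time, mirroring hdfs.useLocalTimeStamp = true
directory = resolve_escapes(
    "hdfs://192.168.63.193:9000/logs/box/source/%Y%m%d", headers, now
)
prefix = resolve_escapes("%{fileName}", headers, now)
```

With `rollInterval` and `rollCount` set to 0, files under that directory are rolled by size only, once they reach the `rollSize` of 60000 bytes.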

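The dynamic %{fileName} value in the server configuration is filled in as follows: a static interceptor on the client puts the variable into the event headers. Alternatively, you can POST JSON data such as [{ "headers" : {"fileName" : "val1"}, "body" : "log line" }] to the HTTP source.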


agent.sources.seqGenSrc.interceptors = i1
agent.sources.seqGenSrc.interceptors.i1.type = static
agent.sources.seqGenSrc.interceptors.i1.key = fileName
agent.sources.seqGenSrc.interceptors.i1.value = box1234
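
For the HTTP source alternative, a minimal sketch of a sender using only the Python standard library could look like this. It assumes the server configuration from this post (HTTP source with `JSONHandler` on 192.168.63.193:5140); `build_payload` and `post_events` are hypothetical helper names, and the log lines are made-up sample data.

```python
import json
import urllib.request


def build_payload(lines, file_name):
    """Build the JSON array of events expected by the HTTP source's JSONHandler.

    Every event carries the fileName header that the HDFS sink uses
    for %{fileName}, plus the log line as its body.
    """
    events = [{"headers": {"fileName": file_name}, "body": line} for line in lines]
    return json.dumps(events).encode("utf-8")


def post_events(url, payload, timeout=5):
    """POST the payload to the Flume HTTP source and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json; charset=UTF-8"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

A call such as `post_events("http://192.168.63.193:5140", build_payload(["first log line"], "box1234"))` would then deliver events whose files get the `box1234` prefix, the same result as the static interceptor on the client.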

References: http://www.tuicool.com/articles/ZjuqMv
http://www.aboutyun.com/forum-145-1.html
http://flume.apache.org/FlumeUserGuide.html#jsonhandler

