Flume spooldir to file_roll implementation
Flume provides the Spooling Directory Source, which watches a directory for new files and uses them as the transport source. According to the documentation, this approach is more reliable than tailing a file with an Exec Source. While using it, I ran into a character-encoding exception when reading the file stream:
Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
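The stack trace can be reproduced outside Flume: `ResettableFileInputStream` decodes the file with a strict UTF-8 decoder (malformed input is reported rather than replaced, as the `CoderResult.throwException` frame shows), so any bytes from another encoding such as GBK blow up immediately. A minimal sketch (the class name `CharsetDemo` is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    // Decode bytes as UTF-8 strictly: the default malformed-input action of
    // newDecoder() is REPORT, which throws instead of substituting a
    // replacement character -- the same behavior the Flume stack trace shows.
    static String strictDecode(byte[] bytes) {
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .decode(ByteBuffer.wrap(bytes)).toString();
        } catch (CharacterCodingException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // 0xD6 0xD0 encodes "中" in GBK, but is not a valid UTF-8 sequence.
        byte[] gbkBytes = {(byte) 0xD6, (byte) 0xD0};
        System.out.println(strictDecode(gbkBytes)); // MalformedInputException
        System.out.println(strictDecode("中".getBytes(StandardCharsets.UTF_8))); // 中
    }
}
```

This is also why the source exposes `decodeErrorPolicy` (e.g. `REPLACE`, commented out in the config below) as an alternative, line-oriented way around the problem.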
A quick search shows others have hit the same problem. The main cause is that `deserializer` defaults to LINE, which decodes the file line by line as text. Configuring `BlobDeserializer` instead resolves the issue, since it hands each file through as a single binary event and never attempts character decoding.
The full configuration is as follows:
agent.sources = s2
agent.channels = m1
agent.sinks = k1 k2
############################################################## SOURCES
# For each one of the sources, the type is defined
# Exec Source For Flume agent on Win XP(UnxUtils).
#agent.sources.s1.type = exec
#agent.sources.s1.command = E:/tool/UnxUtils/usr/local/wbin/tail.exe --follow=name --retry E:/tool/apache-flume-1.5.2-bin/apache-flume-1.5.2-bin/test/*.log
#agent.sources.s1.restart = true
#agent.sources.s1.restartThrottle = 1000
#agent.sources.s1.batchSize = 100
agent.sources.s2.type = spooldir
agent.sources.s2.spoolDir = E:/test
#agent.sources.s2.ignorePattern = \.
agent.sources.s2.inputCharset=utf-8
#agent.sources.s2.decodeErrorPolicy=REPLACE
agent.sources.s2.batchSize=1
#agent.sources.s2.deserializer = avro
agent.sources.s2.fileHeader = true
agent.sources.s2.fileHeaderKey = resourceName
agent.sources.s2.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent.sources.s2.deserializer.maxBlobLength = 2000000000
agent.sources.s2.deserializer.maxBackoff=30000
#agent.sources.s2.ignorePattern = ^(.)*\\.tmp$
#agent.sources.s2.deletePolicy=immediate
############################################################## SINKS
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = log.sellx.cn
agent.sinks.k2.port = 14546
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = e:/sh
agent.sinks.k1.sink.rollInterval=0
# NOTE: deserializer is a source-side property; the file_roll sink ignores these two lines.
#agent.sinks.k1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
#agent.sinks.k1.deserializer.maxBlobLength = 2000000000
agent.sinks.k1.batchSize=1
############################################################## CHANNELS
# Each channel's type is defined.
agent.channels.m1.type = memory
agent.channels.m1.capacity = 100000
############################################################## RELATIONS
# The channel can be defined as follows.
#agent.sources.s1.channels = m1
agent.sources.s2.channels = m1
agent.sinks.k1.channel = m1
# NOTE: these interceptor settings refer to the commented-out exec source s1, so they are inactive.
#agent.sources.s1.interceptors = i1
#agent.sources.s1.interceptors.i1.type = static
#agent.sources.s1.interceptors.i1.key = fileName
#agent.sources.s1.interceptors.i1.value = #box_id#
Two issues remain unresolved. First, storing to HDFS via the avro sink has not yet succeeded (note that sink k2 is declared above but never bound to a channel, so Flume will drop it at startup with a warning). Second, when several files land in the spool directory at the same time, the file_roll sink bundles them into a single output file; whether the same happens on the HDFS side is still unknown.
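For the unresolved HDFS path, one option is to write to HDFS directly from this agent instead of relaying through avro. A minimal sketch of such a sink bound to the same channel might look like the following; the sink name k3, the namenode address, and the path are illustrative, and the Hadoop client jars must be on Flume's classpath. This is an untested starting point, not a verified fix:

```properties
# Hypothetical HDFS sink sketch (untested); k3 and the path are placeholders.
agent.sinks = k1 k3
agent.sinks.k3.type = hdfs
agent.sinks.k3.channel = m1
agent.sinks.k3.hdfs.path = hdfs://namenode:8020/flume/spool/%Y-%m-%d
agent.sinks.k3.hdfs.useLocalTimeStamp = true
# Write the raw event body instead of the default SequenceFile wrapping,
# which matters when BlobDeserializer delivers whole files as single events.
agent.sinks.k3.hdfs.fileType = DataStream
agent.sinks.k3.hdfs.rollInterval = 30
```

With BlobDeserializer upstream, each event can be as large as maxBlobLength, so the memory channel capacity and the agent heap need to accommodate whole files.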