Hi,

I have noticed a couple of issues in my Flume setup. Below I have described my 
setup, the issues, and my flume.conf.

Setup:-

I'm using the Flume NG 1.4 (CDH 4.4) tarball for collecting aggregated logs.
I am running a two-tier (agent, collector) Flume configuration with custom plugins. 
There are approximately 20 agent machines (receiving data) and 6 collector machines 
(writing to HDFS), all running independently.
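
(For context: each agent-tier box forwards events to a collector over Avro. The 
agent-side sink is essentially the stock Avro sink, or our custom variant of it, 
pointed at one of the collectors, roughly as sketched below. The hostname and the 
"a1" sink name are placeholders I am using here for illustration; the port matches 
the collector source config at the end of this email.)

# Agent-tier sink, roughly (hostname is a placeholder):
agent.sinks.a1.type = avro
agent.sinks.a1.hostname = collector-01.example.com
agent.sinks.a1.port = 53000
agent.sinks.a1.batch-size = 1000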

Issues :-

The agents seem to be running fine. However, I notice a couple of issues on the 
collector side (the collector flume.conf is included at the end of this email):

  *   Issue 1 :- Assume Flume is writing a file to a data node, and that data 
node crashes for some reason. Flume does not recover from this situation. Ideally, 
Flume should skip writing to that file and continue processing. Instead, we see it 
keeps trying to reach that file; if it can't, it retries indefinitely and stops 
doing any other processing. Either I'm not doing something right, or there is a 
bug in the HDFS sink. (A sketch of the sink settings I have been looking at 
follows the log excerpt below.)

Relevant log output (stdout):


10 Jun 2014 20:23:40,878 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.append:477)  - Caught IOException writing to HDFSWriter (70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]). Closing file (hdfs://namenode/data/2014/06/10/1900/stream/c-6-record1.1402429221442.tmp) and rethrowing exception.

10 Jun 2014 20:23:40,904 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.append:483)  - Caught IOException while closing file (hdfs://namenode/data/2014/06/10/1900/rumnonhttp/c-6-rum24-nonhttprecord.1402429221442.tmp). Exception follows.
java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)

10 Jun 2014 20:23:40,904 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:438)  - HDFS IO error
java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)

10 Jun 2014 20:23:41,617 INFO  [New I/O server boss #1 ([id: 0x5201a55f, /0.0.0.0:53000])] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] OPEN
10 Jun 2014 20:23:41,617 INFO  [New I/O  worker #8] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] BOUND: /10.64.4.22:53000
10 Jun 2014 20:23:41,617 INFO  [New I/O  worker #8] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] CONNECTED: /10.75.201.32:42877
10 Jun 2014 20:23:43,686 DEBUG [New I/O  worker #8] (com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:359)  - Avro source AvroSource: Received avro event batch of 10000 events.
10 Jun 2014 20:23:44,646 ERROR [New I/O  worker #7] (com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:401)  - Avro source AvroSource: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: NonHttpHdfsChannel}
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at com.viasat.flume.sources.RUMFilterAvroSource.appendBatch(RUMFilterAvroSource.java:398)
        .....
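
Regarding Issue 1: the only related knob I currently set is hdfs.callTimeout. Below 
is a sketch of the additional HDFS sink settings I have been reading about (shown 
for the k1 sink only; they are not in my running conf, and the values are guesses 
on my part). I am not sure whether any of these would actually make the sink give 
up on a file whose pipeline is dead, so please correct me if I am on the wrong track.

# Sketch only -- not in my running conf except callTimeout; values are guesses.
agent.sinks.k1.hdfs.callTimeout = 30000
# close a file that has received no writes for 5 minutes
agent.sinks.k1.hdfs.idleTimeout = 300
# cap how many open files the sink keeps at once
agent.sinks.k1.hdfs.maxOpenFiles = 500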


  *   Issue 2 :- I am using a file channel in my collector.conf. I noticed that 
replaying the channel logs takes a very long time: around 24 hours to replay 12 GB 
of data. I am using an Amazon EBS provisioned-IOPS volume for the file channel 
storage, and dual checkpoints are enabled in the file channel configuration. While 
going through the Flume logs, I noticed a BadCheckpointException.

So, putting the pieces together: Flume found a bad checkpoint and tried to replay 
all 12 GB worth of logs. What could make replaying 12 GB of logs take around 
24 hours?
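
Regarding Issue 2, for reference, these are the extra file channel properties 
around checkpointing and replay that I have been looking at (shown for c1 only; 
none of them are in my running conf, and the values below are what I understand 
the defaults to be, so please correct me if they are wrong):

# Sketch only -- not in my running conf; values are believed defaults.
agent.channels.c1.checkpointInterval = 30000
agent.channels.c1.maxFileSize = 2146435071
agent.channels.c1.use-fast-replay = false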


P.S.: Each event/record is about 2 KB on average.




Flume Collector Configuration  :-

# Name the components on this agent
agent.sources = r1
agent.channels = c1 c2 c3 c4 c5
agent.sinks = k1 k2 k3 k4 k5


# Describe/configure the source r1
agent.sources.r1.type = CustomSource
agent.sources.r1.channels = c1 c2 c3 c4 c5
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 53000
agent.sources.r1.schemaFolder = /usr/lib/flume-ng/schema
agent.sources.r1.selector.type = multiplexing
agent.sources.r1.selector.header = rectype
agent.sources.r1.selector.mapping.Record-1 = c1
agent.sources.r1.selector.mapping.Record-2 = c2
agent.sources.r1.selector.mapping.Record-3 = c3
agent.sources.r1.selector.mapping.Record-4 = c4
agent.sources.r1.selector.mapping.Record-5 = c5

# c1 channel config
agent.channels.c1.type = file
agent.channels.c1.useDualCheckpoints = true
agent.channels.c1.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/checkpoint
agent.channels.c1.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/backUpCheckpoint
agent.channels.c1.dataDirs = /usr/lib/flume-ng/datastore/collector/record-1-channel/logs
agent.channels.c1.capacity = 30000
agent.channels.c1.transactionCapacity = 3000
agent.channels.c1.write-timeout = 30
agent.channels.c1.keep-alive = 30


#c2 channel config
agent.channels.c2.type = file
agent.channels.c2.useDualCheckpoints = true
agent.channels.c2.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/checkpoint
agent.channels.c2.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/backUpCheckpoint
agent.channels.c2.dataDirs = /usr/lib/flume-ng/datastore/collector/record-2-channel/logs
agent.channels.c2.capacity = 30000
agent.channels.c2.transactionCapacity = 3000
agent.channels.c2.write-timeout = 30
agent.channels.c2.keep-alive = 30


# c3 channel config
agent.channels.c3.type = file
agent.channels.c3.useDualCheckpoints = true
agent.channels.c3.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/checkpoint
agent.channels.c3.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/backUpCheckpoint
agent.channels.c3.dataDirs = /usr/lib/flume-ng/datastore/collector/record-3-channel/logs
agent.channels.c3.capacity = 30000
agent.channels.c3.transactionCapacity = 3000
agent.channels.c3.write-timeout = 30
agent.channels.c3.keep-alive = 30


#c4 channel config
agent.channels.c4.type = file
agent.channels.c4.useDualCheckpoints = true
agent.channels.c4.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/checkpoint
agent.channels.c4.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/backUpCheckpointt
agent.channels.c4.dataDirs = /usr/lib/flume-ng/datastore/collector/record-4-channel/logs
agent.channels.c4.capacity = 30000
agent.channels.c4.transactionCapacity = 3000
agent.channels.c4.write-timeout = 30
agent.channels.c4.keep-alive = 30


#c5 channel config
agent.channels.c5.type = file
agent.channels.c5.useDualCheckpoints = true
agent.channels.c5.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/checkpoint
agent.channels.c5.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/backUpCheckpointt
agent.channels.c5.dataDirs = /usr/lib/flume-ng/datastore/collector/record-5-channel/logs
agent.channels.c5.capacity = 30000
agent.channels.c5.transactionCapacity = 3000
agent.channels.c5.write-timeout = 30
agent.channels.c5.keep-alive = 30




#k1 sink config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.serializer = CustomSerializer$Builder
agent.sinks.k1.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k1.serializer.schemaVersion = 24
agent.sinks.k1.serializer.syncIntervalBytes = 4096000
#agent.sinks.k1.serializer = avro
agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-1
agent.sinks.k1.hdfs.filePrefix = rec-1
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollInterval = 1200
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.callTimeout = 30000
agent.sinks.k1.hdfs.batchSize = 1000

#k2 sink config
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c2
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.serializer = CustomSerializer$Builder
agent.sinks.k2.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k2.serializer.schemaVersion = 24
agent.sinks.k2.serializer.syncIntervalBytes = 4096000
#agent.sinks.k2.serializer = avro
agent.sinks.k2.serializer.compressionCodec = snappy
agent.sinks.k2.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-2
agent.sinks.k2.hdfs.filePrefix = rec-2
agent.sinks.k2.hdfs.rollSize = 0
agent.sinks.k2.hdfs.rollInterval = 1200
agent.sinks.k2.hdfs.rollCount = 0
agent.sinks.k2.hdfs.callTimeout = 30000
agent.sinks.k2.hdfs.batchSize = 1000

#k3 sink config
agent.sinks.k3.type = hdfs
agent.sinks.k3.channel = c3
agent.sinks.k3.hdfs.fileType = DataStream
agent.sinks.k3.serializer = CustomSerializer$Builder
agent.sinks.k3.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k3.serializer.schemaVersion = 24
agent.sinks.k3.serializer.syncIntervalBytes = 4096000
#agent.sinks.k3.serializer = avro
agent.sinks.k3.serializer.compressionCodec = snappy
agent.sinks.k3.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-3
agent.sinks.k3.hdfs.filePrefix = rec-3
agent.sinks.k3.hdfs.rollSize = 0
agent.sinks.k3.hdfs.rollInterval = 1200
agent.sinks.k3.hdfs.rollCount = 0
agent.sinks.k3.hdfs.callTimeout = 30000
agent.sinks.k3.hdfs.batchSize = 1000

#k4 sink config
agent.sinks.k4.type = hdfs
agent.sinks.k4.channel = c4
agent.sinks.k4.hdfs.fileType = DataStream
agent.sinks.k4.serializer = CustomSerializer$Builder
agent.sinks.k4.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k4.serializer.schemaVersion = 24
agent.sinks.k4.serializer.syncIntervalBytes = 4096000
#agent.sinks.k4.serializer = avro
agent.sinks.k4.serializer.compressionCodec = snappy
agent.sinks.k4.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-4
agent.sinks.k4.hdfs.filePrefix = rec-4
agent.sinks.k4.hdfs.rollSize = 0
agent.sinks.k4.hdfs.rollInterval = 1200
agent.sinks.k4.hdfs.rollCount = 0
agent.sinks.k4.hdfs.callTimeout = 30000
agent.sinks.k4.hdfs.batchSize = 1000

#k5 sink config
agent.sinks.k5.type = hdfs
agent.sinks.k5.channel = c5
agent.sinks.k5.hdfs.fileType = DataStream
agent.sinks.k5.serializer = CustomSerializer$Builder
agent.sinks.k5.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k5.serializer.schemaVersion = 24
agent.sinks.k5.serializer.syncIntervalBytes = 4096000
#agent.sinks.k5.serializer = avro
agent.sinks.k5.serializer.compressionCodec = snappy
agent.sinks.k5.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-5
agent.sinks.k5.hdfs.filePrefix = rec-5
agent.sinks.k5.hdfs.rollSize = 0
agent.sinks.k5.hdfs.rollInterval = 1200
agent.sinks.k5.hdfs.rollCount = 0
agent.sinks.k5.hdfs.callTimeout = 30000
agent.sinks.k5.hdfs.batchSize = 1000


Any inputs or suggestions?


Thanks
-Kushal Mangtani