Hi, I have noticed a couple of issues in my Flume setup. Below I have described my setup, the issues, and my flume.conf.
Setup: I'm using the Flume NG 1.4 (CDH 4.4) tarball for collecting aggregated logs. I am running a two-tier (agent, collector) Flume configuration with custom plugins. There are approximately 20 agent machines (receiving data) and 6 collector machines (writing to HDFS), all running independently.

Issues: The agents seem to be running fine; however, I notice a couple of issues on the collector side (the collector flume.conf is included at the end of this email).

* Issue 1: Assume Flume is writing a file and the data node it is writing to crashes for some reason. Flume does not recover from this situation. Ideally, Flume should skip writing to that file and continue processing; instead, we see it trying to reach that file, and if it can't, it keeps retrying indefinitely and stops doing any other processing. Either I'm not doing something right, or there is a bug in the HDFS sink (a workaround I have been considering is sketched after Issue 2 below). Log output from the collector:

10 Jun 2014 20:23:40,878 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.append:477) - Caught IOException writing to HDFSWriter (70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]). Closing file (hdfs://namenode/data/2014/06/10/1900/stream/c-6-record1.1402429221442.tmp) and rethrowing exception.
10 Jun 2014 20:23:40,904 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.append:483) - Caught IOException while closing file (hdfs://namenode/data/2014/06/10/1900/rumnonhttp/c-6-rum24-nonhttprecord.1402429221442.tmp). Exception follows.
java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
10 Jun 2014 20:23:40,904 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:438) - HDFS IO error
java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read.
ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
10 Jun 2014 20:23:41,617 INFO [New I/O server boss #1 ([id: 0x5201a55f, /0.0.0.0:53000])] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] OPEN
10 Jun 2014 20:23:41,617 INFO [New I/O worker #8] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] BOUND: /10.64.4.22:53000
10 Jun 2014 20:23:41,617 INFO [New I/O worker #8] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] CONNECTED: /10.75.201.32:42877
10 Jun 2014 20:23:43,686 DEBUG [New I/O worker #8] (com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:359) - Avro source AvroSource: Received avro event batch of 10000 events.
10 Jun 2014 20:23:44,646 ERROR [New I/O worker #7] (com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:401) - Avro source AvroSource: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: NonHttpHdfsChannel}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at com.viasat.flume.sources.RUMFilterAvroSource.appendBatch(RUMFilterAvroSource.java:398)
    .....

* Issue 2: I am using a File Channel in my collector.conf. I noticed that replaying the logs takes a very long time: around 24 hours to replay 12 GB of data. I am using an Amazon EBS Provisioned IOPS volume for the file channel storage, and dual checkpoints are enabled in the file channel configuration. On parsing the Flume logs, I noticed a BadCheckpointException. So, putting the pieces together: Flume found a bad checkpoint and tried to replay all of the logs, about 12 GB worth. What would make replaying 12 GB of logs take around 24 hours? (The replay-related settings I am aware of are listed right after this.)

P.S.: Each event/record is about 2 KB on average.
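A couple of things I have been looking at myself, in case they help frame the questions above:

For Issue 1, one workaround I have been considering (only a sketch; the second sink k1b and its filePrefix are hypothetical names, and I am not sure this actually sidesteps the pipeline-recovery retries inside the HDFS client) is a failover sink group per channel, so that a standby HDFS sink keeps draining the channel while the primary sink is being penalized. For c1 it would look roughly like this:

# Sketch only: k1b is a hypothetical second HDFS sink draining the same channel c1,
# writing under a different filePrefix so it never touches k1's stuck file.
# (k1b would also need the same serializer settings as k1.)
agent.sinks = k1 k1b k2 k3 k4 k5
agent.sinks.k1b.type = hdfs
agent.sinks.k1b.channel = c1
agent.sinks.k1b.hdfs.fileType = DataStream
agent.sinks.k1b.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-1
agent.sinks.k1b.hdfs.filePrefix = rec-1-failover
agent.sinks.k1b.hdfs.rollSize = 0
agent.sinks.k1b.hdfs.rollInterval = 1200
agent.sinks.k1b.hdfs.rollCount = 0
agent.sinks.k1b.hdfs.callTimeout = 30000
agent.sinks.k1b.hdfs.batchSize = 1000
# Failover sink processor: prefer k1, fall back to k1b while k1 is penalized.
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = k1 k1b
agent.sinkgroups.g1.processor.type = failover
agent.sinkgroups.g1.processor.priority.k1 = 10
agent.sinkgroups.g1.processor.priority.k1b = 5
agent.sinkgroups.g1.processor.maxpenalty = 10000

For Issue 2, these are the replay/checkpoint-related File Channel properties I am aware of but have left untouched (the values shown are the documented defaults as I understand them, not something I have tuned); please let me know if any of them matter for replay speed:

# Sketch only, shown for c1: defaults as I understand them; I do not set these today.
agent.channels.c1.checkpointInterval = 30000
agent.channels.c1.maxFileSize = 2146435071
agent.channels.c1.minimumRequiredSpace = 524288000
# (If I read the docs correctly there are also expert-level use-fast-replay and
# use-log-replay-v1 switches, which I have not touched either.)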
Flume Collector Configuration:

# Name the components on this agent
agent.sources = r1
agent.channels = c1 c2 c3 c4 c5
agent.sinks = k1 k2 k3 k4 k5

# Describe/configure the source r1
agent.sources.r1.type = CustomSource
agent.sources.r1.channels = c1 c2 c3 c4 c5
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 53000
agent.sources.r1.schemaFolder = /usr/lib/flume-ng/schema
agent.sources.r1.selector.type = multiplexing
agent.sources.r1.selector.header = rectype
agent.sources.r1.selector.mapping.Record-1 = c1
agent.sources.r1.selector.mapping.Record-2 = c2
agent.sources.r1.selector.mapping.Record-3 = c3
agent.sources.r1.selector.mapping.Record-4 = c4
agent.sources.r1.selector.mapping.Record-5 = c5

# c1 channel config
agent.channels.c1.type = file
agent.channels.c1.useDualCheckpoints = true
agent.channels.c1.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/checkpoint
agent.channels.c1.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/backUpCheckpoint
agent.channels.c1.dataDirs = /usr/lib/flume-ng/datastore/collector/record-1-channel/logs
agent.channels.c1.capacity = 30000
agent.channels.c1.transactionCapacity = 3000
agent.channels.c1.write-timeout = 30
agent.channels.c1.keep-alive = 30

# c2 channel config
agent.channels.c2.type = file
agent.channels.c2.useDualCheckpoints = true
agent.channels.c2.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/checkpoint
agent.channels.c2.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/backUpCheckpoint
agent.channels.c2.dataDirs = /usr/lib/flume-ng/datastore/collector/record-2-channel/logs
agent.channels.c2.capacity = 30000
agent.channels.c2.transactionCapacity = 3000
agent.channels.c2.write-timeout = 30
agent.channels.c2.keep-alive = 30

# c3 channel config
agent.channels.c3.type = file
agent.channels.c3.useDualCheckpoints = true
agent.channels.c3.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/checkpoint
agent.channels.c3.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/backUpCheckpoint
agent.channels.c3.dataDirs = /usr/lib/flume-ng/datastore/collector/record-3-channel/logs
agent.channels.c3.capacity = 30000
agent.channels.c3.transactionCapacity = 3000
agent.channels.c3.write-timeout = 30
agent.channels.c3.keep-alive = 30

# c4 channel config
agent.channels.c4.type = file
agent.channels.c4.useDualCheckpoints = true
agent.channels.c4.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/checkpoint
agent.channels.c4.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/backUpCheckpointt
agent.channels.c4.dataDirs = /usr/lib/flume-ng/datastore/collector/record-4-channel/logs
agent.channels.c4.capacity = 30000
agent.channels.c4.transactionCapacity = 3000
agent.channels.c4.write-timeout = 30
agent.channels.c4.keep-alive = 30

# c5 channel config
agent.channels.c5.type = file
agent.channels.c5.useDualCheckpoints = true
agent.channels.c5.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/checkpoint
agent.channels.c5.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/backUpCheckpointt
agent.channels.c5.dataDirs = /usr/lib/flume-ng/datastore/collector/record-5-channel/logs
agent.channels.c5.capacity = 30000
agent.channels.c5.transactionCapacity = 3000
agent.channels.c5.write-timeout = 30
agent.channels.c5.keep-alive = 30

# k1 sink config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.serializer = CustomSerializer$Builder
agent.sinks.k1.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k1.serializer.schemaVersion = 24
agent.sinks.k1.serializer.syncIntervalBytes = 4096000
#agent.sinks.k1.serializer = avro
agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-1
agent.sinks.k1.hdfs.filePrefix = rec-1
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollInterval = 1200
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.callTimeout = 30000
agent.sinks.k1.hdfs.batchSize = 1000

# k2 sink config
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c2
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.serializer = CustomSerializer$Builder
agent.sinks.k2.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k2.serializer.schemaVersion = 24
agent.sinks.k2.serializer.syncIntervalBytes = 4096000
#agent.sinks.k2.serializer = avro
agent.sinks.k2.serializer.compressionCodec = snappy
agent.sinks.k2.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-2
agent.sinks.k2.hdfs.filePrefix = rec-2
agent.sinks.k2.hdfs.rollSize = 0
agent.sinks.k2.hdfs.rollInterval = 1200
agent.sinks.k2.hdfs.rollCount = 0
agent.sinks.k2.hdfs.callTimeout = 30000
agent.sinks.k2.hdfs.batchSize = 1000

# k3 sink config
agent.sinks.k3.type = hdfs
agent.sinks.k3.channel = c3
agent.sinks.k3.hdfs.fileType = DataStream
agent.sinks.k3.serializer = CustomSerializer$Builder
agent.sinks.k3.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k3.serializer.schemaVersion = 24
agent.sinks.k3.serializer.syncIntervalBytes = 4096000
#agent.sinks.k3.serializer = avro
agent.sinks.k3.serializer.compressionCodec = snappy
agent.sinks.k3.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-3
agent.sinks.k3.hdfs.filePrefix = rec-3
agent.sinks.k3.hdfs.rollSize = 0
agent.sinks.k3.hdfs.rollInterval = 1200
agent.sinks.k3.hdfs.rollCount = 0
agent.sinks.k3.hdfs.callTimeout = 30000
agent.sinks.k3.hdfs.batchSize = 1000

# k4 sink config
agent.sinks.k4.type = hdfs
agent.sinks.k4.channel = c4
agent.sinks.k4.hdfs.fileType = DataStream
agent.sinks.k4.serializer = CustomSerializer$Builder
agent.sinks.k4.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k4.serializer.schemaVersion = 24
agent.sinks.k4.serializer.syncIntervalBytes = 4096000
#agent.sinks.k4.serializer = avro
agent.sinks.k4.serializer.compressionCodec = snappy
agent.sinks.k4.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-4
agent.sinks.k4.hdfs.filePrefix = rec-4
agent.sinks.k4.hdfs.rollSize = 0
agent.sinks.k4.hdfs.rollInterval = 1200
agent.sinks.k4.hdfs.rollCount = 0
agent.sinks.k4.hdfs.callTimeout = 30000
agent.sinks.k4.hdfs.batchSize = 1000

# k5 sink config
agent.sinks.k5.type = hdfs
agent.sinks.k5.channel = c5
agent.sinks.k5.hdfs.fileType = DataStream
agent.sinks.k5.serializer = CustomSerializer$Builder
agent.sinks.k5.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k5.serializer.schemaVersion = 24
agent.sinks.k5.serializer.syncIntervalBytes = 4096000
#agent.sinks.k5.serializer = avro
agent.sinks.k5.serializer.compressionCodec = snappy
agent.sinks.k5.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-5
agent.sinks.k5.hdfs.filePrefix = rec-5
agent.sinks.k5.hdfs.rollSize = 0
agent.sinks.k5.hdfs.rollInterval = 1200
agent.sinks.k5.hdfs.rollCount = 0
agent.sinks.k5.hdfs.callTimeout = 30000
agent.sinks.k5.hdfs.batchSize = 1000

Any inputs/suggestions?

Thanks,
Kushal Mangtani