Hello,

I am using Flume 1.7 on an Amazon EMR cluster with Hive 2.1.1, and I am using the Hive sink to insert data into a partitioned and bucketed Hive table created on EMR (S3 storage). Flume commits the data successfully, but it sometimes throws exceptions such as:

"Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: Writer closed."

I tried increasing the callTimeout value, but the problem starts again after a few minutes.

Detailed log:

2017-10-26 05:32:08,839 (lifecycleSupervisor-1-6) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)] Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@10d43362 counterGroup:{ name:null counters:{runner.interruptions=1, runner.deliveryErrors=42, runner.backoffs.consecutive=2710, runner.backoffs=3330} } }
2017-10-26 05:32:08,945 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: Writer closed. Cannot write to : {metaStoreUri='thrift://localhost:9083', database='traqmatix', table='rawdata', partitionVals=[201710980] }
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Writer closed. Cannot write to : {metaStoreUri='thrift://localhost:9083', database='traqmatix', table='rawdata', partitionVals=[201710980] }
        at org.apache.flume.sink.hive.HiveWriter.write(HiveWriter.java:135)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:299)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)
        ... 3 more
My flume.conf:

agent1.sources = source1
agent1.channels = channel1 channel2 channel3
agent1.sinks = sink2

agent1.sources.source1.type = avro
agent1.sources.source1.channels = channel2
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 4646
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = regex_extractor
agent1.sources.source1.interceptors.i1.regex = partition_(\\d+):table_([A-Za-z0-9_]+)
agent1.sources.source1.interceptors.i1.serializers = s1 s2
agent1.sources.source1.interceptors.i1.serializers.s1.name = partcol
agent1.sources.source1.interceptors.i1.serializers.s2.name = table
agent1.sources.source1.selector.type = multiplexing
agent1.sources.source1.selector.header = table
agent1.sources.source1.selector.mapping.database = channel1

agent1.sinks.sink2.type = hive
agent1.sinks.sink2.channel = channel2
agent1.sinks.sink2.hive.metastore = thrift://localhost:9083
agent1.sinks.sink2.hive.database = database
agent1.sinks.sink2.hive.table = table
agent1.sinks.sink2.hive.partition = %{value}
agent1.sinks.sink2.hive.txnsPerBatchAsk = 1000
agent1.sinks.sink2.batchSize = 100
agent1.sinks.sink2.callTimeout = 360000
agent1.sinks.sink2.useLocalTimeStamp = false
agent1.sinks.sink2.round = true
agent1.sinks.sink2.roundValue = 10
agent1.sinks.sink2.roundUnit = minute
agent1.sinks.sink2.serializer = DELIMITED
agent1.sinks.sink2.serializer.delimiter = ,
agent1.sinks.sink2.serializer.serdeSeparator = ,
agent1.sinks.sink2.serializer.fieldnames = field1,field2,,

agent1.channels.channel2.type = FILE
agent1.channels.channel2.checkpointDir = /home/hadoop/flume/channel-data/checkpoint_test/
agent1.channels.channel2.dataDirs = /home/hadoop/flume/channel-data/data_test/
agent1.channels.channel2.transactionCapacity = 1000000
agent1.channels.channel2.checkpointInterval = 30000
agent1.channels.channel2.maxFileSize = 2146435071
agent1.channels.channel2.capacity = 10000000

Thanks & Regards,
Amit Kumar
Mob: 9910611621
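P.S. In case it helps to see the input format: each Avro event body carries a marker like partition_201710:table_rawdata (hypothetical sample values), which the regex_extractor interceptor above turns into the headers partcol=201710 and table=rawdata; the multiplexing selector then routes events on the table header.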