Hello,

I am using Flume 1.7 on an Amazon EMR cluster with Hive 2.1.1, and I am
using the Hive sink to insert data into a partitioned and bucketed Hive
table created on EMR (S3 storage). Flume commits the data successfully, but
sometimes it throws an exception saying

"Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
Writer closed."

I tried increasing the callTimeout value, but the problem reappears after a
few minutes.
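
For reference, the target table follows the Hive streaming requirements
(bucketed, stored as ORC, transactional). A rough sketch of its DDL is
below; the column list, partition column type, clustering column and bucket
count are placeholders rather than the exact schema:

-- Placeholder DDL: the real column list and bucket count differ, but the
-- table is bucketed, stored as ORC and transactional, as the Hive sink
-- (Hive streaming API) requires.
CREATE TABLE traqmatix.rawdata (
  field1 STRING,
  field2 STRING
)
PARTITIONED BY (partcol STRING)
CLUSTERED BY (field1) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');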

Detailed log:

2017-10-26 05:32:08,839 (lifecycleSupervisor-1-6) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)] Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@10d43362 counterGroup:{ name:null counters:{runner.interruptions=1, runner.deliveryErrors=42, runner.backoffs.consecutive=2710, runner.backoffs=3330} } }
2017-10-26 05:32:08,945 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: Writer closed. Cannot write to : {metaStoreUri='thrift://localhost:9083', database='traqmatix', table='rawdata', partitionVals=[201710980] }
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Writer closed. Cannot write to : {metaStoreUri='thrift://localhost:9083', database='traqmatix', table='rawdata', partitionVals=[201710980] }
        at org.apache.flume.sink.hive.HiveWriter.write(HiveWriter.java:135)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:299)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)
        ... 3 more


flume.conf

agent1.sources = source1
agent1.channels = channel1 channel2 channel3
agent1.sinks = sink2

agent1.sources.source1.type = avro
agent1.sources.source1.channels =  channel2
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 4646

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = regex_extractor
agent1.sources.source1.interceptors.i1.regex = partition_(\\d+):table_([A-Za-z0-9_]+)
agent1.sources.source1.interceptors.i1.serializers = s1 s2
agent1.sources.source1.interceptors.i1.serializers.s1.name = partcol
agent1.sources.source1.interceptors.i1.serializers.s2.name = table

agent1.sources.source1.selector.type = multiplexing
agent1.sources.source1.selector.header = table
agent1.sources.source1.selector.mapping.database = channel1

agent1.sinks.sink2.type = hive
agent1.sinks.sink2.channel = channel2
agent1.sinks.sink2.hive.metastore = thrift://localhost:9083
agent1.sinks.sink2.hive.database = database
agent1.sinks.sink2.hive.table = table
agent1.sinks.sink2.hive.partition = %{value}
agent1.sinks.sink2.hive.txnsPerBatchAsk = 1000
agent1.sinks.sink2.batchSize = 100
agent1.sinks.sink2.callTimeout = 360000
agent1.sinks.sink2.useLocalTimeStamp = false
agent1.sinks.sink2.round = true
agent1.sinks.sink2.roundValue = 10
agent1.sinks.sink2.roundUnit = minute
agent1.sinks.sink2.serializer = DELIMITED
agent1.sinks.sink2.serializer.delimiter = ,
agent1.sinks.sink2.serializer.serdeSeparator = ,
agent1.sinks.sink2.serializer.fieldnames = field1,field2,,

agent1.channels.channel2.type = FILE
agent1.channels.channel2.checkpointDir = /home/hadoop/flume/channel-data/checkpoint_test/
agent1.channels.channel2.dataDirs = /home/hadoop/flume/channel-data/data_test/
agent1.channels.channel2.transactionCapacity = 1000000
agent1.channels.channel2.checkpointInterval = 30000
agent1.channels.channel2.maxFileSize = 2146435071
agent1.channels.channel2.capacity = 10000000



Thanks & Regards,

Amit Kumar,
Mob: 9910611621
