Hi,

I'm currently trying to use an Embedded Agent with a file-based 
channel, which writes to another agent and then ultimately into an 
HDFS sequence file. The connection is made successfully, and data is sent 
across to the other agent (and then into a sequence file). However, I'm trying 
to understand how to close the agent and flush out any 'local' data that may 
still need to be transferred when the program closes.

As I am using an AvroSink, I read that if a channel sends a null event the 
batch is immediately sent (read from 
https://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/AvroSink.html).
My idea, then, was to pass null to the agent just as the program is requested 
to stop, like so:

this.agent.put(null);
this.agent.stop();

This did seem to attempt to send the batch; however, when the agent.stop() 
method is called, an error is printed in the logs:

org.apache.flume.EventDeliveryException: Failed to send events
       at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382) ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]
       at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]
       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) [flume-ng-core-1.4.0-cdh4.4.0.jar:na]
       at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: oraclelinux6.localdomain, port: 44444 }: Failed to send batch
       at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:294) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
       at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:366) ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]
       ... 3 common frames omitted
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: oraclelinux6.localdomain, port: 44444 }: Interrupted in handshake
       at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:341) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
       at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:282) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
       ... 4 common frames omitted
Caused by: java.lang.InterruptedException: null
       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400) ~[na:1.7.0_45]
       at java.util.concurrent.FutureTask.get(FutureTask.java:199) ~[na:1.7.0_45]
       at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:336) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
       ... 5 common frames omitted

From the error message, "Interrupted in handshake", I would suggest that 
whilst attempting to send the batch, the agent.stop() method interrupted it 
and caused the exception. When I put a Thread.sleep(10000) between putting a 
null event and the stop method, the problem seems to disappear and the batch 
is successfully flushed (from what I can see). I was wondering whether I have 
understood fully what is going on, and if so, whether there is a more elegant 
way of shutting down the embedded agent and flushing any events that are still 
stored locally?
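
For reference, the sleep-based workaround described above looks roughly like the sketch below. The Agent interface and the flushAndStop helper are hypothetical names used only so the snippet compiles on its own; in the real program the agent is the Flume EmbeddedAgent, whose put method takes an Event rather than an Object. The grace period is a guess, not a guarantee that the batch has been delivered.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownSketch {

    /** Hypothetical stand-in for the two agent calls used in this email. */
    interface Agent {
        void put(Object event) throws Exception;
        void stop();
    }

    /**
     * Sends the null event, waits for a fixed grace period, then stops the
     * agent. stop() is called even if the wait itself is interrupted.
     */
    static void flushAndStop(Agent agent, long graceMillis) throws Exception {
        agent.put(null);
        try {
            // Assumption: this gives the sink time to finish sending the batch.
            Thread.sleep(graceMillis);
        } finally {
            agent.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake agent that only records the order of the calls it receives.
        final List<String> calls = new ArrayList<String>();
        Agent fake = new Agent() {
            @Override
            public void put(Object event) {
                calls.add("put(" + event + ")");
            }

            @Override
            public void stop() {
                calls.add("stop");
            }
        };

        flushAndStop(fake, 100L);
        System.out.println(calls);
    }
}
```

This only makes the interruption less likely by delaying stop(); it does not confirm that the channel is actually empty, which is the part I would like to do properly.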

Any help would be greatly appreciated.

Thanks and kind regards,

Adam
