Hi Denes,

Many thanks for your suggestion.
This is the solution I was considering, but I was hoping there is a cleaner way of achieving the goal without having to pass the information around in the body. I feel this should be mentioned in the File Channel section of the Flume docs. I'll go with your suggestion (a rough sketch of what I have in mind is below the quoted thread). Cheers!

On Thu, Aug 24, 2017 at 8:02 PM, Denes Arvay <de...@cloudera.com> wrote:

> Hi Muhammad,
>
> File channel converts the events to its internal FlumeEvent class
> (https://github.com/apache/flume/blob/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java)
> during the put operation:
> https://github.com/apache/flume/blob/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java#L649-L650
> When the sink takes the events from the channel it gets FlumeEvent
> instances (in the case of the file channel); that is the reason for the
> exception you got.
> Note that the file channel only stores the event's body and headers; no
> other extra properties will be persisted and thus returned on take
> operations.
>
> I don't know the exact details of your event implementation, but instead
> of casting the event to ServerEvent I'd recommend creating a constructor
> in ServerEvent which takes an org.apache.flume.Event (or byte[] body,
> Map<String, String> headers parameters) and uses it to construct itself.
>
> Regards,
> Denes
>
>
> On Thu, Aug 24, 2017 at 2:33 PM Muhammad Yaseen <yaseenaftab...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have implemented a custom HTTPSource handler and am not using the
>> default { "headers": { }, "body": "text" } format. Hence, I am also not
>> using the default JSONEvent class and have defined my own class (which
>> also implements the Event interface):
>>
>> class ServerEvent implements Event
>>
>> It has 3 additional attributes (String server, String output, long
>> timestamp) which are mapped to the JSON object keys by Gson in my
>> handler class (ServerEventHandler).
>>
>> The format of the JSON post data received: { "server": "some server",
>> "output": "some output", "timestamp": 1234567890 }
>>
>> I have also implemented a custom Avro serializer for this event class
>> (by extending AbstractAvroEventSerializer<T>).
>>
>> In the convert(Event e) method I am doing the following:
>> {
>>   ServerEvent se = (ServerEvent) event; <-- exception on this line
>>   // do something with 'se'
>> }
>>
>> This works fine when I use the Memory Channel, but throws the following
>> exception if I use the File Channel:
>>
>> 17/08/24 07:33:16 ERROR flume.SinkRunner: Unable to deliver event.
>> Exception follows.
>> org.apache.flume.EventDeliveryException: java.lang.ClassCastException:
>> org.apache.flume.channel.file.FlumeEvent cannot be cast to
>> com.myapp.flumeapp.events.ServerEvent
>> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
>> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassCastException:
>> org.apache.flume.channel.file.FlumeEvent cannot be cast to
>> com.myapp.flumeapp.events.ServerEvent
>> at com.myapp.flumeapp.eventserializers.ServerEventAvroSerializer.convert(ServerEventAvroSerializer.java:46)
>> at com.myapp.flumeapp.eventserializers.ServerEventAvroSerializer.convert(ServerEventAvroSerializer.java:15)
>> at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:107)
>> at org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:119)
>> at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:540)
>> at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:537)
>> at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
>> at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>> at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> The same issue is mentioned here:
>> https://github.com/telefonicaid/fiware-cygnus/issues/1419
>>
>> They trace the issue to the custom event class not implementing the
>> Writable interface.
>>
>> Has anyone else tried a custom HTTP handler and serializer with the file
>> channel? Any way to get around this limitation?
>>
>> --
>>
>> Regards,
>> Muhammad Yaseen
>>
>
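P.S. For the archive, a rough, untested sketch of the constructor-based approach Denes describes, in case someone else hits the same ClassCastException. It assumes my handler (ServerEventHandler) also copies server, output and timestamp into the event headers so they survive the file channel, which only persists body and headers; the header keys and the SimpleEvent base class are my own choices here, not anything prescribed by Flume:

    package com.myapp.flumeapp.events;

    import java.util.Map;

    import org.apache.flume.Event;
    import org.apache.flume.event.SimpleEvent;

    // Event whose extra attributes can be rebuilt from body + headers,
    // instead of expecting the original instance to survive the channel.
    public class ServerEvent extends SimpleEvent {

        private String server;
        private String output;
        private long timestamp;

        public ServerEvent() {
        }

        // The constructor Denes suggested: take a plain org.apache.flume.Event
        // (e.g. the FlumeEvent returned by the file channel) and reconstruct
        // the extra attributes from its headers.
        public ServerEvent(Event event) {
            setBody(event.getBody());
            setHeaders(event.getHeaders());
            Map<String, String> h = event.getHeaders();
            this.server = h.get("server");      // header keys are my assumption
            this.output = h.get("output");
            String ts = h.get("timestamp");
            this.timestamp = (ts != null) ? Long.parseLong(ts) : 0L;
        }

        public String getServer() { return server; }
        public String getOutput() { return output; }
        public long getTimestamp() { return timestamp; }
    }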
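With that in place, the convert(Event e) in ServerEventAvroSerializer (quoted above) would delegate to the new constructor instead of casting; again only a sketch, the rest of the serializer stays as it is:

    @Override
    protected ServerEvent convert(Event event) {
        // The file channel hands back FlumeEvent instances, so rebuild the
        // ServerEvent from body + headers rather than casting.
        return new ServerEvent(event);
    }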