Hi Denes,

Many thanks for you suggestion.

This is the solution I was considering, but was hoping there is a cleaner
way of achieving the goal and not having to pass the information around in
body.
I feel that this should have been mentioned in Flume docs in File Channel
section.

I'll go with your suggestion. Cheers!

On Thu, Aug 24, 2017 at 8:02 PM, Denes Arvay <de...@cloudera.com> wrote:

> Hi Muhammad,
>
> File channel converts the events to its internal FlumeEvent class (
> https://github.com/apache/flume/blob/trunk/flume-ng-
> channels/flume-file-channel/src/main/java/org/apache/
> flume/channel/file/FlumeEvent.java) during the put operation:
> https://github.com/apache/flume/blob/trunk/flume-ng-channels/flume-file-
> channel/src/main/java/org/apache/flume/channel/file/Log.java#L649-L650
> When the sink takes the events from the channel it gets FlumeEvent
> instances (in case of file channel), that is the reason of the exception
> you got.
> Note that the file channel only stores the event's body and headers, no
> other extra properties will be persisted thus returned on take operations.
>
> I don't know the exact details of your event implementation, but I'd
> recommend to instead of casting the event to ServerEvent create a
> constructor in ServerEvent which gets an org.apache.flume.Event (or byte[]
> body, Map<String, String> headers parameters) and uses this to construct
> itself.
>
> Regards,
> Denes
>
>
> On Thu, Aug 24, 2017 at 2:33 PM Muhammad Yaseen <yaseenaftab...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have implemented a custom HTTPSource handler, and am not using the
>> default { "headers": {  } , "body": "text" } format. Hence, I am also not
>> using the default JSONEvent class and have defined my own class (which also
>> implements Event interface).
>>
>> class ServerEvent implements Event.
>>
>> It has 3 additional attributes (String server,String output,long
>> timestamp) which are mapped to the json object keys by Gson in the my
>> handler class (ServerEventHandler) .
>>
>> The format of JSON post data received: { "server": "some server",
>> "output": "some output", "timestamp": 1234567890 }
>>
>> I have also implemented a custom Avro Serializer for this event class (by
>> extending from AbstractAvroEventSerializer<T>).
>>
>> In the convert(Event e) method I am doing the following.
>> {
>>  ServerEvent se = (ServerEvent)event;  <-- exception on this line
>>  // do something with 'se'
>> }
>>
>> This works fine when I use the Memory Channel, but throws the following
>> exception if I use the File Channel:
>>
>> 7/08/24 07:33:16 ERROR flume.SinkRunner: Unable to deliver event.
>> Exception follows.
>> org.apache.flume.EventDeliveryException: java.lang.ClassCastException:
>> org.apache.flume.channel.file.FlumeEvent cannot be cast to
>> com.myapp.flumeapp.events.ServerEvent
>> at org.apache.flume.sink.hdfs.HDFSEventSink.process(
>> HDFSEventSink.java:451)
>> at org.apache.flume.sink.DefaultSinkProcessor.process(
>> DefaultSinkProcessor.java:67)
>> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassCastException: 
>> org.apache.flume.channel.file.FlumeEvent
>> cannot be cast to com.myapp.flumeapp.events.ServerEvent
>> at  com.myapp.flumeapp.eventserializers.ServerEventAvroSerializer.
>> convert(ServerEventAvroSerializer.java:46)
>> at  com.myapp.flumeapp.eventserializers.ServerEventAvroSerializer.
>> convert(ServerEventAvroSerializer.java:15)
>> at org.apache.flume.serialization.AbstractAvroEventSerializer.write(
>> AbstractAvroEventSerializer.java:107)
>> at org.apache.flume.sink.hdfs.HDFSDataStream.append(
>> HDFSDataStream.java:119)
>> at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:540)
>> at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:537)
>> at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
>> at org.apache.flume.auth.SimpleAuthenticator.execute(
>> SimpleAuthenticator.java:50)
>> at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:624)
>>
>> The same issue is mentioned here: https://github.com/
>> telefonicaid/fiware-cygnus/issues/1419
>>
>> They trace the issue to custom event class not implementing the Writable
>> interface.
>>
>> Has anyone else tried custom HTTP handler and serializer with file
>> channel? Any way to get around this limitation ?
>>
>> --
>>
>> Regards,
>> Muhammad Yaseen
>>
>

Reply via email to