Well, I got it to work -- I just don't know why it's working. Or, more
specifically, why it wasn't working before.
The way I got it to work was simply setting the consumer's deserializer to our
custom codec class instead of trying to pass the Exchange body as a byte[] to a
Processor that called the custom codec.
Basically, as soon as I did the following, everything worked:
in the properties:
kafka.consumer.deserializer = <our.org>.MyObjectCodec
in the route:
.process(new Processor() {
    public void process(Exchange e) throws Exception {
        MyObject obj = e.getIn().getBody(MyObject.class);
        [...]
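For anyone who finds this thread later, here's a rough sketch of the shape such a codec class can take. To keep it compilable without kafka-clients, snappy-java, or Kryo on the classpath, this declares a local stand-in for the Deserializer interface and uses a trivial UTF-8 payload; the real MyObjectCodec would implement org.apache.kafka.common.serialization.Deserializer<MyObject> and do the Snappy-uncompress plus Kryo-deserialize inside deserialize().

```java
import java.nio.charset.StandardCharsets;

// Stand-in for org.apache.kafka.common.serialization.Deserializer<T>,
// declared locally only so this sketch compiles without kafka-clients.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical shape of the codec class. The real implementation would
// Snappy-uncompress 'data' and Kryo-deserialize the result into MyObject,
// instead of just decoding a UTF-8 string as this toy version does.
class MyObjectCodec implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Kafka hands deserializers null for tombstone records
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

With the real class wired in as the consumer's value deserializer (via a property as above, or camel-kafka's valueDeserializer endpoint option), the Exchange body already arrives as MyObject, so the Processor only needs getBody(MyObject.class).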
Anyway...
Thanks, Camel. It's all quite simple... once I know what I'm doing.
> On 07/22/2021 8:07 PM Ron Cecchini <[email protected]> wrote:
>
>
> Hi, all.
>
> There is a non-Camel producer that takes a Java object, serializes and
> Snappy-compresses it, and writes it directly to a Kafka topic.
>
> I have a Camel-based consumer which is trying to take the raw byte[] off the
> Kafka topic and pass it to a custom method that does both the Snappy
> decompression and deserialization.
>
> The consumer route basically looks like:
>
>
> from("kafka:<topic>?brokers=<brokers>&groupId=<groupid>&autoOffsetReset=earliest&consumersCount=1")
> .convertBodyTo(byte[].class)
> .process(new Processor() {
>     public void process(Exchange e) throws Exception {
>         // getBody(byte[].class) already returns byte[], so no cast is needed
>         byte[] kryoSer = e.getIn().getBody(byte[].class);
>         // pass 'kryoSer' to custom decompress/deserialize method
>         [...]
>
> When I try it, I'm getting a "FAILED_TO_UNCOMPRESS" error:
>
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> [...]
>
> When I print out the body of the Exchange before doing the .process(), I get
> a mixture of binary gibberish, but at the top of it is "SNAPPY", followed by
> lots of binary, and then a human-readable string that clearly looks like the
> original object that was written.
>
> It's my understanding that if the payload were still truly compressed, there
> should be no readable text in it whatsoever. So now I'm wondering:
>
> Is Camel-Kafka perhaps automatically doing *some* amount of decompression
> before I get to the .process()?
>
> I'm reading through the component doc but nothing is jumping out at me yet.
> (there's a lot there to process...)
>
> Thanks for any help.
>
> Ron