[
https://issues.apache.org/jira/browse/CAMEL-21929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941012#comment-17941012
]
Chirag Sanghavi commented on CAMEL-21929:
-----------------------------------------
[~davsclaus] originally I was trying to work out how to use the various
aggregation strategies with Google Pubsub to send data as a batch. Google
supports publishing multiple messages with an async sender:
https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-publish-batch#pubsublite_publish_batch-java
Only after running tests was I able to confirm that the google pubsub component
loops over the messages and does not use the batching offered by Pubsub.
In the process, I ended up testing different aggregation strategies to
identify which ones work.
Based on the analysis I have done,
org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy works
fine (the component has support for it),
but the component does not handle the following two well (one fails and one
sends a serialized Java object):
org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy
org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy
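To illustrate the two outcomes, here is a minimal JDK-only sketch. It assumes, based on the stack trace in the description, that the default serializer writes the body with java.io.ObjectOutputStream. FakeMessage and SerializationSketch are hypothetical names used only for this illustration; FakeMessage stands in for DefaultMessage, which does not implement Serializable. This is not the component's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.camel.support.DefaultMessage:
// it deliberately does not implement java.io.Serializable.
class FakeMessage {
    final Object body;

    FakeMessage(Object body) {
        this.body = body;
    }
}

class SerializationSketch {
    // Assumption: mirrors what the stack trace suggests the default
    // serializer does, i.e. plain Java serialization of the payload.
    static byte[] javaSerialize(Object payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(payload);
        }
        return out.toByteArray();
    }

    // Reports either the two-byte stream magic of the serialized form
    // or the simple name of the exception that stopped serialization.
    static String describe(Object payload) {
        try {
            byte[] bytes = javaSerialize(payload);
            return String.format("ok: magic %02X%02X", bytes[0] & 0xFF, bytes[1] & 0xFF);
        } catch (IOException e) {
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // A list of plain bodies serializes, but as a Java object stream.
        List<Object> bodies = new ArrayList<>(List.of("a", "b"));
        System.out.println(describe(bodies));   // ok: magic ACED

        // A list of non-serializable message objects fails.
        List<Object> messages = new ArrayList<>(List.of(new FakeMessage("a")));
        System.out.println(describe(messages)); // failed: NotSerializableException
    }
}
```

In the first case the subscriber would receive a Java serialization stream rather than the raw payloads, which is presumably the "serialized Java object" behaviour; the second case matches the NotSerializableException in the stack trace.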
This Jira is specifically about the failures occurring in the component.
Alternatively, we will want to update the documentation to indicate the following:
* PubSub batching is not supported.
* If aggregation is to be used with pubsub for any reason, only
GroupedExchangeAggregationStrategy should be used.
Just for context, CAMEL-21882
(https://issues.apache.org/jira/browse/CAMEL-21882) is about creating a new
exchange once x bytes have been collected in aggregation, to help ensure that
aggregated exchanges are not larger than what downstream components can
handle.
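As a rough illustration of that idea, the sketch below splits payloads into batches by accumulated byte size. It is only one possible reading of "start a new exchange once x bytes are collected" and is not the CAMEL-21882 implementation; ByteBoundedBatcher, split and maxBytes are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ByteBoundedBatcher {
    // Groups payloads in order so that each batch totals at most maxBytes.
    // A single payload larger than maxBytes ends up in a batch of its own.
    static List<List<byte[]>> split(List<byte[]> payloads, int maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] payload : payloads) {
            // Close the current batch if adding this payload would exceed the limit.
            if (!current.isEmpty() && currentBytes + payload.length > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += payload.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = List.of(
                "aaaa".getBytes(StandardCharsets.UTF_8),
                "bbbb".getBytes(StandardCharsets.UTF_8),
                "cccc".getBytes(StandardCharsets.UTF_8));
        // With an 8-byte limit, the first two payloads share a batch
        // and the third starts a new one.
        System.out.println(split(payloads, 8).size()); // 2
    }
}
```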
> Use of GroupedMessageAggregationStrategy causes issue with
> camel-google-pubsub component serialization
> -------------------------------------------------------------------------------------------------------
>
> Key: CAMEL-21929
> URL: https://issues.apache.org/jira/browse/CAMEL-21929
> Project: Camel
> Issue Type: Improvement
> Components: camel-google-pubsub
> Reporter: Chirag Sanghavi
> Priority: Minor
> Attachments: GooglePubsubProducer.java
>
>
> If a route uses aggregation with GroupedMessageAggregationStrategy, the result
> is an Exchange whose body contains a list of DefaultMessage objects.
> DefaultMessage is not serializable, which causes exceptions.
>
> {code:java}
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.io.NotSerializableException: org.apache.camel.support.DefaultMessage
>     at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1200) ~[?:?]
>     at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:358) ~[?:?]
>     at java.base/java.util.ArrayList.writeObject(ArrayList.java:948) ~[?:?]
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[?:?]
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[?:?]
>     at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1072) ~[?:?]
>     at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529) ~[?:?]
>     at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1451) ~[?:?]
>     at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1194) ~[?:?]
>     at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:358) ~[?:?]
>     at org.apache.camel.component.google.pubsub.serializer.DefaultGooglePubsubSerializer.serialize(DefaultGooglePubsubSerializer.java:32) ~[camel-google-pubsub-4.10.3.jar:4.10.3]
>     at org.apache.camel.component.google.pubsub.GooglePubsubProducer.send(GooglePubsubProducer.java:97) ~[camel-google-pubsub-4.10.3.jar:4.10.3]
>     at org.apache.camel.component.google.pubsub.GooglePubsubProducer.process(GooglePubsubProducer.java:75) ~[camel-google-pubsub-4.10.3.jar:4.10.3]
>     at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:65)
> {code}
> The fix requires an update to the process function of GooglePubsubProducer,
> plus either an overloaded send function or a send function that accepts an
> Object and branches with "instanceof" to run different logic
> (defaultMessage.getBody() in place of exchange.getIn().getBody()).
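The "instanceof" branching described in the fix could look roughly like the sketch below. StubMessage and StubExchange are hypothetical stand-ins for Camel's Message and Exchange so that the example compiles with the JDK alone, and PayloadExtractor is an invented helper; none of this is the actual GooglePubsubProducer code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Camel Message (only getBody() is modelled).
class StubMessage {
    private final Object body;

    StubMessage(Object body) {
        this.body = body;
    }

    Object getBody() {
        return body;
    }
}

// Hypothetical stand-in for a Camel Exchange (only getIn() is modelled).
class StubExchange {
    private final StubMessage in;

    StubExchange(StubMessage in) {
        this.in = in;
    }

    StubMessage getIn() {
        return in;
    }
}

class PayloadExtractor {
    // Picks the body to publish depending on what the aggregation
    // strategy put into the list: an exchange, a message, or a raw body.
    static Object bodyOf(Object element) {
        if (element instanceof StubExchange) {
            return ((StubExchange) element).getIn().getBody();
        }
        if (element instanceof StubMessage) {
            return ((StubMessage) element).getBody();
        }
        return element;
    }

    // Unwraps every element of a grouped list into its body.
    static List<Object> bodiesOf(List<?> grouped) {
        List<Object> bodies = new ArrayList<>();
        for (Object element : grouped) {
            bodies.add(bodyOf(element));
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<Object> grouped = List.of(
                new StubExchange(new StubMessage("a")),
                new StubMessage("b"),
                "c");
        System.out.println(bodiesOf(grouped)); // [a, b, c]
    }
}
```

With this shape, each unwrapped body can then be passed to the serializer individually, so the serializer never sees the non-serializable wrapper objects.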