[ 
https://issues.apache.org/jira/browse/CAMEL-21929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941012#comment-17941012
 ] 

Chirag Sanghavi commented on CAMEL-21929:
-----------------------------------------

[~davsclaus]  originally I was trying to identify how to use various 
aggregation stretegies with Google Pubsub to send data as a batch. Google 
supports writing multiple messages using async sender - 
[https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-publish-batch#pubsublite_publish_batch-java.|https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-publish-batch#pubsublite_publish_batch-java]
 Only after running tests, I was able to identify that google pubsub component 
loops over messages and does not use batching offered by pubsub.

In that process, I ended up testing different aggregation stretegies to 
identify which one works.

 
based on analysis I have done, 
org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy works 
fine (component has support for it).
but it does not support following two very well (one fails and one sends 
serialized java object).
org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy
org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy
 
This Jira is specificly about failures occuring in component. 
Alternatively, we will want to update documentation to indicate following;
* PubSub Batching is not supported.
* any reason aggregation is to be used with pubsub, only 
GroupedExchangeAggregationStrategy should be leveraged.
 
Just for context CAMEL-21882 
(https://issues.apache.org/jira/browse/CAMEL-21882) is to create new exchange 
once x number of bytes are collected in aggregation to help ensure that 
aggregated exchanges are not larger than what can be handled by components 
downstream.

> Use of GroupedMessageAggregationStrategy causes issue with 
> camel-google-pubsub component serialization 
> -------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-21929
>                 URL: https://issues.apache.org/jira/browse/CAMEL-21929
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-google-pubsub
>            Reporter: Chirag Sanghavi
>            Priority: Minor
>         Attachments: GooglePubsubProducer.java
>
>
> if a route uses aggegation with GroupedMessageAggregationStrategy, it results 
> into an Exchange/Body containing a list of DefaultMessage objects. 
> DefaultMessage is not a serializable object causing exceptions.
>  
> {code:java}
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.io.NotSerializableException: org.apache.camel.support.DefaultMessage
>         at 
> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1200)
>  ~[?:?]
>         at 
> java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:358) 
> ~[?:?]
>         at java.base/java.util.ArrayList.writeObject(ArrayList.java:948) 
> ~[?:?]
>         at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>  ~[?:?]
>         at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[?:?]
>         at 
> java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1072)
>  ~[?:?]
>         at 
> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
>  ~[?:?]
>         at 
> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1451)
>  ~[?:?]
>         at 
> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1194)
>  ~[?:?]
>         at 
> java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:358) 
> ~[?:?]
>         at 
> org.apache.camel.component.google.pubsub.serializer.DefaultGooglePubsubSerializer.serialize(DefaultGooglePubsubSerializer.java:32)
>  ~[camel-google-pubsub-4.10.3.jar:4.10.3]
>         at 
> org.apache.camel.component.google.pubsub.GooglePubsubProducer.send(GooglePubsubProducer.java:97)
>  ~[camel-google-pubsub-4.10.3.jar:4.10.3]
>         at 
> org.apache.camel.component.google.pubsub.GooglePubsubProducer.process(GooglePubsubProducer.java:75)
>  ~[camel-google-pubsub-4.10.3.jar:4.10.3]
>         at 
> org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:65)
>  {code}
> The fix requires update to 
> GooglePubsubProducer in process function as well as create an overloaded send 
> function or switching Send function to object and using branch with 
> "instanceof" to run different logic (defaultMessage.getBody()  in place of 
> exchange.getIn().getBody().



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to