Hi Bill/Matthias, Thanks for the replies.
The issue is I never see a result, I have a log that shows the message coming in, but the adder/subtractor is never invoked for it even though it should. So no result gets published to the intermediate topic I have. Thanks, Tomer On Thu, Oct 28, 2021 at 11:57 AM Bill Bejeck <b...@confluent.io.invalid> wrote: > Tomer, > > As Matthias pointed out for a single, final result you need to use the > `suppress()` operator. > > But back to your original question, > > they are processed by the adder/subtractor and are not > > materialized in the intermediate topics which causes them not to be > > outputted in the final topic > > > > Is the issue you never see a result or were you curious about the > intermediate calculations? > HTH, > Bill > > On Thu, Oct 28, 2021 at 1:05 AM Matthias J. Sax <mj...@apache.org> wrote: > > > For this case, you can call `aggregate(...).suppress()`. > > > > -Matthias > > > > On 10/27/21 12:42 PM, Tomer Cohen wrote: > > > Hi Bill, > > > > > > Thanks for the prompt reply. > > > > > > Setting to 0 forces a no collection window, so if I get 10 messages to > > > aggregate for example, it will send 10 updates. But I only want to > > publish > > > the final state only. > > > > > > Thanks, > > > > > > Tomer > > > > > > On Wed, Oct 27, 2021 at 2:10 PM Bill Bejeck <b...@confluent.io.invalid > > > > > wrote: > > > > > >> Hi Tomer, > > >> > > >> From the description you've provided, it sounds to me like you have a > > >> stateful operation. > > >> > > >> The thing to keep in mind with stateful operations in Kafka Streams is > > that > > >> every result is not written to the changelog and forwarded downstream. > > >> Kafka Streams uses a cache for stateful operations and it's only on > > cache > > >> flush either when it's full or when Kafka Streams commits (every 30 > > seconds > > >> by default) that Kafka Streams writes the results of the stateful > > >> operations to the changelog and forwards the records downstream to > other > > >> processors. > > >> > > >> If you want every Kafka Streams to forward every record you'll need to > > set > > >> the `StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` to 0. > > >> > > >> If I haven't understood your experience accurately can you provide a > few > > >> more details? > > >> > > >> Thanks, > > >> Bill > > >> > > >> > > >> On Wed, Oct 27, 2021 at 9:48 AM Tomer Cohen <ilan...@gmail.com> > wrote: > > >> > > >>> Hello Kafka team, > > >>> > > >>> I am seeing an odd behavior when using kafka streams. > > >>> > > >>> During periods of heavier volumes, there are messages coming in. > > However, > > >>> they do look like they are processed by the adder/subtractor and are > > not > > >>> materialized in the intermediate topics which causes them not to be > > >>> outputted in the final topic. > > >>> > > >>> Is there any way to debug this or log out when a message is dropped > in > > >> the > > >>> stream and not processed for whatever reason? > > >>> > > >>> Thanks, > > >>> > > >>> Tomer > > >>> > > >> > > > > > >