[ 
https://issues.apache.org/jira/browse/KAFKA-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775007#comment-16775007
 ] 

Maciej Lizewski commented on KAFKA-7971:
----------------------------------------

I know I can use transformer and in fact I do now (kindly please check 
stackoverflow link where I put snippet). The "problem" is that this processing 
only depend on wall clock time and state store. Attaching this transformer to 
some input stream is only needed to introduce component to streams environment 
and in fact all input messages are just dropped. State store is updated in 
different stream that produces different output. I am thinking about creating 
"dummy" topic without any producer just to attach there my transformer. In my 
opinion this is not very nice solution. It would be nice to have something 
like: 

KStream StreamsBuilder::producer(ProducerInterface, local_stores);

the component would be initialized same way transformers and processors are 
with ProcessorContext so it has access to stores, it could even register 
punctuator connected with wall_clock and forward newly created events to 
processing network. It is just a thought, improvement...

> Producer in Streams environment
> -------------------------------
>
>                 Key: KAFKA-7971
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7971
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Maciej Lizewski
>            Priority: Minor
>              Labels: newbie
>
> Would be nice to have Producers that can emit messages to topic just like any 
> producer but also have access to local stores from streams environment in 
> Spring.
> consider case: I have event sourced ordering process like this:
> [EVENTS QUEUE] -> [MERGING PROCESS] -> [ORDERS CHANGELOG/KTABLE]
> Merging process uses local storage "opened orders" to easily apply new 
> changes.
> Now I want to implement process of closing abandoned orders (orders that were 
> started, but for too long there was no change and they hang in beginning 
> status). Easiest way is to periodically scan "opened orders" store and 
> produce "abandon event" for every order that meets criteria. The obnly way 
> now i to create Transformer with punctuator and connect output to [EVENTS 
> QUEUE]. That is obvious. but Transformer must be also connected to some input 
> stream, but these events must be dropped as we want only the punctuator 
> results. This causes unnecessary overhead in processing input messages 
> (although they are just dropped) and it is not very elegant.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to