This is a pretty hard problem. I would be inclined to try Queryable State (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/) first.
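For illustration, here is a minimal plain-Java sketch (no Flink dependencies; the class and record names are hypothetical) of the invariant described in the quoted mail: every incoming event updates all aggregates and emits one record per aggregate, each tagged with the triggering event's id so the downstream application can correlate them. In a real Flink job the two running totals would live in keyed state (e.g. ValueState/MapState) rather than local fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of id-correlated aggregation, outside of Flink.
class AggregateSketch {
    record Event(long id, String customerId, long amount) {}
    record Aggregate(long eventId, String name, String key, long value) {}

    private long total = 0;                                        // sum(amount) over all events
    private final Map<String, Long> perCustomer = new HashMap<>(); // sum(amount) per customerId

    // Update every aggregate, then emit one id-tagged record per aggregate.
    List<Aggregate> process(Event e) {
        total += e.amount();
        long custTotal = perCustomer.merge(e.customerId(), e.amount(), Long::sum);
        List<Aggregate> out = new ArrayList<>();
        out.add(new Aggregate(e.id(), "TOTAL", "", total));
        out.add(new Aggregate(e.id(), "GROUPBYCUSTTOTAL", e.customerId(), custTotal));
        return out;
    }
}
```

Feeding in the three events from the mail yields {1, TOTAL, 100}, {1, GROUPBYCUSTTOTAL, CUST1, 100}, then {2, TOTAL, 105}, {2, GROUPBYCUSTTOTAL, CUST2, 5}, then {3, TOTAL, 120}, {3, GROUPBYCUSTTOTAL, CUST1, 115}, matching the expected output in the mail.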
-0xe1a

On Mon, Aug 21, 2023 at 11:04 PM Jiten Pathy <jite...@netxd.com> wrote:

> Hi,
> We are currently evaluating Flink for our analytics engine. We would
> appreciate any help with our experiment in using Flink for a real-time
> request-response use case.
>
> To demonstrate the current use case: our application produces events of
> the following form:
>
> {id, customerId, amount, timestamp}
>
> We calculate some continuous aggregates triggered by each event produced
> and use them to decide on the action.
>
> Examples of aggregates: sum of amount in total, amount grouped by
> customerId, amount per day (grouped by customer), per month, etc.
>
> One approach we considered is to correlate the aggregates with the `id`,
> so for the following input events:
>
> {1, "CUST1", 100, $TS1}
> {2, "CUST2", 5, $TS2}
> {3, "CUST1", 15, $TS3}
>
> we would generate the following (ignoring timestamps for now) into Kafka:
>
> {1, "TOTAL", 100}, {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
> {2, "TOTAL", 105}, {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
> {3, "TOTAL", 120}, {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>
> and our application would read from Kafka and process them.
>
> So the flow looks like:
>
> Application --> Kafka --> Flink --> Kafka <-- Application
>
> We want to keep our current request-response model, i.e. we need all
> continuous aggregates out for every event ingested into Flink before we
> can further process the said event.
>
> Unfortunately, we don't see a way to do this in Flink SQL, as the
> aggregates would not carry the request id for us to correlate with, e.g.
> for the following simple continuous query:
>
> SELECT sum(amount) FROM EVENTS
>
> We have tried doing this with the Flink DataStream API: a
> KeyedProcessFunction with a MapState per window, collecting in
> processElement and using a Kafka sink.
>
> A sample of the windowing code would look like the following:
>
> public void processElement(Transaction transaction,
>         KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>         Collector<Aggregate> collector) throws Exception {
>     (....)
>     collector.collect(new Aggregate(transaction.getId(),
>             context.getCurrentKey(), agg0, evTime));
> }
>
> If we were to use Flink SQL instead, how would we accomplish this
> functionality?
>
> If there are any alternative approaches to accomplish this while
> maintaining our invariant (every event must produce all aggregates that
> consume the corresponding event), we would love to hear from the
> community.
>
> Regards,
>
> Jiten