The .connect() method may work here. You can connect the changelog stream produced from the table (with fields customerId and sum) to the original stream carrying customerId and eventId. In a CoProcessFunction keyed by customer, you keep the latest event id of each customer in state, and whenever an update arrives from the table side you merge the two and emit the correlated result.
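Roughly, a minimal sketch of that idea (untested; it assumes the Table/DataStream changelog bridge available since Flink 1.13, and the names EnrichAggregates, rawEvents, aggTable as well as the field names id / customerId / sum are placeholders for your actual schema):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Input 1: raw events (id, customerId, amount, timestamp)
// Input 2: changelog rows from the SQL aggregate (customerId, sum)
// Output : (eventId, customerId, sum)
public class EnrichAggregates
        extends KeyedCoProcessFunction<String, Row, Row, Row> {

    private transient ValueState<Long> latestEventId;

    @Override
    public void open(Configuration parameters) {
        latestEventId = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latestEventId", Long.class));
    }

    // Raw event side: remember the latest event id for this customer.
    @Override
    public void processElement1(Row event, Context ctx, Collector<Row> out)
            throws Exception {
        latestEventId.update((Long) event.getField("id"));
    }

    // Aggregate side: merge the update with the remembered event id.
    @Override
    public void processElement2(Row agg, Context ctx, Collector<Row> out)
            throws Exception {
        out.collect(Row.of(latestEventId.value(), ctx.getCurrentKey(),
                agg.getField("sum")));
    }
}

wired up along these lines, with both inputs keyed by customer:

// Table agg = tEnv.sqlQuery(
//     "SELECT customerId, SUM(amount) AS `sum` FROM EVENTS GROUP BY customerId");
// DataStream<Row> aggStream = tEnv.toChangelogStream(agg);
// rawEvents.connect(aggStream)
//          .keyBy(e -> (String) e.getField("customerId"),
//                 a -> (String) a.getField("customerId"))
//          .process(new EnrichAggregates());

You would probably also check Row.getKind() on the aggregate side so retraction rows (-U/-D) are ignored, and key both sides by a constant for a global TOTAL aggregate.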
On Thu, Aug 24, 2023 at 4:10 PM Jiten Pathy <jite...@netxd.com> wrote:

> Most of the aggregates can be added/removed/updated dynamically, it would
> be easier from an implementation point of view, if we could use SQL.
>
> On Thu, 24 Aug 2023 at 16:09, Kenan Kılıçtepe <kkilict...@gmail.com> wrote:
>
>> I think Datastream fits your requirements better. Is there a specific
>> reason for using SQL ?
>>
>> On Tue, Aug 22, 2023 at 9:04 AM Jiten Pathy <jite...@netxd.com> wrote:
>>
>>> Hi,
>>> We are currently evaluating Flink for our analytics engine. We would
>>> appreciate any help with our experiment in using flink for real-time
>>> request-response use-case.
>>>
>>> To demonstrate the current use-case: our application produces events of
>>> the following form:
>>>
>>> {id, customerId, amount, timestamp}
>>>
>>> We calculate some continuous aggregates triggered by each event produced
>>> and use them to decide on the action.
>>>
>>> Examples of Aggregates: sum of amount total, amount group by customerId,
>>> amount per day (group-by customer), per month etc.
>>>
>>> One approach we considered is to correlate the aggregates with the `Id`,
>>> so for the following input events:
>>>
>>> {1, "CUST1", 100, $TS1}
>>> {2, "CUST2", 5, $TS2}
>>> {3, "CUST1", 15, $TS3}
>>>
>>> We would generate the following (ignoring timestamp for now) into kafka:
>>>
>>> {1, "TOTAL", 100} , {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
>>> {2, "TOTAL", 105} , {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
>>> {3, "TOTAL", 120} , {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>>>
>>> And our application would read from kafka and process them.
>>>
>>> So the flow looks like:
>>>
>>> Application -- kafka ---> flink --> kafka <--- Application
>>>
>>> We want to keep our current request-response model, i.e. we need all
>>> continuous aggregates out for every ingested event into flink, before
>>> we can further process the said event.
>>>
>>> Unfortunately we don't see a way to do this in flink-SQL: the
>>> aggregates would not have the requestId for us to correlate with, e.g.
>>> for the following simple continuous query:
>>>
>>> SELECT sum(amount) from EVENTS
>>>
>>> We have tried doing this with the flink-Datastream API: KeyedProcessFunction
>>> with MapState per window, collecting in processElement and using a
>>> Kafka sink.
>>>
>>> A sample code for the windowing would look like the following:
>>>
>>> public void processElement(Transaction transaction,
>>>         KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>>>         Collector<Aggregate> collector) throws Exception {
>>>     (....)
>>>     collector.collect(new Aggregate(transaction.getId(),
>>>             context.getCurrentKey(), agg0, evTime));
>>> }
>>>
>>> If we were to use FlinkSQL instead, how would we accomplish this
>>> functionality?
>>>
>>> If there are any alternative approaches to accomplish this while
>>> maintaining our invariant: every event must produce all aggregates that
>>> consume the corresponding event, we would love to hear from the community.
>>>
>>> Regards,
>>>
>>> Jiten