Hi Xiangyu,

Yes, that's correct. It is the requestId, which we will have for each request.

On Wed, 23 Aug 2023 at 13:47, xiangyu feng <xiangyu...@gmail.com> wrote:

> Hi Pathy,
>
> I want to know if the 'id' in {id, customerId, amount, timestamp} stands
> for 'requestId'? If not, how is this 'id' field generated and can we add
> a 'requestId' field in the event?
>
> Thx,
> Xiangyu
>
> On Tue, 22 Aug 2023 at 14:04, Jiten Pathy <jite...@netxd.com> wrote:
>
>> Hi,
>> We are currently evaluating Flink for our analytics engine. We would
>> appreciate any help with our experiment in using Flink for a real-time
>> request-response use case.
>>
>> To demonstrate the current use case: our application produces events of
>> the following form:
>>
>> {id, customerId, amount, timestamp}
>>
>> We calculate some continuous aggregates triggered by each event produced
>> and use them to decide on the action.
>>
>> Examples of aggregates: sum of amount total, amount group by customerId,
>> amount per day (group by customer), per month etc.
>>
>> One approach we considered is to correlate the aggregates with the `Id`.
>> So for the following input events:
>>
>> {1, "CUST1", 100, $TS1}
>> {2, "CUST2", 5, $TS2}
>> {3, "CUST1", 15, $TS3}
>>
>> We would generate the following (ignoring timestamp for now) into Kafka:
>>
>> {1, "TOTAL", 100} , {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
>> {2, "TOTAL", 105} , {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
>> {3, "TOTAL", 120} , {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>>
>> And our application would read from Kafka and process them.
>>
>> So the flow looks like:
>>
>> Application -- kafka---> flink --> kafka <--- Application
>>
>> We want to keep our current request-response model, i.e. we need all
>> continuous aggregates out for every event ingested into Flink, before we
>> can further process the said event.
>>
>> Unfortunately we don't see a way to do this in Flink SQL, as the
>> aggregates would not have the requestId for us to correlate with e.g.
>> for the following simple continuous query:
>> SELECT sum(amount) from EVENTS
>>
>> We have tried doing this with the Flink DataStream API: KeyedProcessFunction
>> with MapState per window, and collecting in processElement and using
>> Kafka sink.
>>
>> A sample code for the windowing would look like the following:
>>
>> public void processElement(Transaction transaction,
>>     KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>>     Collector<Aggregate> collector) throws Exception {
>>     (....)
>>     collector.collect(new Aggregate(transaction.getId(),
>>         context.getCurrentKey(), agg0, evTime));
>> }
>>
>> If we were to use FlinkSQL instead, how would we accomplish this
>> functionality?
>>
>> If there are any alternative approaches to accomplish this while
>> maintaining our invariant: every event must produce all aggregates that
>> consume the corresponding event, we would love to hear from the community.
>>
>> Regards,
>>
>> Jiten
>>
>> *The information contained in this transmission (including any
>> attachments) is confidential and may be privileged. It is intended only for
>> the use of the individual or entity named above. If you are not the
>> intended recipient; dissemination, distribution, or copy of this
>> communication is strictly prohibited. If you have received this
>> communication in error, please erase all copies of this message and its
>> attachments and notify me immediately.*
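
The per-request correlation described in the quoted message can be sketched in plain Java, without any Flink dependency. This is only an illustration under stated assumptions: the names PerRequestAggregates, Event and process are hypothetical, the running sums live in ordinary fields rather than Flink managed state, and events are assumed to arrive one at a time in order. It reproduces the three-event example from the thread, where every event emits all aggregates it touched, each tagged with that event's id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink or of the original code.
final class PerRequestAggregates {

    // Mirrors the {id, customerId, amount, timestamp} event shape from the thread.
    static final class Event {
        final long id;
        final String customerId;
        final long amount;
        final long timestamp;

        Event(long id, String customerId, long amount, long timestamp) {
            this.id = id;
            this.customerId = customerId;
            this.amount = amount;
            this.timestamp = timestamp;
        }
    }

    // Running sum of amount over all events seen so far.
    private long total = 0;
    // Running sum of amount per customerId.
    private final Map<String, Long> perCustomer = new HashMap<>();

    // Updates both aggregates for one event and returns them,
    // each tagged with the id of the event that triggered the update.
    List<String> process(Event e) {
        total += e.amount;
        long customerTotal = perCustomer.merge(e.customerId, e.amount, Long::sum);

        List<String> out = new ArrayList<>();
        out.add(String.format("{%d, \"TOTAL\", %d}", e.id, total));
        out.add(String.format("{%d, \"GROUPBYCUSTTOTAL\", \"%s\", %d}",
                e.id, e.customerId, customerTotal));
        return out;
    }

    public static void main(String[] args) {
        PerRequestAggregates agg = new PerRequestAggregates();
        // Timestamps are placeholders; the thread ignores them for this example.
        List<Event> events = List.of(
                new Event(1, "CUST1", 100, 0L),
                new Event(2, "CUST2", 5, 0L),
                new Event(3, "CUST1", 15, 0L));
        for (Event e : events) {
            System.out.println(String.join(" , ", agg.process(e)));
        }
    }
}
```

In a Flink job the same bookkeeping would sit inside processElement with the sums held in managed state; that wiring is not shown here.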