Hi Xiangyu,

Thank you for providing the SQL query; I will try it out. However, I don't think MAX() will work for our use case, as the event ids may not be ordered by the time they reach Flink.
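As an alternative, I was wondering whether an OVER window on the event-time attribute could keep the id attached to each aggregate, since each input row then emits exactly one output row carrying its own id alongside the running total. This is only a sketch (the table and column names follow the earlier example, it assumes `ts` is declared as the event-time attribute with a watermark, and I split the two aggregates into separate views because, if I read the docs correctly, all OVER windows within one SELECT must be identical in streaming mode), so please correct me if I've misread the semantics:

```sql
-- Assumes EVENTS is declared with ts as an event-time attribute, e.g.:
-- CREATE TABLE EVENTS (
--   id BIGINT, customer_id STRING, amount INT,
--   ts TIMESTAMP(3),
--   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
-- ) WITH (...);

-- Running grand total; every row keeps its own id.
CREATE VIEW totals AS
SELECT id, customer_id,
       SUM(amount) OVER (
         ORDER BY ts
         ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total
FROM EVENTS;

-- Running per-customer total, again one output row per input event.
CREATE VIEW cust_totals AS
SELECT id, customer_id,
       SUM(amount) OVER (
         PARTITION BY customer_id
         ORDER BY ts
         ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cust_total
FROM EVENTS;
```

Each view could then be written to its own Kafka sink, giving us the `{id, "TOTAL", ...}` and `{id, "GROUPBYCUSTTOTAL", ...}` records to correlate downstream. Does this look workable?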
On Wed, 23 Aug 2023 at 15:23, xiangyu feng <xiangyu...@gmail.com> wrote:

> Hi Pathy,
>
> Please check if the following SQL fits your need:
>
> CREATE TABLE event1 (id BIGINT, customer_id STRING, amount INT, ts TIMESTAMP);
>
> CREATE VIEW event2 AS
> SELECT *
> FROM event1;
>
> CREATE VIEW event3 AS
> SELECT *
> FROM event1;
>
> CREATE VIEW temp1 AS
> SELECT event2.id AS id,
>        'Total' AS total,
>        SUM(event3.amount) AS total_amount
> FROM event2
> LEFT JOIN event3
>   ON event2.ts >= event3.ts
> GROUP BY event2.id;
>
> CREATE VIEW temp2 AS
> SELECT MAX(event2.id) AS id,
>        event2.customer_id AS customer_id,
>        SUM(event3.amount) AS client_total_amount
> FROM event2
> LEFT JOIN event3
>   ON event2.ts >= event3.ts
>   AND event2.customer_id = event3.customer_id
> GROUP BY event2.customer_id;
>
> Regards,
> Xiangyu
>
> Jiten Pathy <jite...@netxd.com> wrote on Wed, 23 Aug 2023 at 16:38:
>
>> Hi Xiangyu,
>> Yes, that's correct. It is the requestId; we will have one for each request.
>>
>> On Wed, 23 Aug 2023 at 13:47, xiangyu feng <xiangyu...@gmail.com> wrote:
>>
>>> Hi Pathy,
>>>
>>> I want to know whether the 'id' in {id, customerId, amount, timestamp} stands for 'requestId'. If not, how is this 'id' field generated, and can we add a 'requestId' field to the event?
>>>
>>> Thx,
>>> Xiangyu
>>>
>>> Jiten Pathy <jite...@netxd.com> wrote on Tue, 22 Aug 2023 at 14:04:
>>>
>>>> Hi,
>>>> We are currently evaluating Flink for our analytics engine. We would appreciate any help with our experiment in using Flink for a real-time request-response use case.
>>>>
>>>> To demonstrate the current use case: our application produces events of the following form:
>>>>
>>>> {id, customerId, amount, timestamp}
>>>>
>>>> We calculate some continuous aggregates, triggered by each event produced, and use them to decide on the action.
>>>>
>>>> Examples of aggregates: sum of amount in total, amount grouped by customerId, amount per day (grouped by customer), per month, etc.
>>>> One approach we considered is to correlate the aggregates with the `id`. So for the following input events:
>>>>
>>>> {1, "CUST1", 100, $TS1}
>>>> {2, "CUST2", 5, $TS2}
>>>> {3, "CUST1", 15, $TS3}
>>>>
>>>> we would generate the following (ignoring timestamps for now) into Kafka:
>>>>
>>>> {1, "TOTAL", 100}, {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
>>>> {2, "TOTAL", 105}, {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
>>>> {3, "TOTAL", 120}, {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>>>>
>>>> and our application would read these from Kafka and process them.
>>>>
>>>> So the flow looks like:
>>>>
>>>> Application -- kafka --> Flink --> kafka <-- Application
>>>>
>>>> We want to keep our current request-response model, i.e. we need all continuous aggregates out of Flink for every ingested event before we can further process that event.
>>>>
>>>> Unfortunately, we don't see a way to do this in Flink SQL, as the aggregates would not carry the requestId for us to correlate with, e.g. for the following simple continuous query:
>>>>
>>>> SELECT SUM(amount) FROM EVENTS
>>>>
>>>> We have tried doing this with the Flink DataStream API: a KeyedProcessFunction with a MapState per window, collecting in processElement and using a Kafka sink.
>>>>
>>>> Sample code for the windowing would look like the following:
>>>>
>>>> public void processElement(Transaction transaction,
>>>>         KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>>>>         Collector<Aggregate> collector) throws Exception {
>>>>     // (....)
>>>>     collector.collect(new Aggregate(transaction.getId(),
>>>>             context.getCurrentKey(), agg0, evTime));
>>>> }
>>>>
>>>> If we were to use Flink SQL instead, how would we accomplish this functionality?
>>>>
>>>> If there are any alternative approaches that accomplish this while maintaining our invariant (every event must produce all aggregates that consume the corresponding event), we would love to hear from the community.
>>>> Regards,
>>>>
>>>> Jiten
>>>>
>>>> *The information contained in this transmission (including any attachments) is confidential and may be privileged. It is intended only for the use of the individual or entity named above. If you are not the intended recipient, dissemination, distribution, or copying of this communication is strictly prohibited. If you have received this communication in error, please erase all copies of this message and its attachments and notify me immediately.*