Hi Pathy,

Please check whether the following SQL fits your need. The idea is to self-join the event stream on event time (event2 and event3 are two views over the same events), so that every aggregate row carries the id of the event that triggered it; that id is what lets you correlate each aggregate with a requestId:
CREATE TABLE event1 (
    id BIGINT,
    customer_id STRING,
    amount INT,
    ts TIMESTAMP
);  -- add your source connector options in a WITH (...) clause

CREATE VIEW event2 AS SELECT * FROM event1;
CREATE VIEW event3 AS SELECT * FROM event1;

-- Running grand total, correlated with the id of the triggering event.
CREATE VIEW temp1 AS
SELECT
    event2.id AS id,
    'Total' AS total,
    SUM(event3.amount) AS total_amount
FROM event2
LEFT JOIN event3 ON event2.ts >= event3.ts
GROUP BY event2.id;

-- Running per-customer total, keyed by the latest event id per customer.
CREATE VIEW temp2 AS
SELECT
    MAX(event2.id) AS id,
    event2.customer_id AS customer_id,
    SUM(event3.amount) AS client_total_amount
FROM event2
LEFT JOIN event3
    ON event2.ts >= event3.ts
    AND event2.customer_id = event3.customer_id
GROUP BY event2.customer_id;

For every event, temp1 yields the running total over all events whose ts is no later than that event's ts, and temp2 yields the running per-customer total tagged with the latest event id seen for that customer.
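If you also want these results written back to Kafka, as in your flow diagram, a sink table along the following lines might work. This is an untested sketch: the topic name, server address, and formats are placeholders, and because temp1 is an updating (not append-only) result, it needs an upsert sink such as upsert-kafka rather than a plain append-only Kafka sink:

-- Untested sketch; all connector options below are placeholders.
CREATE TABLE total_out (
    id BIGINT,
    total STRING,
    total_amount INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aggregates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

INSERT INTO total_out
SELECT id, total, total_amount FROM temp1;

temp2 would need its own sink table of the same shape, with customer_id in the key.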
Regards,
Xiangyu

On Wed, 23 Aug 2023 at 16:38, Jiten Pathy <jite...@netxd.com> wrote:

> Hi Xiangyu,
> Yes, that's correct. It is the requestId we will have for each request.
>
> On Wed, 23 Aug 2023 at 13:47, xiangyu feng <xiangyu...@gmail.com> wrote:
>
>> Hi Pathy,
>>
>> I want to know whether the 'id' in {id, customerId, amount, timestamp}
>> stands for 'requestId'. If not, how is this 'id' field generated, and
>> can we add a 'requestId' field to the event?
>>
>> Thanks,
>> Xiangyu
>>
>> On Tue, 22 Aug 2023 at 14:04, Jiten Pathy <jite...@netxd.com> wrote:
>>
>>> Hi,
>>> We are currently evaluating Flink for our analytics engine. We would
>>> appreciate any help with our experiment in using Flink for a real-time
>>> request-response use case.
>>>
>>> To demonstrate the current use case: our application produces events of
>>> the following form:
>>>
>>> {id, customerId, amount, timestamp}
>>>
>>> We calculate some continuous aggregates triggered by each event
>>> produced and use them to decide on the action.
>>>
>>> Examples of aggregates: total amount, amount grouped by customerId,
>>> amount per day (grouped by customer), per month, etc.
>>>
>>> One approach we considered is to correlate the aggregates with the
>>> `id`. So for the following input events:
>>>
>>> {1, "CUST1", 100, $TS1}
>>> {2, "CUST2", 5, $TS2}
>>> {3, "CUST1", 15, $TS3}
>>>
>>> we would generate the following (ignoring timestamps for now) into
>>> Kafka:
>>>
>>> {1, "TOTAL", 100}, {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
>>> {2, "TOTAL", 105}, {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
>>> {3, "TOTAL", 120}, {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>>>
>>> and our application would read them from Kafka and process them.
>>>
>>> So the flow looks like:
>>>
>>> Application -- Kafka --> Flink --> Kafka <-- Application
>>>
>>> We want to keep our current request-response model, i.e. we need all
>>> continuous aggregates emitted for every event ingested into Flink
>>> before we can further process that event.
>>>
>>> Unfortunately, we don't see a way to do this in Flink SQL, as the
>>> aggregates would not carry the requestId for us to correlate with,
>>> e.g. for the following simple continuous query:
>>>
>>> SELECT SUM(amount) FROM EVENTS
>>>
>>> We have tried doing this with the Flink DataStream API: a
>>> KeyedProcessFunction with MapState per window, collecting in
>>> processElement and using a Kafka sink.
>>>
>>> Sample code for the windowing would look like the following:
>>>
>>> public void processElement(Transaction transaction,
>>>         KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>>>         Collector<Aggregate> collector) throws Exception {
>>>     (....) // update the per-window MapState; compute agg0 and evTime
>>>     collector.collect(new Aggregate(transaction.getId(),
>>>             context.getCurrentKey(), agg0, evTime));
>>> }
>>>
>>> If we were to use Flink SQL instead, how would we accomplish this
>>> functionality?
>>>
>>> If there are any alternative approaches to accomplish this while
>>> maintaining our invariant (every event must produce all aggregates
>>> that consume the corresponding event), we would love to hear from the
>>> community.
>>>
>>> Regards,
>>>
>>> Jiten
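P.S. The per-day (grouped by customer) aggregate mentioned in the thread could follow the same self-join pattern by additionally restricting the join to events on the same calendar day. An untested sketch; the view and column names here are made up:

-- Untested sketch: daily running total per customer, same self-join pattern.
CREATE VIEW temp3 AS
SELECT
    MAX(event2.id) AS id,
    event2.customer_id AS customer_id,
    CAST(event2.ts AS DATE) AS event_day,
    SUM(event3.amount) AS daily_amount
FROM event2
LEFT JOIN event3
    ON event2.ts >= event3.ts
    AND event2.customer_id = event3.customer_id
    AND CAST(event2.ts AS DATE) = CAST(event3.ts AS DATE)
GROUP BY event2.customer_id, CAST(event2.ts AS DATE);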