Hi Pathy,

Please check if the following SQL fits your needs. The idea is to self-join the event stream on the timestamp, so that every aggregate row keeps the id of the event that triggered it:


CREATE TABLE event1 (
    id BIGINT,
    customer_id STRING,
    amount INT,
    ts TIMESTAMP
);
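
Note that for a real job, event1 also needs a connector definition. A minimal
sketch, assuming a JSON-encoded Kafka source (the topic name, broker address,
and group id below are placeholders):

-- source connector options are assumptions; adjust to your setup
CREATE TABLE event1 (
    id BIGINT,
    customer_id STRING,
    amount INT,
    ts TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-aggregates',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);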

CREATE VIEW event2 AS
SELECT *
FROM event1;

CREATE VIEW event3 AS
SELECT *
FROM event1;

CREATE VIEW temp1 AS
SELECT event2.id AS id,
       'Total' AS label,
       SUM(event3.amount) AS total_amount
FROM event2
LEFT JOIN event3
    ON event2.ts >= event3.ts
GROUP BY event2.id;
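
With the three sample events from your earlier mail, temp1 should converge to
{1, 'Total', 100}, {2, 'Total', 105}, {3, 'Total', 120}, matching your
expected "TOTAL" rows.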

CREATE VIEW temp2 AS
SELECT event2.id AS id,
       event2.customer_id AS customer_id,
       SUM(event3.amount) AS client_total_amount
FROM event2
LEFT JOIN event3
    ON event2.ts >= event3.ts
    AND event2.customer_id = event3.customer_id
GROUP BY event2.id, event2.customer_id;
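
Similarly, temp2 should converge to {1, 'CUST1', 100}, {2, 'CUST2', 5},
{3, 'CUST1', 115} for the sample input.

To get the results back into Kafka: these views produce updating results, so
an append-only Kafka sink will not accept them, but an upsert-kafka sink
works. A minimal sketch for temp1 (the sink table name, topic, and broker
address are placeholders):

-- sink connector options are assumptions; adjust to your setup
CREATE TABLE total_sink (
    id BIGINT,
    label STRING,
    total_amount INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aggregates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

INSERT INTO total_sink
SELECT id, label, total_amount
FROM temp1;

temp2 can be written to a second sink table in the same way, keyed on id.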

Regards,
Xiangyu

Jiten Pathy <jite...@netxd.com> wrote on Wed, Aug 23, 2023 at 16:38:

> Hi Xiangyu,
> Yes, that's correct. It is the requestId we will have for each request.
>
> On Wed, 23 Aug 2023 at 13:47, xiangyu feng <xiangyu...@gmail.com> wrote:
>
>> Hi Pathy,
>>
>> I want to know if the 'id' in {id, customerId, amount, timestamp} stands
>> for 'requestId'. If not, how is this 'id' field generated, and can we
>> add a 'requestId' field to the event?
>>
>> Thx,
>> Xiangyu
>>
>> Jiten Pathy <jite...@netxd.com> wrote on Tue, Aug 22, 2023 at 14:04:
>>
>>> Hi,
>>> We are currently evaluating Flink for our analytics engine. We would
>>> appreciate any help with our experiment in using Flink for a real-time
>>> request-response use case.
>>>
>>> To demonstrate the current use case: our application produces events of
>>> the following form:
>>>
>>> {id, customerId, amount, timestamp}
>>>
>>> We calculate some continuous aggregates, triggered by each event produced,
>>> and use them to decide on an action.
>>>
>>> Examples of aggregates: sum of amount (total), sum of amount grouped by
>>> customerId, amount per day (grouped by customer), per month, etc.
>>>
>>> One approach we considered is to correlate the aggregates with the `id`.
>>> So for the following input events:
>>>
>>> {1, "CUST1", 100, $TS1}
>>> {2, "CUST2", 5, $TS2}
>>> {3, "CUST1", 15, $TS3}
>>>
>>> We would generate the following (ignoring the timestamp for now) into Kafka:
>>>
>>> {1, "TOTAL", 100} , {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
>>> {2, "TOTAL", 105} , {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
>>> {3, "TOTAL", 120} , {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>>>
>>> Our application would then read these from Kafka and process them.
>>>
>>> So the flow looks like:
>>>
>>> Application --> Kafka --> Flink --> Kafka <-- Application
>>>
>>> We want to keep our current request-response model, i.e. we need all
>>> continuous aggregates emitted for every event ingested into Flink before
>>> we can further process that event.
>>>
>>> Unfortunately, we don't see a way to do this in Flink SQL, as the
>>> aggregates would not carry the requestId for us to correlate with, e.g.
>>> for the following simple continuous query:
>>> SELECT SUM(amount) FROM EVENTS
>>>
>>> We have tried doing this with the Flink DataStream API: a
>>> KeyedProcessFunction with MapState per window, collecting results in
>>> processElement and using a Kafka sink.
>>>
>>> Sample code for the windowing would look like the following:
>>>
>>> @Override
>>> public void processElement(Transaction transaction,
>>>         KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>>>         Collector<Aggregate> collector) throws Exception {
>>>     // (....)
>>>     collector.collect(new Aggregate(transaction.getId(),
>>>             context.getCurrentKey(), agg0, evTime));
>>> }
>>>
>>> If we were to use Flink SQL instead, how would we accomplish this
>>> functionality?
>>>
>>> If there are any alternative approaches to accomplish this while
>>> maintaining our invariant (every event must produce all aggregates that
>>> consume the corresponding event), we would love to hear from the community.
>>>
>>> Regards,
>>>
>>> Jiten
>>>
>>>
>>
>
