Hi,
We are currently evaluating Flink for our analytics engine and would
appreciate any help with our experiment in using Flink for a real-time
request-response use case.

To illustrate the use case: our application produces events of the
following form:

{id, customerId, amount, timestamp}

We calculate some continuous aggregates triggered by each event produced
and use them to decide on the action.

Examples of aggregates: total sum of amount, sum of amount grouped by
customerId, sum of amount per day (grouped by customer), per month, etc.

One approach we considered is to correlate the aggregates with the event
`id`, so for the following input events:

{1, "CUST1", 100, $TS1}
{2, "CUST2", 5, $TS2}
{3, "CUST1", 15, $TS3}

We would generate the following (ignoring timestamp for now) into Kafka:

{1, "TOTAL", 100} , {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
{2, "TOTAL", 105} , {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
{3, "TOTAL", 120} , {3, "GROUPBYCUSTTOTAL", "CUST1", 115}

Our application would then read these records from Kafka and process them.
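
To make the expected output concrete, here is a minimal plain-Java sketch of the running aggregates tagged with the event id. It is not Flink code and has no Kafka dependency; the class name CorrelatedAggregates and the method onEvent are made up for illustration, and amounts are assumed to be whole numbers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the job (hypothetical names, no Flink/Kafka involved).
class CorrelatedAggregates {
    // Running sum over all events.
    private long total = 0;
    // Running sum per customerId.
    private final Map<String, Long> totalByCustomer = new HashMap<>();

    // Consumes one event and returns every aggregate it updated,
    // each tagged with the id of the event that triggered it.
    List<String> onEvent(long id, String customerId, long amount) {
        total += amount;
        long customerTotal = totalByCustomer.merge(customerId, amount, Long::sum);

        List<String> out = new ArrayList<>();
        out.add("{" + id + ", \"TOTAL\", " + total + "}");
        out.add("{" + id + ", \"GROUPBYCUSTTOTAL\", \"" + customerId + "\", " + customerTotal + "}");
        return out;
    }

    public static void main(String[] args) {
        CorrelatedAggregates job = new CorrelatedAggregates();
        System.out.println(job.onEvent(1, "CUST1", 100));
        System.out.println(job.onEvent(2, "CUST2", 5));
        System.out.println(job.onEvent(3, "CUST1", 15));
    }
}
```

Replaying the three sample events through onEvent yields the same records as listed above, one pair per event id.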

So the flow looks like:

Application --> Kafka --> Flink --> Kafka <-- Application

We want to keep our current request-response model, i.e. we need all
continuous aggregates emitted for every event ingested into Flink before we
can further process that event.
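
On the application side, this means holding an event until every aggregate carrying its id has arrived. A rough plain-Java sketch of that correlation step follows; it assumes the set of aggregate names is fixed and known up front, and the class AggregateCorrelator and its method onAggregate are hypothetical names, not part of any library.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Tracks which aggregates have arrived for each event id (hypothetical helper).
class AggregateCorrelator {
    // Assumption: the full set of aggregate names is known in advance.
    private final Set<String> expected;
    // Event id -> aggregate names received so far.
    private final Map<Long, Set<String>> seen = new HashMap<>();

    AggregateCorrelator(String... expectedNames) {
        this.expected = new HashSet<>(Arrays.asList(expectedNames));
    }

    // Records one aggregate and returns true once all expected
    // aggregates for this event id have been received.
    boolean onAggregate(long eventId, String aggregateName) {
        Set<String> got = seen.computeIfAbsent(eventId, k -> new HashSet<>());
        got.add(aggregateName);
        if (got.containsAll(expected)) {
            seen.remove(eventId); // event is complete, drop its bookkeeping
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AggregateCorrelator c = new AggregateCorrelator("TOTAL", "GROUPBYCUSTTOTAL");
        System.out.println(c.onAggregate(1, "TOTAL"));
        System.out.println(c.onAggregate(1, "GROUPBYCUSTTOTAL"));
    }
}
```

The event can only be processed further once onAggregate returns true for its id, which is the invariant we are trying to preserve.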

Unfortunately, we don't see a way to do this in Flink SQL, as the
aggregates would not carry the request id for us to correlate with, e.g.
for the following simple continuous query:
SELECT SUM(amount) FROM EVENTS

We have tried doing this with the Flink DataStream API: a
KeyedProcessFunction with MapState per window, collecting the results in
processElement and writing them to a Kafka sink.

A sample of the windowing code looks like the following:

    public void processElement(Transaction transaction,
            KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
            Collector<Aggregate> collector) throws Exception {
        (....)
        // Emit the aggregate tagged with the id of the triggering event.
        collector.collect(new Aggregate(transaction.getId(),
                context.getCurrentKey(), agg0, evTime));
    }
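
To illustrate what we mean by state per window, here is a simplified plain-Java sketch of the per-customer, per-day sum. An ordinary nested map stands in for Flink's keyed MapState, so this is not the actual job; the class DailyTotals and its method add are hypothetical, and we assume timestamps are epoch milliseconds with day boundaries in UTC.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

// Per-customer daily sums; the inner map plays the role of MapState (hypothetical names).
class DailyTotals {
    // customerId -> (UTC day -> running sum of amount for that day).
    private final Map<String, Map<LocalDate, Long>> state = new HashMap<>();

    // Adds one event to its day bucket and returns the updated daily sum.
    // Assumption: timestampMillis is epoch milliseconds, days are cut in UTC.
    long add(String customerId, long amount, long timestampMillis) {
        LocalDate day = Instant.ofEpochMilli(timestampMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        return state.computeIfAbsent(customerId, k -> new HashMap<>())
                .merge(day, amount, Long::sum);
    }

    public static void main(String[] args) {
        DailyTotals totals = new DailyTotals();
        System.out.println(totals.add("CUST1", 100, 0L));          // first day
        System.out.println(totals.add("CUST1", 15, 3_600_000L));   // same day, one hour later
        System.out.println(totals.add("CUST1", 7, 86_400_000L));   // next day, new bucket
    }
}
```

The value returned by add is what we would emit from processElement together with the id of the triggering event.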

If we were to use Flink SQL instead, how would we accomplish this
functionality?

If there are any alternative approaches that accomplish this while
maintaining our invariant (every event must produce all aggregates that
consume it), we would love to hear from the community.

Regards,

Jiten

