This is a pretty hard problem. I would be inclined to try Queryable State (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/)
first.

-0xe1a

On Mon, Aug 21, 2023 at 11:04 PM Jiten Pathy <jite...@netxd.com> wrote:

> Hi,
> We are currently evaluating Flink for our analytics engine. We would
> appreciate any help with our experiment in using flink for real-time
> request-response use-case.
>
> To demonstrate the current use-case: our application produces events of
> the following form:
>
> {id, customerId, amount, timestamp}
>
> We calculate some continuous aggregates triggered by each event produced
> and use them to decide on the action.
>
> Examples of Aggregates: sum of amount total, amount group by customerId,
> amount per day(group-by customer), per month etc.
>
> One approach we considered is to correlate the aggregates with the `Id`,
> So for the following input events:
>
> {1, "CUST1", 100, $TS1}
> {2, "CUST2", 5, $TS2}
> {3, "CUST1", 15, $TS3}
>
> We would generate the following(ignoring timestamp for now) into kafka:
>
> {1, "TOTAL", 100} , {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
> {2, "TOTAL", 105} , {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
> {3, "TOTAL", 120} , {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>
> And our application would read from kafka and process them.
>
> So the flow looks like:
>
> Application -- kafka---> flink --> kafka <--- Application
>
> We want to keep our current request - response model i.e. we need all
> continuous aggregates out for every ingested event into flink, before we
> can further process the said event.
>
> Unfortunately we don't see a way to do this in flink-SQL: As the
> aggregates would not have the requestId for us to correlate with e.g. for
> the following simple continuous query:
> SELECT sum(amount) from EVENTS
>
> We have tried doing this with flink-Datastream API: KeyedProcessFunction
>  with MapState per window, and collecting in processElement and using
> Kafka sink.
>
> A sample code for the windowing would look like the following:
>
>  public void processElement(Transaction transaction, 
> KeyedProcessFunction<String, Transaction, Aggregate>.Context context, 
> Collector<Aggregate> collector) throws Exception {
>         (....)
>         collector.collect(new Aggregate(transaction.getId(), 
> context.getCurrentKey(), agg0, evTime));
>     }
>
> If we were to use FlinkSQL instead, how would we accomplish this
> functionality?
>
> If there are any alternative approaches to accomplish this while
> maintaining our invariant: every event must produce all aggregates that
> consume the corresponding event, we would love to hear from the community.
>
> Regards,
>
> Jiten
>
> *The information contained in this transmission (including any
> attachments) is confidential and may be privileged. It is intended only for
> the use of the individual or entity named above. If you are not the
> intended recipient; dissemination, distribution, or copy of this
> communication is strictly prohibited. If you have received this
> communication in error, please erase all copies of this message and its
> attachments and notify me immediately.*
>

Reply via email to