The .connect() method may work.
You can connect the changelog stream coming from the table, with fields
Customer and sum, with the original stream consisting of Customer and
EventId.
In a CoProcessFunction, you can keep the latest EventId of a customer
in state, and whenever you receive data from the table you can merge
them.
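
A minimal sketch of that idea is below. It uses KeyedCoProcessFunction
(the keyed variant of the CoProcessFunction mentioned above, so
getCurrentKey() is available); the Event, AggRow, and Result types are
hypothetical placeholders for your own records:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class MergeFn
        extends KeyedCoProcessFunction<String, Event, AggRow, Result> {

    private transient ValueState<Long> latestEventId;

    @Override
    public void open(Configuration parameters) {
        latestEventId = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latestEventId", Long.class));
    }

    // Original stream (Customer, EventId): remember the latest event id
    // for the current customer key.
    @Override
    public void processElement1(Event e, Context ctx,
            Collector<Result> out) throws Exception {
        latestEventId.update(e.getId());
    }

    // Changelog rows (Customer, sum) from the table: merge each update
    // with the remembered event id.
    @Override
    public void processElement2(AggRow row, Context ctx,
            Collector<Result> out) throws Exception {
        Long id = latestEventId.value();
        if (id != null) {
            out.collect(new Result(id, ctx.getCurrentKey(), row.getSum()));
        }
    }
}

Both streams need to be keyed by customer before processing, e.g.:

events.connect(changelog)
      .keyBy(Event::getCustomerId, AggRow::getCustomerId)
      .process(new MergeFn());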


On Thu, Aug 24, 2023 at 4:10 PM Jiten Pathy <jite...@netxd.com> wrote:

> Most of the aggregates can be added/removed/updated dynamically, so it
> would be easier, from an implementation point of view, if we could use SQL.
>
> On Thu, 24 Aug 2023 at 16:09, Kenan Kılıçtepe <kkilict...@gmail.com>
> wrote:
>
>> I think the DataStream API fits your requirements better. Is there a
>> specific reason for using SQL?
>>
>>
>> On Tue, Aug 22, 2023 at 9:04 AM Jiten Pathy <jite...@netxd.com> wrote:
>>
>>> Hi,
>>> We are currently evaluating Flink for our analytics engine. We would
>>> appreciate any help with our experiment in using Flink for a real-time
>>> request-response use case.
>>>
>>> To demonstrate the current use-case: our application produces events of
>>> the following form:
>>>
>>> {id, customerId, amount, timestamp}
>>>
>>> We calculate some continuous aggregates, triggered by each event
>>> produced, and use them to decide on an action.
>>>
>>> Examples of aggregates: total amount, amount grouped by customerId,
>>> amount per day (grouped by customer), per month, etc.
>>>
>>> One approach we considered is to correlate the aggregates with the `Id`.
>>> So for the following input events:
>>>
>>> {1, "CUST1", 100, $TS1}
>>> {2, "CUST2", 5, $TS2}
>>> {3, "CUST1", 15, $TS3}
>>>
>>> We would generate the following (ignoring timestamps for now) into Kafka:
>>>
>>> {1, "TOTAL", 100} , {1, "GROUPBYCUSTTOTAL", "CUST1", 100}
>>> {2, "TOTAL", 105} , {2, "GROUPBYCUSTTOTAL", "CUST2", 5}
>>> {3, "TOTAL", 120} , {3, "GROUPBYCUSTTOTAL", "CUST1", 115}
>>>
>>> And our application would read from Kafka and process them.
>>>
>>> So the flow looks like:
>>>
>>> Application ---> Kafka ---> Flink ---> Kafka <--- Application
>>>
>>> We want to keep our current request-response model, i.e. we need all
>>> continuous aggregates emitted for every event ingested into Flink before
>>> we can further process said event.
>>>
>>> Unfortunately, we don't see a way to do this in Flink SQL, as the
>>> aggregates would not carry the requestId for us to correlate with, e.g.
>>> for the following simple continuous query:
>>>
>>> SELECT SUM(amount) FROM EVENTS
>>>
>>> We have tried doing this with the Flink DataStream API: a
>>> KeyedProcessFunction with MapState per window, collecting in
>>> processElement, and using a Kafka sink.
>>>
>>> Sample code for the windowing would look like the following:
>>>
>>> @Override
>>> public void processElement(Transaction transaction,
>>>         KeyedProcessFunction<String, Transaction, Aggregate>.Context context,
>>>         Collector<Aggregate> collector) throws Exception {
>>>     // (....) update the per-window MapState from the transaction
>>>     collector.collect(new Aggregate(transaction.getId(),
>>>             context.getCurrentKey(), agg0, evTime));
>>> }
>>>
>>> If we were to use Flink SQL instead, how would we accomplish this
>>> functionality?
>>>
>>> If there are any alternative approaches that accomplish this while
>>> maintaining our invariant (every event must produce all aggregates that
>>> consume the corresponding event), we would love to hear from the
>>> community.
>>>
>>> Regards,
>>>
>>> Jiten
>>>
>>>
>>
>
