Hi Alex,

Thanks for the reply!

I have 3 Flink SQL tables which are joined using temporal joins.
I am using the timestamp field of the first table as the event-time attribute.

e.g.:

SELECT *
FROM TABLE1 T1
LEFT JOIN TABLE2 FOR SYSTEM_TIME AS OF T1.timestamp AS T2   -- temporal join
  ON *JOIN CONDITION*
LEFT JOIN TABLE3 FOR SYSTEM_TIME AS OF T1.timestamp AS T3   -- temporal join
  ON *JOIN CONDITION*
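
For reference, a minimal sketch of what the full statement might look like. The table names, columns, and connector options below are hypothetical, not taken from our actual job; the watermark and primary-key declarations reflect Flink's documented requirements for an event-time temporal join (a watermark on the left table's time attribute, and a primary key plus event time on the versioned right-hand table):

```sql
-- Hypothetical sketch; names and connector options are illustrative only.
CREATE TABLE TABLE1 (
    id            STRING,
    `timestamp`   TIMESTAMP(3),
    -- event-time attribute on the left (probe) side
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka' /* ... */);

CREATE TABLE TABLE2 (
    id            STRING,
    attr          STRING,
    update_time   TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    -- a versioned (build-side) table needs a primary key
    PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka' /* ... */);

SELECT *
FROM TABLE1 T1
LEFT JOIN TABLE2 FOR SYSTEM_TIME AS OF T1.`timestamp` AS T2
  ON T1.id = T2.id;
```

One relevant detail of this join type: Flink emits event-time temporal join results only as the watermark advances past a row's timestamp, so how the watermark is generated on the input streams matters here.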

Note:
We are not using a session window / process function for this use case.

Thanks,
Yad
________________________________
From: Alex Cruise <a...@cluonflux.com>
Sent: Friday, December 15, 2023 2:08 AM
To: T, Yadhunath <yadhunat...@boarshead.com>
Cc: user@flink.apache.org <user@flink.apache.org>; Shames, Joy 
<joy.sha...@boarshead.com>; Maj, Marek <marek....@boarshead.com>; Kornegay, 
Robert <robert.korne...@boarshead.com>; Michael, Dennis 
<dennis.mich...@boarshead.com>; Kerelli, Sharath <sharath.kere...@boarshead.com>
Subject: Re: Event stuck in the Flink operator

Can you share your precise join semantics?

I don't know about Flink SQL offhand, but here are a couple ways to do this 
when you're using the DataStream API:

* use the Session Window join type
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/joining/#session-window-join),
which automatically closes a window after a configurable delay since the last record
* if you're using a ProcessFunction (very low-level), you can set a timer
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#timers)
if you need to react to the non-arrival of data. Just make sure you cancel it
when data finally does arrive. :)

-0xe1a

On Thu, Dec 14, 2023 at 6:36 AM T, Yadhunath 
<yadhunat...@boarshead.com<mailto:yadhunat...@boarshead.com>> wrote:

Hi,

I am using Flink 1.16 and I have a streaming job that uses the PyFlink SQL API.
Whenever a new streaming event comes in, it is not processed by the last Flink
operator (which performs the temporal join and writes the data to a Kafka
topic); it is only pushed to Kafka on the arrival of the next streaming event.
It is as if the last operator needs a new event in order to process the
previous one. Has anyone experienced a similar issue?
I would really appreciate it if someone could advise a solution for this.
Please let me know if you need more input.

Thanks,
Yad
