Hey Hector,

thanks for your reply. Your assumption is entirely correct: I already have a 
few million records on the topic to test a streaming use case. I am planning 
to test with a variety of settings, but the problem occurs with any cluster 
configuration, for example parallelism 1 with 1 TaskManager and 1 slot. I 
plan to scale up to 10 slots and parallelism 10 for testing purposes. 

I do not think that any events are kept on hold, since the output always 
contains windows with the latest timestamp (but not enough of them; it should 
be much more). Nevertheless, I will try your suggestion.
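For reference, the test you describe could look roughly like this in SQL, 
assuming a table data_v1 is registered over the topic (the table name, column 
names, and values here are placeholders, not my actual schema):

```sql
-- Sketch of the suggested test (all names assumed): push one final record
-- whose event time lies well past the newest data, so the watermark advances
-- and any windows still waiting in state are flushed to the output.
INSERT INTO data_v1
VALUES ('a1', 'b1', 'c1', TIMESTAMP '2023-02-13 12:00:00');
```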

Maybe my configuration is wrong? The only out-of-orderness-related thing I 
have configured is the watermark, in the way I sent previously. The docs [1] 
mention per-Kafka-partition watermarks; perhaps this would help me? Sadly, 
they do not say how to activate it.

Best,
Theo

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
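As far as I understand, with the Kafka SQL connector a watermark declared in 
the table DDL is pushed down into the source, so it is then generated per 
partition (the emitted watermark being the minimum across partitions). A 
minimal sketch, with all table, topic, and column names assumed:

```sql
-- Sketch (all names assumed): declaring the watermark in the DDL lets the
-- Kafka connector push watermark generation into the source, where it is
-- tracked per Kafka partition rather than per consuming subtask.
CREATE TABLE data_v1 (
  a STRING,
  b STRING,
  c STRING,
  timeStampData TIMESTAMP(3),
  WATERMARK FOR timeStampData AS timeStampData - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'data.v1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```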


> On 13. Feb 2023, at 10:42, Hector Rios <oj.r...@gmail.com> wrote:
> 
> Hi Theo
> 
> In your initial email, you mentioned that you have "a bit of Data on it" when 
> referring to your topic with ten partitions. Correct me if I'm wrong, but it 
> sounds like the data in your topic is bounded and you are trying to test a 
> streaming use case. What kind of parallelism do you have configured for this 
> job? Is there a configuration to set the number of slots per task manager?
> 
> I've seen varying results based on the amount of parallelism configured on a 
> job. In the end, it usually boils down to the fact that events might be 
> ingested into Flink out of order. If the event time on an event is earlier 
> than the current watermark, the event might be discarded unless you've 
> configured some level of out-of-orderness. Even with out-of-orderness 
> configured, if your data is bounded, you might have events with later event 
> times arriving earlier, which will remain in state waiting for the 
> watermark to progress. As you can imagine, if there are no more events, then 
> your records are on hold. 
> 
> As a test, after all your events have been ingested from the topic, try 
> producing one last event with an event time one or two hours later than your 
> latest event and see if the missing windows show up.
> 
> Hope it helps
> -Hector
> 
> On Mon, Feb 13, 2023 at 8:47 AM Theodor Wübker <theo.wueb...@inside-m2m.de 
> <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Hey,
> 
> so one more thing, the query looks like this:
> 
> SELECT window_start, window_end, a, b, c, COUNT(*) AS x
> FROM TABLE(TUMBLE(TABLE data.v1, DESCRIPTOR(timeStampData), INTERVAL '1' HOUR))
> GROUP BY window_start, window_end, a, b, c
> 
> When the non-determinism occurs, the topic is not keyed at all. When I key it 
> by the attribute "a", I get incorrect but deterministic results. Maybe in 
> the second case, only 1 partition out of the 10 is consumed at a time?
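> For reference, the keying in the second case can be expressed on the sink 
> table via the connector's key options; a sketch, with all names assumed:
> 
> ```sql
> -- Sketch (assumed names): a sink table whose Kafka messages are keyed by
> -- "a", so the rewritten topic is partitioned by the same attribute as a
> -- keyed producer would partition it.
> CREATE TABLE data_v1_keyed (
>   a STRING,
>   b STRING,
>   c STRING,
>   timeStampData TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'data.v1.keyed',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'key.fields' = 'a',
>   'value.format' = 'json'
> );
> ```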
> 
> Best,
> Theo
> 
>> On 13. Feb 2023, at 08:15, Theodor Wübker <theo.wueb...@inside-m2m.de 
>> <mailto:theo.wueb...@inside-m2m.de>> wrote:
>> 
>> Hey Yuxia,
>> 
>> thanks for your response. I figured too that the events arrive in a 
>> (somewhat) random order and thus cause non-determinism. I used a watermark 
>> like this: "timeStampData - INTERVAL '10' SECOND". Increasing the watermark 
>> interval does not solve the problem, though; the results are still not 
>> deterministic. Instead, I keyed the 10-partition topic. Now the results are 
>> deterministic, but they are incorrect (way too few). Am I doing something 
>> fundamentally wrong? I just need the messages to be somewhat in order 
>> (just so they don't violate the watermark). 
>> 
>> Best,
>> Theo
>> 
>> (sent again, sorry, I previously only responded to you, not the Mailing list 
>> by accident)
>> 
>>>> On 13. Feb 2023, at 04:23, yuxia <luoyu...@alumni.sjtu.edu.cn 
>>>> <mailto:luoyu...@alumni.sjtu.edu.cn>> wrote:
>>>> 
>>>> Hi, Theo.
>>>> I'm wondering what the event-time windowed query you are using looks like.
>>>> For example, how do you define the watermark?
>>>> Since you read records from 10 partitions, it may well be that the records 
>>>> arrive at the window operator out of order. 
>>>> Is it possible that the watermark has already advanced, but some records 
>>>> with earlier timestamps are still to arrive?
>>>> 
>>>> If that's the case, the set of records used to calculate the result may 
>>>> differ on every run, which then results in non-deterministic output.
>>>> 
>>>> Best regards,
>>>> Yuxia
>>>> 
>>>> ----- Original Message -----
>>>> From: "Theodor Wübker" <theo.wueb...@inside-m2m.de 
>>>> <mailto:theo.wueb...@inside-m2m.de>>
>>>> To: "User" <user@flink.apache.org <mailto:user@flink.apache.org>>
>>>> Sent: Sunday, February 12, 2023, 4:25:45 PM
>>>> Subject: Non-Determinism in Table-API with Kafka and Event Time
>>>> 
>>>> Hey everyone,
>>>> 
>>>> I am experiencing non-determinism in my Table API program at the moment, 
>>>> and (as a relatively inexperienced Flink and Kafka user) I can't really 
>>>> explain to myself why it happens. I have a topic with 10 partitions and a 
>>>> bit of data on it. I run a simple SELECT * query on this that moves some 
>>>> attributes around and writes everything to another topic with 10 
>>>> partitions. Then, on this topic, I run an event-time windowed query. Here 
>>>> I experience the non-determinism: the results of the windowed query 
>>>> differ with every execution. 
>>>> I thought this might be because the SELECT query wrote the data to the 
>>>> partitioned topic without keys. So I tried it again with the same key I 
>>>> used for the original topic, which resulted in the exact same topic 
>>>> structure. Now when I run the event-time windowed query, I get incorrect 
>>>> results (too few result entries). 
>>>> 
>>>> I have already read a lot of the docs on this and can't seem to figure it 
>>>> out. I would much appreciate it if someone could shed a bit of light on 
>>>> this. Is there anything in particular I should be aware of when reading 
>>>> partitioned topics and running an event-time query on them? Thanks :)
>>>> 
>>>> 
>>>> Best,
>>>> Theo
>>> 
>> 
> 
