[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841611#comment-16841611
 ] 

Andrew edited comment on KAFKA-8315 at 5/16/19 6:30 PM:
--------------------------------------------------------

Well, I'm not sure that just fixing the start is sufficient. I suspect that, if 
you run over a long enough period, then through sheer bad luck you will hit a 
case where a queue is drained and yet also fails to fetch any records on the 
next fetch cycle. In that case you end up in the same situation, I think, 
i.e. an empty queue that will still be processed because of the zero max idle 
time. This can happen on either side of the join, so if one side travels too 
far ahead of the other you would lose the join windows.
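
A minimal sketch of the setting I mean, assuming Kafka 2.1+ where KIP-353 added 
max.task.idle.ms (default 0); raising it gives a drained queue a chance to 
refill before the task processes the other side alone, though an unlucky empty 
fetch could presumably still defeat it as described above:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class MaxIdleExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // wait up to 5s for the lagging side instead of the default of 0ms
            props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5000L);
        }
    }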

As I was rummaging through the code, I also wondered whether a closer 
relationship with the consumer would make things a bit easier, since then you 
could better control how many records you grab to push into the queues. For 
example, I'm not certain, but I have a suspicion that the queues can grow quite 
large, as the limit is on the fetch and the queue can't put back-pressure on 
the fetch. But if you had control over the consumer and wanted to limit the 
size of the queue, then when one side is full you could simply stop fetching 
more records on that side, etc. You could also potentially use streamTime to 
make such decisions, I suppose.
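
A rough sketch of the kind of back-pressure I mean, assuming direct access to 
the consumer (which Streams does not expose like this today); the queue-size 
tracking and the limit here are hypothetical:

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class QueueBackPressure {
        static final int MAX_QUEUED = 1000; // illustrative limit, not a real config
        final Map<TopicPartition, Integer> queueSizes = new ConcurrentHashMap<>();

        // Pause fetching for any partition whose record queue is full, and
        // resume it once the queue has drained, so the buffers stay bounded.
        void throttle(KafkaConsumer<byte[], byte[]> consumer) {
            for (TopicPartition tp : consumer.assignment()) {
                int buffered = queueSizes.getOrDefault(tp, 0);
                if (buffered >= MAX_QUEUED) {
                    consumer.pause(Collections.singleton(tp));
                } else {
                    consumer.resume(Collections.singleton(tp));
                }
            }
        }
    }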

I also wondered whether you could allow the left(right) stream to process, in 
the absence of right(left) records, provided the left(right) streamTime stays 
less than the right(left) streamTime - right(left) grace - windowSize, i.e. 
does not go beyond the first active window on the right(left) stream.
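
Writing that condition out for the left side (all names below are hypothetical; 
this is just the inequality above spelled out in code):

    // The left side may keep processing without right-side records only while
    // its stream time stays behind the earliest window on the right that could
    // still accept data.
    static boolean canProcessLeftAlone(long leftStreamTime, long rightStreamTime,
                                       long rightGraceMs, long windowSizeMs) {
        return leftStreamTime < rightStreamTime - rightGraceMs - windowSizeMs;
    }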


> Historical join issues
> ----------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated Kafka topics. This seems more apparent where one 
> topic has records at less frequent record-timestamp intervals than the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows (see the sketch after this 
> description), and this has repercussions when you extend it to a large 
> period, e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized}}, i.e.
> the documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join operation; 
> it seems this is only supported for the group operation.
> This was considered to be a problem with the documentation, not with the API, 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code that affects which partition 
> is selected to deliver the next record to the join. This would only be a 
> problem for out-of-order data, and join-example uses data that is in 
> timestamp order in both topics, so this fix is thought not to affect 
> join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
> 3) Further investigation using a crafted unit test suggests that the 
> partition selection and ordering (PartitionGroup/RecordQueue) work as expected:
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) The current assumption is that the issue is rooted in the way records are 
> consumed from the topics:
> We have tried setting various options to suppress reads from the source 
> topics, but it doesn't seem to make any difference: 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  
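
For reference, a minimal sketch of the grace-period workaround mentioned in the 
description above, assuming the Duration-based JoinWindows API from Kafka 2.1+ 
(the window and grace sizes are illustrative only):

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.JoinWindows;

    public class GraceExample {
        // A 1-minute join window that still accepts records arriving up to
        // 2 years late; this is what the report above describes as the only
        // current way to join far back into pre-populated topics, with the
        // repercussions that come from such a large grace period.
        static final JoinWindows WINDOWS =
                JoinWindows.of(Duration.ofMinutes(1)).grace(Duration.ofDays(730));
    }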



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
