I am not sure that this feature would help with stream-table joins. Also
note that we recently merged a PR that improves the timestamp
synchronization of Kafka Streams; this will vastly improve the guarantees.

What I don't understand:

> So table records that have been updated recently will not be read until the 
> stream records reach or exceed that same timestamp.

Yes, this is on purpose / by design.

> and if they do it will be with old data

What do you mean by "old data"? By definition, the stream record will
join with a table that contains data up to the stream record's
timestamp. It does not make sense semantically to advance the table
beyond the stream record's timestamp: if you did, you would join with
"future data", which, from my point of view, is semantically incorrect.
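
To make the semantics concrete, here is a minimal sketch in plain Java. It
is a toy model, not the Kafka Streams API: the class TimestampedJoinSketch,
the Event type, and the join() helper are all made up for illustration, and
both inputs are assumed to be sorted by timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a timestamp-synchronized stream-table join (hypothetical, not Kafka Streams code). */
class TimestampedJoinSketch {

    /** A keyed record with a timestamp; a hypothetical stand-in for a real consumer record. */
    static final class Event {
        final String key;
        final String value;
        final long timestamp;

        Event(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Joins each stream record against the table state as of that record's timestamp.
     * Both lists are assumed to be sorted by timestamp.
     */
    static List<String> join(List<Event> tableUpdates, List<Event> streamRecords) {
        Map<String, String> table = new HashMap<>();
        List<String> results = new ArrayList<>();
        int next = 0; // index of the next table update that has not been applied yet
        for (Event s : streamRecords) {
            // Advance the table up to the stream record's timestamp, but never beyond it.
            while (next < tableUpdates.size()
                    && tableUpdates.get(next).timestamp <= s.timestamp) {
                Event u = tableUpdates.get(next++);
                table.put(u.key, u.value);
            }
            String match = table.get(s.key);
            if (match != null) { // inner join: no table entry yet means no output
                results.add(s.value + "+" + match);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> table = List.of(new Event("k", "v1", 10), new Event("k", "v2", 30));
        List<Event> stream = List.of(new Event("k", "a", 20), new Event("k", "b", 40));
        System.out.println(join(table, stream)); // prints [a+v1, b+v2]
    }
}
```

The stream record at timestamp 20 joins with v1 even though v2 is already in
the table topic, because v2 (timestamp 30) is "future data" from that
record's point of view.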

Shameless plug: you might want to read
https://www.confluent.io/blog/streams-tables-two-sides-same-coin



-Matthias

On 9/17/18 8:23 AM, Thomas Becker wrote:
> For my part, a major use-case for this feature is stream-table joins. 
> Currently, KafkaStreams does the wrong thing in some cases because the only 
> message choosing strategy available is timestamp-based. So table records that 
> have been updated recently will not be read until the stream records reach or 
> exceed that same timestamp. So there is no guarantee these records get joined 
> at all, and if they do it will be with old data. I realize we're talking 
> about the consumer here and not streams specifically, but as it stands I 
> can't even write a non-streams application that does a join but prioritizes 
> table-topic records over stream records without using multiple consumers.
> 
> On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:
> 
> Hi all,
> 
> 
> I agree that DISCUSS is more appropriate than VOTE at this point, since I 
> don't remember the last discussion coming to a definite conclusion.
> 
> 
> I guess my concern is that this will add complexity and memory consumption on 
> the server side.  In the case of incremental fetch requests, we will have to 
> track at least two extra bytes per partition, to know what the priority of 
> each partition is within each active fetch session.
> 
> 
> It would be nice to hear more about the use-cases for this feature.  I think 
> Gwen asked about this earlier, and I don't remember reading a response.  The 
> fact that we're now talking about Samza interfaces is a bit of a red flag.  
> After all, Samza didn't need partition priorities to do what it did.  You can 
> do a lot with muting partitions and using appropriate threading in your code.
> 
> 
> For example, you can hand data from a partition off to a work queue with a 
> fixed size, which is handled by a separate service thread.  If the queue gets 
> full, you can mute the partition until some of the buffered data is 
> processed.  Kafka Streams uses a similar approach to avoid reading partition 
> data that isn't immediately needed.
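
A minimal sketch of the hand-off pattern described above, in plain Java with
no Kafka dependency. The class PartitionHandoff and its methods are
hypothetical names for illustration only. In a real application the muted
flag would presumably be acted on by the polling thread via
KafkaConsumer#pause and KafkaConsumer#resume for the affected partition.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of a fixed-size hand-off queue for one partition (hypothetical, no Kafka dependency). */
class PartitionHandoff {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private boolean muted = false;

    PartitionHandoff(int capacity) {
        this.capacity = capacity;
    }

    /** Called by the polling thread. Returns false if the partition is muted and the record was rejected. */
    synchronized boolean offer(String record) {
        if (muted) {
            return false; // the caller should not be fetching from this partition right now
        }
        buffer.add(record);
        if (buffer.size() >= capacity) {
            muted = true; // queue is full: mute the partition
        }
        return true;
    }

    /** Called by the service thread. Returns the next record, or null if the buffer is empty. */
    synchronized String take() {
        String record = buffer.poll();
        if (buffer.size() < capacity) {
            muted = false; // there is room again: unmute the partition
        }
        return record;
    }

    synchronized boolean isMuted() {
        return muted;
    }

    public static void main(String[] args) {
        PartitionHandoff handoff = new PartitionHandoff(2);
        handoff.offer("a");
        handoff.offer("b");
        System.out.println(handoff.isMuted()); // prints true
        handoff.take();
        System.out.println(handoff.isMuted()); // prints false
    }
}
```

Running one such hand-off per partition lets a slow partition be muted
without holding back the others.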
> 
> 
> There might be some use-cases that need priorities eventually, but I'm 
> concerned that we're jumping the gun by trying to implement this before we 
> know what they are.
> 
> 
> best,
> 
> Colin
> 
> 
> 
> On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
> 
> 
> On 05.09.2018 02:38, n...@afshartous.com wrote:
> 
> 
> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> 
> 
> what I meant is literally this interface:
> 
> 
> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
> 
> Hi Jan,
> 
> 
> Thanks for the reply and I have a few questions.  This Samza doc
> 
> 
>    https://samza.apache.org/learn/documentation/0.14/container/streams.html
> 
> 
> indicates that the chooser is set via configuration.  Are you suggesting 
> adding a new configuration for Kafka?  It seems like we could also have a 
> method on KafkaConsumer
> 
> 
>      public void register(MessageChooser messageChooser)
> 
> I don't have strong opinions regarding this. I like configs; I also
> don't think it would be a problem to have both.
> 
> 
> 
> to make it more dynamic.
> 
> 
> Also, the Samza MessageChooser interface has the method
> 
> 
>    /* Notify the chooser that a new envelope is available for a processing. */
> 
> void update(IncomingMessageEnvelope envelope)
> 
> 
> and I’m wondering how this method would be translated to the Kafka API, in 
> particular what would correspond to IncomingMessageEnvelope.
> 
> I think Samza uses the envelope abstraction because they support other
> sources besides Kafka as well. They are more on the Spark end of things
> when it comes to different input types. I don't have strong opinions, but
> it feels like we wouldn't need such a thing in the Kafka consumer and
> could just use a regular ConsumerRecord or so.
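
For the sake of discussion, here is a rough sketch of what a chooser working
on plain records could look like. Nothing here exists in the Kafka consumer:
RecordChooser, TopicPriorityChooser, and the Rec stand-in for ConsumerRecord
are hypothetical names. Only the update/choose pairing is borrowed from
Samza's MessageChooser.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of a Samza-style chooser over plain records (not an existing Kafka API). */
class ChooserSketch {

    /** Hypothetical stand-in for ConsumerRecord, reduced to the fields this sketch needs. */
    static final class Rec {
        final String topic;
        final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Hypothetical analogue of Samza's MessageChooser, using Rec instead of IncomingMessageEnvelope. */
    interface RecordChooser {
        /** Notify the chooser that a new record is available for processing. */
        void update(Rec record);

        /** Return the next record to process, or null if nothing is buffered. */
        Rec choose();
    }

    /** Always prefers buffered records from one designated topic, e.g. a table topic. */
    static final class TopicPriorityChooser implements RecordChooser {
        private final String priorityTopic;
        private final Deque<Rec> high = new ArrayDeque<>();
        private final Deque<Rec> low = new ArrayDeque<>();

        TopicPriorityChooser(String priorityTopic) {
            this.priorityTopic = priorityTopic;
        }

        @Override
        public void update(Rec record) {
            (record.topic.equals(priorityTopic) ? high : low).add(record);
        }

        @Override
        public Rec choose() {
            return high.isEmpty() ? low.poll() : high.poll();
        }
    }

    public static void main(String[] args) {
        RecordChooser chooser = new TopicPriorityChooser("table-topic");
        chooser.update(new Rec("stream-topic", "s1"));
        chooser.update(new Rec("table-topic", "t1"));
        chooser.update(new Rec("stream-topic", "s2"));
        System.out.println(chooser.choose().value); // prints t1
        System.out.println(chooser.choose().value); // prints s1
    }
}
```

Whether something like this would be set via config or via a register()
method is independent of the interface shape itself.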
> 
> 
> Best,
> 
> --
> 
>        Nick
> 
