To sum up here: I don't disagree that timestamp semantics are nice and often useful. But currently there is no way to opt out of them. In our case, the timestamp of an updated item record, which, say, provides a better or corrected description, is simply preferred over the old record, period. Trying to force a temporal relationship between that and an event where the item was viewed is nonsensical.
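For concreteness, here is a minimal sketch of the kind of topology in question; topic names and serdes are placeholders, not our actual setup:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ItemViewEnricher {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Compacted topic holding the best available description per item.
        KTable<String, String> items =
                builder.table("item-descriptions",
                        Consumed.with(Serdes.String(), Serdes.String()));

        // Stream of view events, keyed by item id.
        KStream<String, String> views =
                builder.stream("item-views",
                        Consumed.with(Serdes.String(), Serdes.String()));

        // With the current timestamp synchronization, each view event is
        // joined against the table state as of the event's timestamp. The
        // behavior described above is the opposite: always join against the
        // latest table record, even when it is newer than the event.
        views.join(items, (view, description) -> view + "|" + description)
             .to("enriched-views",
                     Produced.with(Serdes.String(), Serdes.String()));
    }
}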
On Mon, 2018-09-17 at 18:18 +0000, Thomas Becker wrote:

Hi Matthias,

I'm familiar with how the timestamp synchronization currently works. I also submit that it does not work for our use case, which is the following: the table-backing topic contains records with the best available data we have for a given item. If a record in this topic is updated, we would always prefer to join using that data *regardless* of whether it is "newer" than the incoming event we are trying to join it with. Essentially, Streams assumes that we must want the table data that was current at the time the event was produced, and here we simply don't. If we have newer data, we want that.

But my larger concern here is actually reprocessing: when doing that, the older table data will have been log-compacted away, and the current timestamp semantics will result in events that occurred prior to the latest table updates being left unjoined.

Does this make sense now?

Thanks!
Tommy

On Mon, 2018-09-17 at 09:51 -0700, Matthias J. Sax wrote:

I am not sure if this feature would help with stream-table joins. Also note that we recently merged a PR that improves the timestamp synchronization of Kafka Streams -- this will vastly improve the guarantees.

What I don't understand:

> So table records that have been updated recently will not be read until the stream records reach or exceed that same timestamp.

Yes, this is on purpose / by design.

> and if they do it will be with old data

What do you mean by "old data"? By definition, the stream record will join with a table that contains data up to the stream record's timestamp. It does not make sense semantically to advance the table beyond the stream record's timestamp, because if you did, you would join with "future data", which, from my point of view, is semantically incorrect.

Shameless plug: you might want to read https://www.confluent.io/blog/streams-tables-two-sides-same-coin

-Matthias

On 9/17/18 8:23 AM, Thomas Becker wrote:

For my part, a major use case for this feature is stream-table joins. Currently, Kafka Streams does the wrong thing in some cases because the only message-choosing strategy available is timestamp-based. So table records that have been updated recently will not be read until the stream records reach or exceed that same timestamp. So there is no guarantee these records get joined at all, and if they do, it will be with old data. I realize we're talking about the consumer here and not Streams specifically, but as it stands I can't even write a non-Streams application that does a join but prioritizes table-topic records over stream records without using multiple consumers.

On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:

Hi all,

I agree that DISCUSS is more appropriate than VOTE at this point, since I don't remember the last discussion coming to a definite conclusion.

I guess my concern is that this will add complexity and memory consumption on the server side. In the case of incremental fetch requests, we will have to track at least two extra bytes per partition, to know what the priority of each partition is within each active fetch session.

It would be nice to hear more about the use cases for this feature. I think Gwen asked about this earlier, and I don't remember reading a response. The fact that we're now talking about Samza interfaces is a bit of a red flag. After all, Samza didn't need partition priorities to do what it did. You can do a lot with muting partitions and using appropriate threading in your code.
For example, you can hand data from a partition off to a work queue with a fixed size, which is handled by a separate service thread. If the queue gets full, you can mute the partition until some of the buffered data is processed. Kafka Streams uses a similar approach to avoid reading partition data that isn't immediately needed. (A sketch of this pattern follows the thread below.)

There might be some use cases that need priorities eventually, but I'm concerned that we're jumping the gun by trying to implement this before we know what they are.

best,
Colin

On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:

> On 05.09.2018 02:38, n...@afshartous.com wrote:
>
>> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>> What I meant is literally this interface:
>> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
>
> Hi Jan,
>
> Thanks for the reply; I have a few questions. This Samza doc
> https://samza.apache.org/learn/documentation/0.14/container/streams.html
> indicates that the chooser is set via configuration. Are you suggesting adding a new configuration for Kafka? It seems like we could also have a method on KafkaConsumer:
>
> public void register(MessageChooser messageChooser)

I don't have strong opinions regarding this. I like configs; I also don't think it would be a problem to have both.

> to make it more dynamic. Also, the Samza MessageChooser interface has the method
>
> /* Notify the chooser that a new envelope is available for processing. */
> void update(IncomingMessageEnvelope envelope)
>
> and I'm wondering how this method would be translated to the Kafka API. In particular, what corresponds to IncomingMessageEnvelope?

I think Samza uses the envelope abstraction because they support other sources besides Kafka as well; they are more on the Spark end of things when it comes to different input types. I don't have strong opinions, but it feels like we wouldn't need such a thing in the Kafka consumer and could just use a regular ConsumerRecord or so. (A hypothetical Kafka version of the interface is sketched at the end of this thread.)

> Best,
> --
> Nick
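As a concrete illustration of the mute-and-buffer pattern Colin describes above, here is a rough sketch using only existing consumer APIs (pause/resume/seek); the topic name, queue capacity, and processing logic are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MutingConsumerExample {
    private static final int QUEUE_CAPACITY = 1000;

    public static void main(String[] args) {
        // Bounded work queue drained by a separate service thread.
        BlockingQueue<ConsumerRecord<String, String>> queue =
                new ArrayBlockingQueue<>(QUEUE_CAPACITY);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecord<String, String> record = queue.take();
                    // ... do the actual processing of `record` here ...
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "muting-example");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (TopicPartition tp : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(tp)) {
                        if (!queue.offer(record)) {
                            // Queue is full: mute this partition and rewind to
                            // the first record we failed to enqueue, so it is
                            // re-delivered once the partition is resumed.
                            consumer.pause(Collections.singleton(tp));
                            consumer.seek(tp, record.offset());
                            break;
                        }
                    }
                }
                // Un-mute once the worker has drained some of the backlog.
                if (!consumer.paused().isEmpty()
                        && queue.size() < QUEUE_CAPACITY / 2) {
                    consumer.resume(consumer.paused());
                }
            }
        }
    }
}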
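For reference, here is a purely hypothetical Kafka-flavored version of the chooser interface Jan and Nick discuss above, substituting ConsumerRecord for Samza's IncomingMessageEnvelope. Nothing like this exists in the Kafka API today; the names and signatures below are illustrative only, mirroring the Samza javadoc linked earlier in the thread:

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical analogue of Samza's MessageChooser for the Kafka consumer.
public interface MessageChooser<K, V> {

    // Notify the chooser that a new record is available for processing.
    void update(ConsumerRecord<K, V> record);

    // Return the record that should be consumed next, or null if nothing
    // is buffered. poll() would draw from this instead of fetch order.
    ConsumerRecord<K, V> choose();
}

Registration could then happen either via a config, as in Samza, or via the (equally hypothetical) KafkaConsumer#register(MessageChooser) method Nick suggests.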