Hi Matthias,

Ok, that clarifies quite a bit. I never really went into the timestamp
aspects, as time does not really play a role in my application (aside from
the repartition topics, I have no KStream or windowed operations, just
different kinds of KTable joins).
I do think that the failure case I see (with this version, joining two
'old' KTables causes a small percentage of records to vanish) is far from
intuitive, and it somehow worked fine until a few weeks ago.

I think your option 3 should work. I'll make a custom timestamp extractor
(I actually do have a timestamp in my messages), and I'll set it to the
current time as records enter the streams application; see the sketch at
the bottom of this mail.

Thanks, that helped,

regards, Frank

On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi Frank,
>
> Yes, the retention policy is based on the embedded record timestamps and
> not on system time. Thus, if you send messages with an old timestamp,
> they can trigger log/segment rolling.
>
>> I see that the repartition topics have timestamp.type = CreateTime,
>> does that mean it uses the timestamp of the original message?
>
> Yes. That's the default setting on the broker side. For Streams, we
> maintain a so-called "stream time" that is computed based on the input
> record timestamps. This "stream time" is used to set the timestamp for
> records that are written by Streams (so it's more or less the timestamp
> of the input records).
>
>> Shouldn't that be LogAppendTime for repartition topics?
>
> No. Streams needs to preserve the original timestamp to guarantee
> correct semantics for downstream window operations. Thus, it should be
> CreateTime -- if you switch to LogAppendTime, you might break your
> application logic and get wrong results.
>
>> Or is there a way to configure that?
>
> You can configure this on a per-topic basis on the brokers.
>
>> If I hack into my Kafka Streams code to force it to use LogAppendTime,
>> that seems to solve my problem, but it appears to take a huge toll on
>> the brokers. Throughput plummets, and I don't really know why.
>
> I am not sure what you mean by this. As it's a topic config, I don't
> understand how you can force this within your Streams application.
>
> IMHO, you have multiple options though:
>  - increase the retention time for your re-partitioning topics
>  - change the retention policy to a number of bytes instead of time for
>    the re-partitioning topics
>  - implement a custom timestamp extractor and adjust the timestamps
>    accordingly ("stream time" is based on whatever the timestamp
>    extractor returns)
>
> However, if you have records with old timestamps, I am wondering why
> they are not truncated in your input topic? Do you not face the same
> issue there?

All my topics are compacted and I use no windowed operations at all; the
only 'delete' topics are the internal repartitioning topics.

> -Matthias
>
> On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> > Hi Kafka people,
> >
> > I'm running an application that pushes database changes into a Kafka
> > topic. I'm also running a Kafka Streams application that listens to
> > these topics, groups them using the high-level API, and inserts them
> > into another database.
> >
> > All topics are compacted, with the exception of the 'repartition
> > topics', which are configured to be retained for 36 hours.
> >
> > Note that the changes in the original Kafka topics can be old
> > (generally more than 36 hours), as they only change when the data
> > changes.
> >
> > When I start an instance of the Kafka Streams application, I see the
> > repartition topics being deleted immediately, sometimes before they
> > are processed, and it looks like the repartition messages use the same
> > timestamp as the original message.
> >
> > I see that the repartition topics have timestamp.type = CreateTime,
> > does that mean it uses the timestamp of the original message?
> > Shouldn't that be LogAppendTime for repartition topics? Or is there a
> > way to configure that?
> >
> > If I hack into my Kafka Streams code to force it to use LogAppendTime,
> > that seems to solve my problem, but it appears to take a huge toll on
> > the brokers. Throughput plummets, and I don't really know why.
> >
> > Any ideas?
> >
> > Frank
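
For reference, this is roughly what I have in mind; just a sketch against
the 0.10.2-style TimestampExtractor interface (the two-argument extract()
method), with a made-up class name. If plain wall-clock time is enough, the
built-in org.apache.kafka.streams.processor.WallclockTimestampExtractor
should do the same thing.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Stamps every record with the wall-clock time at which it enters the
    // streams application, so the repartition topics get fresh timestamps
    // and are not rolled away immediately by time-based retention.
    public class IngestTimeExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            // ignore the timestamp embedded in the payload and use "now"
            return System.currentTimeMillis();
        }
    }

    // Wired into the existing streams config; the key is
    // "timestamp.extractor" on 0.10.x and "default.timestamp.extractor"
    // on newer releases:
    //   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    //             IngestTimeExtractor.class.getName());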