We currently do not have a KIP for it yet.

On Wed, Jun 7, 2017 at 3:21 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> I tried to use a TimestampExtractor that uses the timestamps from our
> messages, and a 'map' operation on the KTable to set them to the current
> time, to have a precise point where I discard our original timestamps.
> That does not work (I verified by writing a separate Java Kafka consumer
> and printing out the timestamps): the TimestampExtractor only gets called
> once, and it sticks with that time. I did not really have a good reason
> not to simply use the WallclockTimestampExtractor, and that one seems to
> do exactly what I wanted.
>
> So, I'm good! I am interested in the community discussion Guozhang
> mentions. Is there a KIP for that?
>
> regards, Frank
>
> On Mon, Jun 5, 2017 at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Frank,
>>
>> If you use "now", I assume you are calling System.currentTimeMillis().
>> If yes, you can also use the predefined WallclockTimestampExtractor that
>> ships with Streams (no need to write your own one).
>>
>>> I thought that the timestamp extractor would then also use that
>>> updated timestamp as 'stream time', but I don't really see that
>>> happening, so that assumption was wrong.
>>
>> Yes, this should happen. Not sure why you don't observe this. And thus,
>> the producer should use this timestamp to write the records.
>>
>> How did you verify the timestamps that are set for your output records?
>>
>> -Matthias
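A minimal sketch of both pieces discussed above, assuming the 0.10.2/0.11-era
API (later releases renamed the extractor config key to
default.timestamp.extractor); the application ID, bootstrap address, and
topic names are illustrative:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class TimestampCheck {

        // Streams config that stamps records with wall-clock time instead of
        // the embedded message timestamps.
        static Properties streamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      WallclockTimestampExtractor.class.getName());
            return props;
        }

        // A plain consumer that prints the timestamp and timestamp type of
        // every record in an output topic, to verify what Streams wrote.
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative
            props.put("group.id", "timestamp-check");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-output-topic")); // illustrative
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        System.out.printf("offset=%d timestamp=%d type=%s%n",
                                record.offset(), record.timestamp(), record.timestampType());
                    }
                }
            }
        }
    }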
>> On 6/5/17 6:15 AM, Frank Lyaruu wrote:
>>
>>> Thanks Guozhang,
>>>
>>> I figured I could use a custom timestamp extractor, and set that
>>> timestamp to 'now' when reading a source topic, as the original
>>> timestamp is pretty much irrelevant. I thought that the timestamp
>>> extractor would then also use that updated timestamp as 'stream time',
>>> but I don't really see that happening, so that assumption was wrong.
>>>
>>> If I could configure a timestamp extractor that would also be used by
>>> the producer, I think I'd be in business, but right now I don't see an
>>> elegant way forward, so any ideas for workarounds are welcome.
>>>
>>> regards, Frank
>>>
>>> On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Frank, thanks for sharing your findings.
>>>>
>>>> I think this is a general issue to consider in Streams, and the
>>>> community has been thinking about it: we write intermediate topics
>>>> with the stream time that is inherited from the source topic's
>>>> timestamps; however, that timestamp is also used for log rolling /
>>>> retention etc., and these two purposes (using timestamps in processing
>>>> for out-of-ordering and late arrivals, and operating on the Kafka
>>>> topics) can rely on different timestamp semantics. We need to revisit
>>>> how timestamps can be maintained across the topology in Streams.
>>>>
>>>> Guozhang
>>>>
>>>> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flya...@gmail.com> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> OK, that clarifies quite a bit. I never really went into the
>>>>> timestamp aspects, as time does not really play a role in my
>>>>> application (aside from the repartition topics, I have no KStreams or
>>>>> windowed operations, just different kinds of KTable joins).
>>>>>
>>>>> I do think that the failure case I see (with this version, joining
>>>>> two 'old' KTables causes a small percentage of records to vanish) is
>>>>> far from intuitive, and it somehow worked fine until a few weeks ago.
>>>>>
>>>>> I think your option 3 should work. I'll make a custom timestamp
>>>>> extractor (I actually do have a timestamp in my messages), and I'll
>>>>> set it to the current time as records enter the streams application.
>>>>>
>>>>> Thanks, that helped. Regards, Frank
>>>>>
>>>>> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Hi Frank,
>>>>>>
>>>>>> yes, the retention policy is based on the embedded record timestamps
>>>>>> and not on system time. Thus, if you send messages with an old
>>>>>> timestamp, they can trigger log/segment rolling.
>>>>>>
>>>>>>> I see that the repartition topics have timestamp.type = CreateTime,
>>>>>>> does that mean it uses the timestamp of the original message?
>>>>>>
>>>>>> Yes. That's the default setting on the broker side. For Streams, we
>>>>>> maintain a so-called "stream time" that is computed based on the
>>>>>> input record timestamps. This "stream time" is used to set the
>>>>>> timestamp for records that are written by Streams (so it's more or
>>>>>> less the timestamp of the input records).
>>>>>>
>>>>>>> Shouldn't that be LogAppendTime for repartition topics?
>>>>>>
>>>>>> No. Streams needs to preserve the original timestamp to guarantee
>>>>>> correct semantics for downstream window operations. Thus, it should
>>>>>> be CreateTime -- if you switch to LogAppendTime, you might break
>>>>>> your application logic and get wrong results.
>>>>>>
>>>>>>> Or is there a way to configure that?
>>>>>>
>>>>>> You can configure this on a per-topic basis on the brokers.
>>>>>>
>>>>>>> If I hack my Kafka Streams code to force it to use LogAppendTime,
>>>>>>> that seems to solve my problem, but it appears to take a huge toll
>>>>>>> on the brokers. Throughput plummets, and I don't really know why.
>>>>>>
>>>>>> I am not sure what you mean by this. As it's a topic config, I don't
>>>>>> understand how you can force this within your Streams application.
>>>>>>
>>>>>> IMHO, you have multiple options though:
>>>>>> - increase the retention time for your re-partitioning topics
>>>>>> - change the retention policy to a number of bytes instead of time
>>>>>>   for the re-partitioning topics
>>>>>> - implement a custom timestamp extractor and adjust the timestamps
>>>>>>   accordingly ("stream time" is based on whatever the timestamp
>>>>>>   extractor returns) -- see the sketch after this message
>>>>>>
>>>>>> However, if you have records with old timestamps, I am wondering why
>>>>>> they are not truncated in your input topic? Do you not face the same
>>>>>> issue there?
>>>>>
>>>>> All my topics are compacted; I use no windowed operations at all. The
>>>>> only 'delete' topics are the repartitioning internal topics.
>>>>>
>>>>>> -Matthias
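A minimal sketch of option 3 from the list above, assuming the two-argument
TimestampExtractor interface from Kafka 0.10.2+ (earlier releases used a
one-argument extract method). The class name and the 36-hour cutoff, which
mirrors the retention window from Frank's original message below, are purely
illustrative:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Illustrative: keep embedded timestamps that are recent enough, and
    // stamp anything older (or invalid) with wall-clock time, so that old
    // records cannot trigger time-based retention on repartition topics.
    public class RecentOrWallclockTimestampExtractor implements TimestampExtractor {

        private static final long MAX_AGE_MS = 36L * 60 * 60 * 1000; // hypothetical cutoff

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            long now = System.currentTimeMillis();
            long embedded = record.timestamp();
            if (embedded < 0 || now - embedded > MAX_AGE_MS) {
                return now; // the precise point where the original timestamp is discarded
            }
            return embedded;
        }
    }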
>>>>>> On 6/2/17 9:33 AM, Frank Lyaruu wrote:
>>>>>>
>>>>>>> Hi Kafka people,
>>>>>>>
>>>>>>> I'm running an application that pushes database changes into a
>>>>>>> Kafka topic. I'm also running a Kafka Streams application that
>>>>>>> listens to these topics, groups them using the high-level API, and
>>>>>>> inserts them into another database.
>>>>>>>
>>>>>>> All topics are compacted, with the exception of the 'repartition
>>>>>>> topics', which are configured to be retained for 36 hours.
>>>>>>>
>>>>>>> Note that the changes in the original Kafka topics can be old
>>>>>>> (generally more than 36 hours), as they only change when the data
>>>>>>> changes.
>>>>>>>
>>>>>>> When I start an instance of the Kafka Streams application, I see
>>>>>>> the repartition topics being deleted immediately, sometimes before
>>>>>>> they are processed, and it looks like the repartition messages use
>>>>>>> the same timestamp as the original message.
>>>>>>>
>>>>>>> I see that the repartition topics have timestamp.type = CreateTime;
>>>>>>> does that mean it uses the timestamp of the original message?
>>>>>>> Shouldn't that be LogAppendTime for repartition topics? Or is there
>>>>>>> a way to configure that?
>>>>>>>
>>>>>>> If I hack my Kafka Streams code to force it to use LogAppendTime,
>>>>>>> that seems to solve my problem, but it appears to take a huge toll
>>>>>>> on the brokers. Throughput plummets, and I don't really know why.
>>>>>>>
>>>>>>> Any ideas?
>>>>>>>
>>>>>>> Frank
>>>>
>>>> --
>>>> -- Guozhang

--
-- Guozhang
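For the per-topic settings mentioned in the thread, a sketch using the
AdminClient introduced in Kafka 0.11 (on earlier clusters the same override
can be applied with the kafka-configs.sh tool). The bootstrap address and
the repartition topic name, which Streams derives from the application ID,
are illustrative. It raises retention.ms (option 1 above) rather than
switching message.timestamp.type to LogAppendTime, which, as Matthias points
out, can break Streams semantics:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class RepartitionTopicConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

            try (AdminClient admin = AdminClient.create(props)) {
                // Illustrative internal topic name; list topics to find the real ones.
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC,
                        "my-streams-app-KSTREAM-MAP-0000000001-repartition");
                // Raise time-based retention to 7 days so records carrying old
                // "stream time" timestamps are not deleted before being processed.
                Config config = new Config(Arrays.asList(
                        new ConfigEntry("retention.ms",
                                String.valueOf(7L * 24 * 60 * 60 * 1000))));
                admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
            }
        }
    }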