Regarding KAFKA-4468: as discussed on the JIRA, we intentionally did not
write the end-timestamp to RocksDB as a storage optimization, i.e. we still
write only the combo of window-start-time and key. This is because for
TimeWindow the window length is fixed and accessible in the Windows object,
so we can read the start-timestamp and key, and then use the Windows'
length value to calculate the end-timestamp.

The problem is that currently we do not set the end-timestamp after reading
from RocksDB, since WindowedSerde was considered an internal class and not
used by users directly. This should be fixed, but I think we can still
keep the current storage format and not write the end-timestamp, as it
does not seem necessary.
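To make the layout concrete, here is a minimal sketch of the storage format described above: the serialized key followed by an 8-byte window start timestamp, with the end derived from the fixed window size. The class and method names are illustrative, not the actual Kafka Streams internals.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only -- not Kafka Streams' actual WindowedSerializer.
public class WindowKeyCodec {
    static final int TIMESTAMP_SIZE = 8;

    // Pack the raw key bytes plus the window start time into one byte array.
    static byte[] serialize(byte[] keyBytes, long windowStartMs) {
        ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE);
        buf.put(keyBytes);
        buf.putLong(windowStartMs);
        return buf.array();
    }

    // Recover the window start time from the tail of the stored bytes.
    static long windowStart(byte[] stored) {
        return ByteBuffer.wrap(stored, stored.length - TIMESTAMP_SIZE,
                               TIMESTAMP_SIZE).getLong();
    }

    // The end timestamp is never stored; it is derived from the window size.
    static long windowEnd(byte[] stored, long windowSizeMs) {
        return windowStart(stored) + windowSizeMs;
    }
}
```

Nothing beyond the start timestamp needs to be persisted as long as all windows sharing the store have the same fixed length, which is exactly why mixing granularities in one key space breaks.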


Guozhang

On Tue, Jan 17, 2017 at 3:16 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> With regard to the JIRA. I guess we do not want to put the end timestamp
> into the key. For general usage, windows of different types are written
> into different topics.
>
> Thus, Nicolas' use case is quite special and using custom Serde is the
> better approach to handle it, instead of changing Kafka Streams.
>
> Nicolas, of course you are still welcome to work on
> https://issues.apache.org/jira/browse/KAFKA-4468 but the patch should
> not change the key format but only compute the correct window end
> timestamp if a window gets deserialized.
>
> @Guozhang: please correct me if I am wrong and we want to follow Eno's
> suggestion.
>
>
> -Matthias
>
> On 1/17/17 1:39 AM, Eno Thereska wrote:
> > For changes that may be backwards incompatible or change the APIs we
> usually do a short KIP first (e.g., I just did one yesterday:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics).
> It's not meant to be overly burdensome, and it encourages the community to
> participate in the
> design. In this case I suspect the KIP can be very short, a paragraph or so.
> >
> > Thanks
> > Eno
> >
> >> On 16 Jan 2017, at 22:52, Nicolas Fouché <nfou...@onfocus.io> wrote:
> >>
> >> In the case of KAFKA-4468, it's more about state stores. But still, keys
> >> would not be backward compatible. What is the "official" policy about
> >> this kind of change?
> >>
> >> 2017-01-16 23:47 GMT+01:00 Nicolas Fouché <nfou...@onfocus.io>:
> >>
> >>> Hi Eno,
> >>> I thought it would be impossible to put this in Kafka because of
> >>> backward incompatibility with the existing windowed keys, no?
> >>> In my case, I had to create a new output topic, reset the topology,
> >>> and reprocess all my data.
> >>>
> >>> 2017-01-16 23:05 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> >>>
> >>>> Nicolas,
> >>>>
> >>>> I'm checking with Bill who originally was interested in KAFKA-4468.
> If he
> >>>> isn't actively working on it, why don't you give it a go and create a
> pull
> >>>> request (PR) for it? That way your contribution is properly
> acknowledged
> >>>> etc. We can help you through with that.
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>> On 16 Jan 2017, at 18:46, Nicolas Fouché <nfou...@onfocus.io> wrote:
> >>>>>
> >>>>> My current implementation:
> >>>>> https://gist.github.com/nfo/eaf350afb5667a3516593da4d48e757a. I just
> >>>>> appended the window `end` at the end of the byte array.
> >>>>> Comments and suggestions are welcome!
> >>>>>
> >>>>>
> >>>>> 2017-01-16 15:48 GMT+01:00 Nicolas Fouché <nfou...@onfocus.io>:
> >>>>>
> >>>>>> Hi Damian,
> >>>>>>
> >>>>>> I recall now that I copied the `WindowedSerde` class [1] from
> >>>>>> Confluent's examples, which uses the internal `WindowedSerializer`
> >>>>>> class. Better to write my own Serde then. You're right, I should not rely on
> >>>>>> internal classes, especially for data written outside Kafka Streams
> >>>>>> topologies.
> >>>>>>
> >>>>>> Thanks for the insights on KAFKA-4468.
> >>>>>>
> >>>>>> https://github.com/confluentinc/examples/blob/
> >>>>>> 89db45c6890cf757b8e18565bdf7bc23f119a2ff/kafka-streams/src/
> >>>>>> main/java/io/confluent/examples/streams/utils/WindowedSerde.java
> >>>>>>
> >>>>>> Nicolas.
> >>>>>>
> >>>>>> 2017-01-16 12:31 GMT+01:00 Damian Guy <damian....@gmail.com>:
> >>>>>>
> >>>>>>> Hi Nicolas,
> >>>>>>>
> >>>>>>> I guess you are using the Processor API for your topology? The
> >>>>>>> WindowedSerializer is an internal class that is used as part of the
> >>>> DSL.
> >>>>>>> In
> >>>>>>> the DSL a topic will be created for each window operation, so we
> don't
> >>>>>>> need
> >>>>>>> the end time as it can be calculated from the window size.
> >>>>>>> However, there is an open jira for this:
> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-4468
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Damian
> >>>>>>>
> >>>>>>> On Mon, 16 Jan 2017 at 11:18 Nicolas Fouché <nfou...@onfocus.io>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> In the same topology, I generate aggregates with 1-day windows and
> >>>>>>> 1-week
> >>>>>>>> windows and write them in one single topic. On Mondays, these
> windows
> >>>>>>> have
> >>>>>>>> the same start time. The effect: these aggregates override each
> >>>>>>>> other.
> >>>>>>>>
> >>>>>>>> That happens because WindowedSerializer [1] only serializes the
> >>>> window
> >>>>>>>> start time. I'm a bit surprised, since a window by definition has a
> >>>>>>>> start and an end. I suppose someone wanted to save on key sizes?
> >>>>>>>> And/or one would consider that topics should not contain aggregates
> >>>>>>>> with different granularities?
> >>>>>>>>
> >>>>>>>> I have two choices then, either create as many output topics as I
> >>>> have
> >>>>>>>> granularities, or create my own serializer which also includes the
> >>>>>>> window
> >>>>>>>> end time. What would the community recommend ?
> >>>>>>>>
> >>>>>>>> Getting back to the core problem:
> >>>>>>>> I could understand that it's not "right" to store different
> >>>>>>> granularities
> >>>>>>>> in one topic, and I thought it would save resources (fewer topics for
> >>>>>>>> Kafka to manage). But I'm really not sure about this default serializer:
> >>>>>>>> it does not serialize all instance variables of the `Window` class, and
> >>>>>>>> more generally does not comply with the definition of a window.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>>
> >>>>>>>> https://github.com/apache/kafka/blob/0.10.1/streams/src/main
> >>>>>>> /java/org/apache/kafka/streams/kstream/internals/WindowedSer
> >>>> ializer.java
> >>>>>>>>
> >>>>>>>> Thanks.
> >>>>>>>> Nicolas
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >
> >
>
>


-- 
-- Guozhang
