> Thus, only left/outer KStream-KStream and KStream-KTable join have some
> runtime dependencies. For more details about join, check out this blog
> post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

So I am trying to reprocess a topology and seem to have encountered this. I
posted my question to
https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly.
I fear that this will not be something I can work around :(

On Sat, Dec 9, 2017 at 7:52 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> About timestamps: embedding timestamps in the payload itself is not really
> necessary IMHO. Each record has a meta-data timestamp that provides the
> exact same semantics. If you just copy data from one topic to another, the
> timestamp can be preserved: use a plain consumer/producer and set the
> timestamp of the input record explicitly as the timestamp of the output
> record. For Streams, it could be that "some" timestamps get altered, as we
> apply slightly different timestamp inference logic, but there are plans to
> improve that inference so the timestamp is preserved exactly in Streams, too.
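A minimal sketch of such a copy job: a plain consumer/producer pair that
passes each input record's metadata timestamp through to the output record
explicitly. The topic names, group id, and byte-array pass-through are
placeholder assumptions, not details from this thread.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import java.util.Collections;
    import java.util.Properties;

    public class TimestampPreservingCopier {

        public static void main(String[] args) {
            Properties cprops = new Properties();
            cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            cprops.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-copier");
            cprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

            Properties pprops = new Properties();
            pprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            pprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            pprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cprops);
                 KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pprops)) {
                consumer.subscribe(Collections.singletonList("orders"));  // old topic (placeholder)
                while (true) {
                    for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(100)) {
                        // Set the output record's timestamp from the input record's
                        // metadata timestamp so the copy keeps the same event time.
                        producer.send(new ProducerRecord<>(
                                "new.orders",        // new topic (placeholder)
                                null,                // let the default partitioner pick by key
                                rec.timestamp(),     // preserve the original timestamp
                                rec.key(),
                                rec.value()));
                    }
                }
            }
        }
    }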
> With regard to flow control: it depends on the operators you use. Some are
> fully deterministic, others have some runtime dependencies. Fully
> deterministic are all aggregations (non-windowed and windowed), as well as
> the inner KStream-KStream join and all variants (inner/left/outer) of the
> KTable-KTable join.
>
> > If the consumer reads P2 before P1, will the task still properly align
> > these two records given their timestamps for the correct inner join,
> > assuming both records are within the record buffer?
>
> This will always be computed correctly, even if both records are not in the
> buffer at the same time :)
>
> Thus, only the left/outer KStream-KStream and KStream-KTable joins have some
> runtime dependencies. For more details about joins, check out this blog
> post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>
> Btw: we are aware of some weaknesses in the current implementation, and it's
> on our road map to strengthen our guarantees, also with regard to the
> internally used record buffer, time management in general, and operator
> semantics.
>
> Note though: Kafka guarantees offset-based ordering, not timestamp-based
> ordering, and thus Kafka Streams also processes records in offset order.
> This implies that records might be out-of-order with regard to their
> timestamps, but our operators are implemented to handle this case correctly
> (minus some known issues, mentioned above, that we are going to fix in
> future releases).
>
> Stateless: I mean, if you write a program that only uses stateless operators
> like filter/map but no aggregations/joins.
>
> -Matthias
>
> On 12/9/17 11:59 AM, Dmitry Minkovsky wrote:
> >> How large is the record buffer? Is it configurable?
> >
> > I seem to have just discovered the answer to this:
> > buffered.records.per.partition
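That is the relevant knob: the record buffer Streams keeps per input
topic-partition is bounded by buffered.records.per.partition (default 1000).
A small sketch of setting it, with a placeholder application id and bootstrap
server:

    import org.apache.kafka.streams.StreamsConfig;

    import java.util.Properties;

    public class BufferSizeConfig {

        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Maximum number of records buffered per input topic-partition before
            // the task pauses fetching for that partition (default is 1000).
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10000);
            return props;
        }
    }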
> > On Sat, Dec 9, 2017 at 2:48 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> > wrote:
> >
> >> Hi Matthias, yes that definitely helps. A few thoughts inline below.
> >>
> >> Thank you!
> >>
> >> On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Hard to give a generic answer.
> >>>
> >>> 1. We recommend to over-partition your input topics to start with (to
> >>> avoid that you need to add new partitions later on); problem avoidance
> >>> is the best strategy. There will be some overhead for this on the broker
> >>> side, obviously, but it's not too big.
> >>
> >> Yes, I will definitely be doing this.
> >>
> >>> 2. Not sure why you would need a new cluster? You can just create a new
> >>> topic in the same cluster and let Kafka Streams read from there.
> >>
> >> Motivated by fear of disturbing/manipulating a production cluster and the
> >> relative ease of putting up a new cluster. Perhaps that fear is
> >> irrational. I could alternatively just prefix topics.
> >>
> >>> 3. Depending on your state requirements, you could also run two
> >>> applications in parallel -- the new one reads from the new input topic
> >>> with more partitions, and you configure your producer to write to the
> >>> new topic (or maybe even to dual-write to both). Once your new
> >>> application is ramped up, you can stop the old one.
> >>
> >> Yes, this is my plan for migrations. If I could run it past you:
> >>
> >> (i) Write input topics from the old prefix to the new prefix.
> >> (ii) Start the new Kafka Streams application against the new prefix.
> >> (iii) When the two applications are in sync, stop writing to the old
> >> topics.
> >>
> >> Since I will be copying from an old prefix to a new prefix, it seems
> >> essential here to have timestamps embedded in the data records along with
> >> a custom timestamp extractor.
> >>
> >> I really wish I could get some more flavor on "Flow Control With
> >> Timestamps
> >> <https://docs.confluent.io/current/streams/architecture.html#flow-control-with-timestamps>"
> >> in this regard. Assuming my timestamps are monotonically increasing
> >> within each input topic, from my reading of that section it still appears
> >> that the result of reprocessing input topics is non-deterministic beyond
> >> the "records in its stream record buffer". Some seemingly crucial
> >> sentences:
> >>
> >> *This flow control is best-effort because it is not always possible to
> >> strictly enforce execution order across streams by record timestamp; in
> >> fact, in order to enforce strict execution ordering, one must either wait
> >> until the system has received all the records from all streams (which may
> >> be quite infeasible in practice) or inject additional information about
> >> timestamp boundaries or heuristic estimates such as MillWheel's
> >> watermarks.*
> >>
> >> Practically, how am I to understand this? How large is the record buffer?
> >> Is it configurable?
> >>
> >> For example, suppose I am re-processing an inner join on partitions P1
> >> (left) and P2 (right). In the original processing, record K1V1T1 was
> >> recorded onto P1, then some time later record K1V2T2 was recorded onto
> >> P2. As a result, K1V2T2 was joined with K1V1T1. Now, during re-processing,
> >> P1 and P2 contain historical data and the Kafka Streams consumers can
> >> read P2 before P1. If the consumer reads P2 before P1, will the task
> >> still properly align these two records given their timestamps for the
> >> correct inner join, assuming both records are within the record buffer?
> >> I've experimented with this, but unfortunately I didn't have time to
> >> really set up good experiments to satisfy myself.
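For reference, a minimal sketch of the kind of windowed KStream-KStream inner
join being discussed, written against the 1.0-era DSL; the topic names, the
String serdes, and the 5-minute window are placeholder assumptions. The join
is per key, so within the join window K1V1T1 from the left side pairs with
K1V2T2 from the right side regardless of which partition the consumer happens
to read first:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.Joined;
    import org.apache.kafka.streams.kstream.KStream;

    import java.util.concurrent.TimeUnit;

    public class InnerJoinSketch {

        static void build(StreamsBuilder builder) {
            KStream<String, String> left = builder.stream("left-topic");    // "P1" side (placeholder)
            KStream<String, String> right = builder.stream("right-topic");  // "P2" side (placeholder)

            // Records with the same key whose timestamps are within 5 minutes of
            // each other are joined, independent of the order they are consumed in.
            KStream<String, String> joined = left.join(
                    right,
                    (l, r) -> l + "+" + r,
                    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                    Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

            joined.to("joined-topic");  // placeholder output topic
        }
    }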
> >>> 4. If you really need to add new partitions, you need to fix up all
> >>> topics manually -- including all topics Kafka Streams created for you.
> >>> Adding partitions messes up all your state, as the key-based
> >>> partitioning changes. This implies that your application must be
> >>> stopped! Thus, if you have zero-downtime requirements you can't do this
> >>> at all.
> >>>
> >>> 5. If you have a stateless application, all those issues go away,
> >>> though, and you can even add new partitions during runtime.
> >>
> >> Stateless in what sense? Kafka Streams seems to be all about aligning and
> >> manipulating state to create more state. Are you referring to internal
> >> state, specifically?
> >>
> >>> Hope this helps.
> >>>
> >>> -Matthias
> >>>
> >>> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
> >>>> I am about to put a topology into production and I am concerned that I
> >>>> don't know how to repartition/rebalance the topics in the event that I
> >>>> need to add more partitions.
> >>>>
> >>>> My inclination is that I should spin up a new cluster and run some kind
> >>>> of consumer/producer combination that takes data from the previous
> >>>> cluster and writes it to the new cluster. A new instance of the Kafka
> >>>> Streams application then works against this new cluster. But I'm not
> >>>> sure how to best execute this, or whether this approach is sound at
> >>>> all. I am imagining many things may go wrong. Without going into
> >>>> further speculation, what is the best way to do this?
> >>>>
> >>>> Thank you,
> >>>> Dmitry
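Finally, on the original question of adding partitions: since around the 1.0
release the Java AdminClient can grow a topic's partition count
(createPartitions). A minimal sketch with a placeholder topic name and target
count; as point 4 above notes, existing records are not moved, so for a
stateful Streams application you would stop it and fix up (or reprocess) its
topics rather than resize them in place:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitions;

    import java.util.Collections;
    import java.util.Properties;

    public class AddPartitionsSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                // Grow the (placeholder) "events" topic to 12 partitions. Old records
                // stay on their original partitions, so key-based placement of
                // historical data no longer lines up with newly produced data.
                admin.createPartitions(
                        Collections.singletonMap("events", NewPartitions.increaseTo(12)))
                     .all()
                     .get();
            }
        }
    }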