A few thoughts:

> introduce a Streams metadata topic (single partitioned and log compacted;
one for each application-ID)

I would consider whether the single partition is a strict requirement. A
single partition increases the likelihood of outages. This is something I'd
like to see changed in Connect at some point -- the config topic is single
partition because it's already complicated enough given compaction (see
this complicated implementation explanation
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L58-L149).
But it also means that you can't make progress on anything if that topic
partition has some outage. When starting up from scratch, maybe you're just
stuck since a single topic partition *could* block progress. But I could
imagine the completed marker in your example simply containing the offsets
for completion in all the other topic partitions that were written to,
making it trivial to determine if you need to read a specific topic
partition in the metadata topic. In that type of design, more partitions
become a tradeoff -- a single streams app might be less likely to hit a
missing partition, but with more partitions if you spread them across the
same # of brokers the chance of any given partition being offline could
increase.

In any case, I think it is worth thinking carefully about that tradeoff
since it's hard to change later.

> Because there will be just one leader, this operation is single-threaded
and thus safe.

Another way to make this safe would be to be able to take advantage of the
generation ID from the consumer. This isn't exposed publicly atm, but I
think you can get at it in the streams impl and also helps protect against
zombie writers (the setup you're describing here and in Connect is
vulnerable to an old leader going through a GC pause or something and then
continuing to overwrite data). Again, this is something we didn't do for
Connect and I now regret it since I think it would also simplify some of
the recovery process when you've had partial writes and then a crash. (Now
that I think about it, adding some sort of version number and making the
log cleaner aware of it would be quite nice...)

> After the last instance did terminate itself, the metadata topic will
have one more record written.
> The writing order into the metadata topic ensures, that we shut down
cleanly.

Not sure I understand this. How does each instance know that it is the
last? Couldn't the last 2 or N complete within the same consumer session,
before a rebalance occurs, and both think they are not the last one? Even
if you can guarantee this, couldn't only some of the messages be written
and the deletion wouldn't be completed? After compaction you'd end up with
partial data that I don't think you can tell is partial. I think the
startup algorithm isn't actually complete as you need to handle 1) no
end-offsets 2) some end-offsets 3) all end-offsets with a completed marker.
(Checking if the completed marker was the last message read isn't
sufficient since we expect the following cleanup messages to be compacted).

Jay's suggestion of using the offset commit metadata was also what I was
thinking before reading the multiple sub topology stuff, but even then a
large input set could probably run into issues with offset commit size
since you'd want to write all of that from the leader in a single
messageset.

-Ewen

On Fri, Dec 9, 2016 at 5:19 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> About using offset-topic metadata field:
>
> Even with KIP-98, I think this would not work (or maybe in a weird way).
> If we have the following topology:
>
> topic1 -> subTopologyA -> topic2 -> subTopologyB
>
> If producer of subTopologyA commits, it will commit its input offsets
> from topic1. Thus, the stop-offsets of topic2 for subTopologyB would we
> committed with the metadata of topic1 commits. But subTopologyB is not
> really related to topic1. I guess it would not be impossible to make
> this work, however, the design seems to be somewhat weird.
>
> But maybe, I do miss something.
>
> Furthermore, I am not sure if Streams will use transactions all the
> same. Will there be an option for the user to disable transactions and
> stick with at-least-once processing?
>
> Also, we would need to delay this KIP until KIP-98 and a Streams EOS KIP
> is in place... I would rather include this in next release 0.10.2.
>
>
> -Matthias
>
> On 12/9/16 10:47 AM, Guozhang Wang wrote:
> > I will read through the KIP doc once again to provide more detailed
> > feedbacks, but let me through my two cents just for the above email.
> >
> > There are a few motivations to have a "consistent" stop-point across
> tasks
> > belonging to different sub-topologies. One of them is for interactive
> > queries: say you have two state stores belonging to two sub-topologies,
> if
> > they stopped at different points, then when user querying them they will
> > see inconsistent answers (think about the example people always use in
> > databases: the stores represent A and B's bank account and a record is
> > processed to move X dollar from A to B).
> >
> > As for the implementation to support such consistent stop-points though,
> I
> > think the metadata field in offset topic does worth exploring, because
> > Streams may very likely use the transactional APIs proposed in KIP-98 to
> > let producers send offsets in a transactional manner, not the consumers
> > themselves (details can be found here
> > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8
> > ).
> >
> >
> >
> > Guozhang
> >
> >
> > On Tue, Dec 6, 2016 at 2:45 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks for the input Jay.
> >>
> >> From my understanding, your question boils down to how fuzzy the stop
> >> point can/should be, and what guarantees we want to provide to the user.
> >> This has multiple dimension:
> >>
> >>
> >> 1. Using a timestamp has the main issue, that if an input topic has no
> >> data with this timestamp, the application does never finish (ie, stop
> >> timestamp is in "the future").
> >>
> >> Furthermore, it would require the user to specify the timestamp because
> >> if we use current (ie, on startup) system time as stop-timestamp, the
> >> case of a "future stop-timestamp" might be very common (think no active
> >> producers for a topic -- it's batch processing). Thus, the user needs to
> >> know the stop-timestamp, which might be hard for her to figure out -- in
> >> the current design it's way simpler for the user to activate "auto
> stop".
> >>
> >> Last but not least, assume an application with two subtopologies that
> >> are connected via an intermediate topic and both subtopologies are
> >> executed in different JVMs. The first topology could filter a lot of
> >> messages and thus it might happen, that it never writes a record with
> >> timestamp >= stop-timestamp into the intermediate topic. Thus, even if
> >> the first JVM terminates the second would not stop automatically as it
> >> never reads a record with timestamp >= stop-timestamp.
> >>
> >> There would be some workaround if we shut down in a "fuzzy way", ie,
> >> with no guarantees what record will actually get processed (ie, stop
> >> processing somewhat early of some cases). But I will argue in (3) why
> >> this "stop roughly about this time semantic" is not a good idea.
> >>
> >>
> >> 2. I was not aware of a metadata field for committed offsets and this
> >> sounds quite appealing. However, thinking about it in more detail, I
> >> have my doubts we can use it:
> >>
> >> If we want to propagate stop-offsets for intermediate topics, all
> >> producer instances would need to update this metadata field, thus need
> >> to commit (A producer that does commit? Well we could use "restore
> >> consumer" with manual partition assignment for this.) -- however, this
> >> would not only conflict with the commits of the actual consumer, but
> >> also in between all running producers.
> >>
> >>
> >> 3. This is the overall "how fuzzy we want to be" discussion. I would
> >> argue that we should provide somewhat strong stop consistency. Assume
> >> the following use case. An external application generates data in
> >> batches and writes files to HDFS. Those files are imported into Kafka
> >> via Connect. Each time a batch of data gots inserted into Kafka, this
> >> data should be processed with a Streams application. If we cannot
> >> guarantee that all data of this batch is fully processed and the result
> >> is complete, use experience would be quite bad.
> >>
> >> Furthermore, we want to guard a running batch job to process "too much"
> >> data: Assume the same scenario as before with HDFS + Connect. For
> >> whatever reason, a Streams batch job takes longer than usual, and while
> >> it is running new data is appended to the topic as new files (of the
> >> next batch) are already available. We don't want to process this data
> >> with the current running app, but want it to be included in the next
> >> batch run (you could imagine, that each batch job will write the result
> >> into a different output topic). Thus, we want to have all data processed
> >> completely, ie, provide strong start-stop consistency.
> >>
> >>
> >> Sorry for the quite long answer. Hope it is convincing though :)
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/5/16 4:44 PM, Jay Kreps wrote:
> >>> I'd like to second the discouragement of adding a new topic per job. We
> >>> went down this path in Samza and I think the result was quite a mess.
> You
> >>> had to read the full topic every time a job started and so it added a
> lot
> >>> of overhead and polluted the topic space.
> >>>
> >>> What if we did the following:
> >>>
> >>>    1. Use timestamp instead of offset
> >>>    2. Store the "stopping timestamp" in the metadata field associated
> >> with
> >>>    the existing offset storage mechanism
> >>>    3. Don't worry about fully processing the entire DAG. After all,
> >>>    partially processing a tuple isn't much different from not
> processing
> >> it,
> >>>    and in any case the stopping point is a heuristic so no point in
> being
> >>>    overly precise here.
> >>>
> >>> Probably I'm missing something, though, I haven't thought through the
> >>> implications of using time instead of offset.
> >>>
> >>> -Jay
> >>>
> >>> On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I want to start a discussion about KIP-95:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>>>
> >>>> Looking forward to your feedback.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
> >
>
>

Reply via email to