Hey Chris!

Nice work on this KIP!

What are your thoughts on letting the connector developer control
the boundaries of the transaction? For example, kafka-connect-spooldir
is used to parse files and import them into Kafka. It would be amazing if I
could begin and end the transaction as I open and close files. This
would let me guard against the dreaded exception at line 732 and wrap
an entire file in a transaction. It either makes it or it doesn't.
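
Just to sketch what I'm imagining (the transaction hooks on the context
and the helper methods below are completely made up, not a proposal for
actual API names):

    // Rough sketch of the inside of SourceTask::poll() for a
    // file-based connector like kafka-connect-spooldir:
    if (startingNewFile()) {
        context.beginTransaction();     // hypothetical: open the transaction with the file
    }
    List<SourceRecord> records = parseNextBatchOfLines();
    if (endOfFileReached()) {
        context.commitTransaction();    // hypothetical: the whole file makes it or it doesn't
    }
    return records;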

J


On Tue, May 18, 2021 at 10:04 AM Chris Egerton
<chr...@confluent.io.invalid> wrote:
>
> Hi Randall,
>
> Thanks for clarifying the issues with large transactions. I think we're
> starting to converge on an understanding of the problem space, if not quite
> on an approach to tackle it.
>
> I think my biggest issue with defining transaction boundaries on a
> per-task-poll basis are these points that you make:
>
> > First, it reuses something that connector implementations
> already employ, and which many connectors expose configs to tune in order
> to maximize throughput. Second, it requires no additional configuration
> properties or connector-specific behavior. Third, high-throughput
> connectors are likely returning large batches, so write amplification for
> the extra offset record per source partition is likely to have a much
> smaller impact (assuming that most high throughput connectors have high
> records-per-source-partition ratios).
>
> I don't believe the first and third points are safe assumptions to make.
> There's actually very little, if any, performance benefit to writing source
> connectors whose tasks give Connect larger batches (at least, not as far as
> the framework logic goes). Every record is sequentially transformed,
> converted, and dispatched to the producer, regardless of whether it came
> from a batch of one or one million. So Connect does have the capability
> right now to support high-throughput, small-batch connectors.
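>
> To illustrate (heavily simplified, not the actual WorkerSourceTask code,
> and the method names here are just shorthand; this is only the general
> shape of the dispatch loop):
>
>     for (SourceRecord record : task.poll()) {
>         SourceRecord transformed = transformationChain.apply(record);
>         // key and value are converted to byte[] before being handed to the producer
>         ProducerRecord<byte[], byte[]> msg = convertToProducerRecord(transformed);
>         producer.send(msg, sendCallback);   // async send; no per-batch flush today
>     }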
>
> For a concrete example, Confluent's Datagen connector (
> https://github.com/confluentinc/kafka-connect-datagen), which is used to
> produce sample data for quickstarts and demos, performs just fine even
> though it does absolutely no batching whatsoever and returns only a single
> record per call to "SourceTask::poll". In some not-super-rigorous
> performance testing, I ran the connector three times against a local build
> of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect
> to perform a producer flush for every task-provided batch of records, then
> ran the connector three times against that. Each test run produced exactly
> one million records. The worst run out of the initial cases (against the
> unmodified local 2.8.0 build) took 15 seconds to complete. The best run out
> of the subsequent cases (against the modified local 2.8.0 build) took 2
> minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was
> against a single local broker with no replication factor.
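>
> (For transparency, the modification was essentially just a producer flush
> after dispatching each batch returned by the task; not the exact diff, but
> the gist:)
>
>     List<SourceRecord> batch = task.poll();
>     sendRecords(batch);   // existing per-record transform/convert/send logic
>     producer.flush();     // added: block until every record in the batch is acked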
>
> Of course, if it's a matter of accommodating this one (demo-oriented)
> connector, then it might still be worth it to put the onus on the
> developers of that connector to modify it appropriately to work with
> exactly-once support. But my point with this connector is more
> general--simply put, there don't appear to be safe grounds for the
> assumption that source tasks must produce large batches in order to achieve
> high throughput.
>
> > Yes, per-connector offset commit intervals are one approach that would be
> more explicit, though see my concerns earlier in this email about
> controlling the number of records in a transaction by a time-based config,
> even if set on a per-connector basis.
>
> I'll try to summarize the concerns presented; let me know if I'm missing
> something:
>
> 1. Large transaction sizes can inflate the memory requirements for
> consumers and possibly even overwhelm them with out-of-memory errors.
> 2. High offset commit intervals can inflate the read latency of downstream
> applications.
>
> Combining both of these concerns with the high-throughput-small-batch
> scenario, it still seems worthwhile to provide users with a way to do some
> multi-batch transactions for their source tasks. This is analogous to
> consumer-side buffering; yes, by default you probably want events to be
> available downstream as soon as possible, but in some cases it's still
> necessary to introduce a small hit in latency in order to keep up with high
> throughput. And if each batch is relatively small, then heightened memory
> requirements for consumers shouldn't really be a problem.
>
> And for other scenarios, with per-connector offset commit intervals, users
> can always just set the interval to zero to get exactly the behavior that
> you're describing.
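>
> To make that concrete, I'm imagining something along these lines in the
> connector config (the property name, and whether it mirrors the
> worker-level one exactly, is still an open question; the connector class
> below is just a placeholder):
>
>     {
>       "name": "low-latency-source",
>       "config": {
>         "connector.class": "com.example.MySourceConnector",
>         "tasks.max": "1",
>         "offset.flush.interval.ms": "0"
>       }
>     }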
>
> > I'm not sure that setting up a separate Connect cluster is that practical
> for many Connect users, especially in larger organizations where one group
> manages the cluster and others manage connectors.
>
> I agree; I was mostly responding to what I perceived as the assumption that
> exactly-once support could be configured on a per-connector basis, which
> hopefully we've clarified by now is not the case.
>
> > Reducing throughput is a technical solution, especially if the only
> alternative is that the task doesn't run. But IMO it's an impractical
> technical solution as most users will want a real-time solution, and
> decreasing a connector's throughput is the opposite of this.
>
> Are you proposing that this option not be mentioned to users? I'm afraid
> that if we leave it out, it'll just become an undocumented workaround that a
> handful of people are aware of (or, worse yet, have to discover for
> themselves), while most people who would benefit from hearing about it never
> do. I don't think we lose out on much by providing this as a
> temporary solution to people who are in a pinch and just need to get data
> flowing again, especially if they don't have the capability to run their
> connector without exactly-once support.
>
> > The KIP says: "It may take longer than the transaction timeout for a task
> to flush all of its records to Kafka." Might another option be to reduce
> the number of records in the transaction?
> Is it possible to eliminate transaction timeouts by reducing the size of
> the transactions?
>
> Yes, that is an option. This can be accomplished either by reducing the
> offset commit interval (already noted in the KIP) or, if the offset commit
> interval is already zero, reconfiguring the connector to produce smaller
> batches.
>
>
>
> Considering the points that have been raised about transaction boundaries,
> I wonder if we can agree on enabling per-poll transaction boundaries by
> default but providing users with the option to override this. This way, we
> get the lowest-latency, lowest-memory-footprint settings out of the box,
> but we don't force connector developers to adopt a batching model just to
> play nicely with exactly-once support, when Connect has never implicitly or
> explicitly required batching for high throughput.
>
> Cheers,
>
> Chris
>
> On Mon, May 17, 2021 at 7:24 PM Randall Hauch <rha...@gmail.com> wrote:
>
> > On Mon, May 10, 2021 at 10:26 AM Chris Egerton <chr...@confluent.io.invalid
> > >
> > wrote:
> >
> > > RE transaction boundaries:
> > >
> > > > First, the offset commit interval is currently 60 seconds by default,
> > and
> > > source connectors can produce a lot of records in that time. Until
> > someone
> > > operating a Connect cluster changed that to a much lower value, it's
> > > possible that source connector performance could be significantly
> > impacted.
> > > ...
> > > > At first blush,
> > > controlling transaction boundaries based upon batches seems like it would
> > > work better for high throughput connectors.
> > >
> > > Sorry, what's the concern about performance here? Write throughput for a
> > > transactional producer doesn't suffer as the number of records per
> > > transaction increases. I'm also worried about source connectors that
> > return
> > > small batches with high frequency; those would not fare well with a lower
> > > offset commit interval.
> > >
> >
> > Recall that with Kafka transactions, a consumer in read_committed mode
> > buffers messages that are part of a transaction until that transaction is
> > committed or aborted. When a consumer sees an aborted transaction marker
> > for that transaction, the consumer discards all buffered messages
> > associated with that transaction. When a consumer sees a commit transaction
> > marker, it then forwards those buffered messages that are associated with
> > that transaction to its application. Note the transaction markers and
> > buffered messages are per topic partition.
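> >
> > (For reference, this buffering behavior is what consumers opt into via the
> > isolation level setting; the default is read_uncommitted, which does no
> > such buffering:)
> >
> >     isolation.level=read_committed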
> >
> > So I have two concerns with using time-based transaction boundaries.
> >
> > The first is easier to understand: consumers using read_committed mode may
> > consume a lot of memory while buffering records that are part of active
> > transactions. In fact, the memory required is a function of the number of
> > records in the transaction in the assigned topic partitions. However, the
> > only way a Connect user can control the number of records in each
> > transaction is by reducing the offset commit interval used for _all
> > connectors_. This means that the Connect user cannot directly limit the
> > size of the transaction (and therefore the buffering memory required by the
> > consumers) but can only limit the maximum duration of the transactions.
> > This seems challenging at best, because the size of the transaction is a
> > function of the throughput and the offset commit interval.
> >
> > My second concern is that applications consuming the topics to which a
> > source connector writes will perceive those writes as having potentially
> > very high lag w/r/t when the connector first saw the corresponding
> > information in the source system. The best case is that the source connector
> > discovers some information, generates a source record and hands it to
> > Connect, and Connect writes that record to the topic just before the offset
> > commit window closes and commits the transaction. A worse case is that the
> > source connector gives the record to Connect just after the offset commit
> > window closes (and the transaction is closed), and the transaction with
> > that record will not be committed for another commit interval. This means
> > the worst case *perceived lag* could be at least the offset commit
> > interval.
> >
> > So if we agree that it will be difficult for some Connect users to choose a
> > one-size-fits-all offset commit interval to work well for source connectors
> > with a range of throughputs and different consumer application
> > requirements, we may need to consider the ability for each connector to
> > control the transaction boundaries, albeit still somewhat indirectly. The
> > question then becomes how to specify the boundaries. Record counts alone
> > are insufficient, since a transaction could be 1 record short for some
> > time, resulting in the entire transaction timing out. Time/duration is also
> > not sufficient, since for low perceived lag this should be set as small as
> > is feasible (and thus poor for high throughput).
> >
> > Using a separate transaction for each batch seems like a very worthwhile
> > compromise. First, it reuses something that connector implementations
> > already employ, and which many connectors expose configs to tune in order
> > to maximize throughput. Second, it requires no additional configuration
> > properties or connector-specific behavior. Third, high-throughput
> > connectors are likely returning large batches, so write amplification for
> > the extra offset record per source partition is likely to have a much
> > smaller impact (assuming that most high throughput connectors have high
> > records-per-source-partition ratios). Likewise, low-throughput connectors
> > will either have very small or infrequent batches that will easily handle
> > the higher write amplification (up to 2x: one offset record for every
> > record). Regardless of the throughput, infrequent batches would have higher
> > lag anyway even without EOS, and EOS transactions coupled to those batches
> > would add minimal overhead.
> >
> >
> > >
> > > > It's true we already have
> > > that constraint now, but transactions and more importantly transaction
> > size
> > > and latency add a completely different aspect to the calculation of how
> > to
> > > set the offset commit interval for a Connect cluster.
> > > ...
> > > > If we're committing transactions every 60 seconds (or even as
> > frequently
> > > as
> > > every 5 seconds), then the _perceived lag_ will be significantly higher
> > > with transactions than without.
> > >
> > > That's a fair point. High-throughput connectors are likely to benefit
> > from
> > > higher commit intervals so that they have to pause to commit offsets, and
> > > produce offset records, less frequently. Low-latency connectors are
> > likely
> > > to benefit from lower commit intervals in order to shorten the time
> > between
> > > source records being produced to Kafka and their transactions being
> > > committed. There's no reason to assume that there's a one-size-fits-all
> > > offset commit interval for an entire cluster once offset commit becomes a
> > > requirement for records to be visible to downstream consumers.
> > >
> > > > Have you considered having the worker create a separate transaction for
> > > each batch of records returned by the connector?
> > >
> > > I had not considered this, but for reasons outlined above regarding
> > > performance with high-throughput connectors, I'm hesitant to put it into
> > > play by default.
> > >
> > > Given the conflicting requirements of high-throughput and low-latency
> > > connectors with regards to offset commit, I do agree that for some
> > > connectors, the most useful default behavior would be to commit offsets
> > as
> > > soon as possible, and committing once for every batch returned from
> > > "SourceTask::poll" is a great way to allow that.
> > >
> > > The most straightforward way to accommodate these conflicting cases seems
> > > to be to just go ahead and allow per-connector offset commit intervals.
> > > That way, the user gets to define an approximate upper bound on the time
> > > between a record being produced and its transaction being committed--and
> > if
> > > they want the upper bound to be zero, they can just configure their
> > > connector with an offset commit interval of zero. Thoughts?
> > >
> >
> > Yes, per-connector offset commit intervals are one approach that would be
> > more explicit, though see my concerns earlier in this email about
> > controlling the number of records in a transaction by a time-based config,
> > even if set on a per-connector basis.
> >
> >
> > >
> > > > It is true that low-throughput
> > > connectors would result in higher write amplification (worst case being
> > 2x
> > > when each batch returns a single record), but for low-throughput
> > connectors
> > > this seems like it might be an acceptable tradeoff if EOS is really
> > > needed, or if not then EOS can be disabled for this connector.
> > >
> > > Just to be clear--disabling exactly-once support for a connector is
> > > technically an option but would require setting up a dedicated
> > > "non-exactly-once" Connect cluster, since exactly-once will only be
> > > configurable on a per-worker level and not a per-connector level.
> > >
> >
> > I'm not sure that setting up a separate Connect cluster is that practical
> > for many Connect users, especially in larger organizations where one group
> > manages the cluster and others manage connectors.
> >
> >
> > > RE other comments:
> > >
> > > > This seems to contradict the earlier quote. Specifically, the first
> > quote
> > > above states that transaction boundaries are dictated by offset flushes,
> > > which is controlled by the `offset.flush.interval.ms` property. However,
> > > the second quote says the `offset.flush.timeout.ms` property will be
> > > ignored for EOS source tasks. I must be missing something.
> > >
> > > One of these discusses the offset commit interval property (
> > >
> > >
> > https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.interval.ms
> > > ),
> > > and the other discusses the offset commit timeout property (
> > >
> > >
> > https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms
> > > ).
> > > They are separate properties and control separate things; hope this
> > helps.
> > >
> > >
> > Okay, that makes sense. But boy is that subtle.
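> >
> > For anyone else trying to keep the two straight, the worker properties in
> > question are (with their current defaults):
> >
> >     # how often the worker attempts to commit offsets for tasks
> >     offset.flush.interval.ms=60000
> >     # how long a single offset flush attempt may take before it is
> >     # cancelled and retried on a later interval
> >     offset.flush.timeout.ms=5000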
> >
> >
> > > > Does it seem realistic or practical to suggest reducing the source
> > > connector's throughput?
> > >
> > > I believe so, yes. Obviously this option should be reserved for last, but
> > > since the penalty for failure here is that the task is rendered
> > completely
> > > unusable, a slower task seems preferable to a failed one. I've updated
> > this
> > > section to both add two more alternatives (of tuning the producer for
> > > higher throughput and using a smaller offset commit interval) and to
> > > reorder things in descending order of preference.
> > >
> >
> > Reducing throughput is a technical solution, especially if the only
> > alternative is that the task doesn't run. But IMO it's an impractical
> > technical solution as most users will want a real-time solution, and
> > decreasing a connector's throughput is the opposite of this.
> >
> > The KIP says: "It may take longer than the transaction timeout for a task
> > to flush all of its records to Kafka." Might another option be to reduce
> > the number of records in the transaction?
> > Is it possible to eliminate transaction timeouts by reducing the size of
> > the transactions?
> >
> > >
> > > I should also make it clear that this should occur very rarely in a setup
> > > with a reasonably-high transaction timeout. Today, if a task's producer
> > is
> > > unable to keep up with the throughput of the records generated by the
> > task,
> > > this leads to a situation where it becomes impossible to commit offsets
> > for
> > > that task (due to https://issues.apache.org/jira/browse/KAFKA-12226); in
> > > practice, I have never seen this happen with a healthy Kafka cluster and
> > a
> > > sufficiently-provisioned (enough tasks, uncrowded workers, etc.)
> > connector.
> > >
> > > > Do you mean to use "task A" and "task B" here? Do they imply tasks from
> > > the
> > > same connector?
> > >
> > > The scenario outlined here applies for both cases: the tasks could be
> > from
> > > the same connector, or from different connectors.
> > >
> > > > If so, then won't the offsets from both of those tasks
> > > still be written to the same connector-specific offset topic, and suffer
> > > from the potential blocking issue mentioned above? While it's useful to
> > > call this out, I'm not sure how that example helps support the motivation
> > > for a separate per-connector offset topic. OTOH, if these were tasks from
> > > _different_ connectors, then it becomes clear that the offsets from one
> > > source connector using a connector-specific offsets topic will never
> > block
> > > the offsets from another connector using a different connector-specific
> > > offsets topic. Thus, having each connector use separate
> > connector-specific
> > > offset topics at least avoids the problem of one connector's tasks
> > blocking
> > > the tasks of the other connector just because of transaction commit
> > timeout
> > > issues.
> > >
> > > I believe this is already touched on in the third paragraph of the
> > > "Per-connector offsets topics"/"Motivation" section:
> > >
> > > > This way, although tasks of the same connector may still interfere with
> > > each other, they will at least not interfere with tasks of other
> > > connectors.
> >
> >
> > > > It's implied but not explicitly stated that this will result in
> > > duplicating
> > > the offsets for EOS source connectors: the workers will continue to track
> > > all source offsets in its own offsets topic, and EOS-enabled source
> > > connectors will also track their own offsets in connector-specific
> > offsets
> > > topics. It might be worth making that more obvious, and clarifying which
> > > will be used upon connector restarts.
> > >
> > > I believe the duplication is already touched on in the "Offset (and
> > record)
> > > writes" section:
> > >
> > > > Once an offset commit is complete, if the connector is (implicitly or
> > > explicitly) configured with a separate offsets topic, the committed
> > offsets
> > > will also be written to the worker’s global offsets topic using a
> > > non-transactional producer and the worker’s principal.
> > >
> > > And the precedence of the various topics is described in detail in the
> > > "Per-connector offsets topics"/"Smooth migration" section (TL;DR: if an
> > > offset is present in a connector's separate offsets topic, it's used; if
> > > it's not found, then the global offsets topic is searched as a fallback).
> > >
> > > > Are you suggesting that the source connector and task implementations
> > be
> > > responsible for this? Or are you suggesting that the worker will be
> > > responsible for creating the offsets topics on behalf of source connector
> > > and task implementations (using an admin client per the configurations),
> > > just like the worker currently does so for the cluster's offsets topic?
> > > Please clarify this in the KIP.
> > >
> > > Sure, done.
> > >
> > > > Do you think it's worthwhile adding a parenthetical clause after this
> > > statement that says "(either `exactly.once.source.enabled=false` or an
> > > older Connect worker that does not have EOS support for source
> > > connectors)"? Yes, those are called out above, but IMO this would help
> > > clarify exactly what this paragraph means for operators -- and how it is
> > > not a burden for them to avoid this condition.
> > >
> > > Sure, done.
> > >
> > > > In the "Connector principal permissions" section, the "IdempotentWrite"
> > > row
> > > says "Kafka cluster targeted by the Connect cluster" for the resource
> > name
> > > column. Is this right? Should that instead be "Kafka cluster to which the
> > > Connector writes"?
> > >
> > > Good catch, done.
> > >
> > > > In the "Rolling upgrade(s)" section, should the first and second
> > rolling
> > > upgrades be completed before enabling EOS on any source connector? If so,
> > > should this section call this out in more detail?
> > >
> > > I'm not sure I understand the question? Exactly-once source support is
> > only
> > > configurable on a per-worker basis and, when enabled, will apply to all
> > > connectors. So the steps provided in the "Rolling upgrade(s)" section are
> > > sufficient to get every connector on your cluster running with
> > exactly-once
> > > enabled, and there is no need or way to enable exactly-once on a
> > > per-connector basis.
> > >
> > > > How does this work with a rolling "downgrade"? The "Heterogeneous
> > > clusters"
> > > subsection of the "Permitted failure scenarios" section talks about (I
> > > think) how setting `exactly.once.source.enabled=false` for one worker
> > > effectively breaks the EOS delivery guarantees for the whole Connect
> > > cluster. If that is the case, how does this rolling downgrade work? Would
> > > it simply require that all source connectors configured to use EOS be
> > > changed to not use EOS, and then to do a rolling downgrade of the
> > worker's
> > > configuration to disable EOS? If this is the case, let's be more explicit
> > > in this section on downgrading.
> > > > In the "Soft downgrade" section, it sounds like this is effectively
> > not a
> > > rolling downgrade. Nothing in the "Downgrades" section really implies
> > that
> > > rolling downgrades are not supported, but at the same time it's not
> > > terribly obvious.
> > > Overall, it might be worth considering (if you haven't already) modifying
> > > the Downgrade section to be a bit more prescriptive in terms of steps
> > that
> > > should be taken.
> > >
> > > I've touched up the sections that touch on downgrades, to make it clear
> > how
> > > to safely perform a rolling downgrade for both "hard" and "soft"
> > > downgrades, and what the consequences of performing an unsafe rolling
> > > "hard" downgrade are and how to deal with the consequences. Hopefully
> > this,
> > > coupled with the knowledge that exactly-once is not going to be
> > > configurable on a per-connector basis, should make things clear.
> > >
> > > > Finally, do we need to expose any metrics that are related to maybe the
> > > average number of records in each transaction? Or should we rely solely
> > > upon existing producer metrics?
> > >
> > > That's a great point--after perusing
> > > https://kafka.apache.org/28/documentation.html#producer_monitoring it
> > > doesn't look like there are any metrics on producer transaction size.
> > I've
> > > updated the KIP to add source task metrics for minimum, maximum, and
> > > average transaction size. I think that this, in conjunction with existing
> > > metrics for poll time, commit time, batch size (per call to
> > > "SourceTask::poll"), and active record count, should cover our bases.
> > >
> > >
> > Thanks again!
> >
> > Randall
> >
