Hi Randall,

Thanks for clarifying the issues with large transactions. I think we're
starting to converge on an understanding of the problem space, if not quite
on an approach to tackle it.

I think my biggest issue with defining transaction boundaries on a
per-task-poll basis is with these points that you make:

> First, it reuses something that connector implementations
already employ, and which many connectors have configs to tune the behavior
to maximize throughput. Second, it requires no additional configuration
properties or connector-specific behavior. Third, high-throughput
connectors are likely returning large batches, so write amplification for
the extra offset record per source partition is likely to have a much
smaller impact (assuming that most high throughput connectors have high
records-per-source-partition ratios).

I don't believe the assumptions behind the first and third points are safe to
make.
There's actually very little, if any, performance benefit to writing source
connectors whose tasks give Connect larger batches (at least, not as far as
the framework logic goes). Every record is sequentially transformed,
converted, and dispatched to the producer, regardless of whether it came
from a batch of one or one million. So Connect does have the capability
right now to support high-throughput, small-batch connectors.
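
To make that concrete, here's a rough sketch of such a task (the class name,
topic, and partition/offset keys are made up for illustration, but the
SourceTask API is the real one); Connect runs the single record through the
exact same transform/convert/dispatch path it uses for each element of a
large batch:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Illustrative only: a task that hands Connect exactly one record per poll.
public class SingleRecordSourceTask extends SourceTask {
    private long sequence = 0;

    @Override
    public void start(Map<String, String> props) {
        // No batching configuration at all.
    }

    @Override
    public List<SourceRecord> poll() {
        long seq = sequence++;
        Map<String, ?> partition = Collections.singletonMap("source", "demo");
        Map<String, ?> offset = Collections.singletonMap("seq", seq);
        SourceRecord record = new SourceRecord(
                partition, offset, "demo-topic", Schema.STRING_SCHEMA, "value-" + seq);
        // The framework transforms, converts, and dispatches this record to the
        // producer the same way it would each record in a million-record batch.
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}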

For a concrete example, Confluent's Datagen connector (
https://github.com/confluentinc/kafka-connect-datagen), which is used to
produce sample data for quickstarts and demos, performs just fine even
though it does absolutely no batching whatsoever and returns only a single
record per call to "SourceTask::poll". In some not-super-rigorous
performance testing, I ran the connector three times against a local build
of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect
to perform a producer flush for every task-provided batch of records, then
ran the connector three times against that. Each test run produced exactly
one million records. The worst run out of the initial cases (against the
unmodified local 2.8.0 build) took 15 seconds to complete. The best run out
of the subsequent cases (against the modified local 2.8.0 build) took 2
minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was
against a single local broker with a replication factor of one.
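
For anyone who wants to approximate the effect outside of Connect, the
slowdown comes from forcing a broker round trip per batch. A standalone
sketch along these lines (the bootstrap server and topic name are
placeholders, and this is not the actual test harness I used) shows the same
pattern:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushPerBatchDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 1_000_000; i++) {
                producer.send(new ProducerRecord<>("demo-topic", "value-" + i));
                // Uncommenting the next line mimics a flush per single-record
                // "batch", which is roughly what the modified 2.8.0 build did;
                // every send then waits on a broker round trip instead of being
                // batched by the producer.
                // producer.flush();
            }
            producer.flush();
            System.out.println("Produced 1M records in "
                    + (System.currentTimeMillis() - start) + " ms");
        }
    }
}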

Of course, if it's a matter of accommodating this one (demo-oriented)
connector, then it might still be worth it to put the onus on the
developers of that connector to modify it appropriately to work with
exactly-once support. But my point with this connector is more
general--simply put, there don't appear to be safe grounds for the
assumption that source tasks must produce large batches in order to achieve
high throughput.

> Yes, per-connector offset commit intervals is one approach that would be
more explicit, though see my concerns earlier in this email about
controlling the number of records in a transaction by a time-based config,
even if set on a per-connector basis.

I'll try to summarize the concerns presented; let me know if I'm missing
something:

1. Large transaction sizes can inflate the memory requirements for
consumers and possibly even overwhelm them with out-of-memory errors.
2. High offset commit intervals can inflate the read latency of downstream
applications.

Weighing both of these concerns against the high-throughput, small-batch
scenario, it still seems worthwhile to give users a way to have their source
tasks commit multi-batch transactions. This is analogous to
consumer-side buffering; yes, by default you probably want events to be
available downstream as soon as possible, but in some cases it's still
necessary to introduce a small hit in latency in order to keep up with high
throughput. And if each batch is relatively small, then heightened memory
requirements for consumers shouldn't really be a problem.
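
The consumer-side analogy I have in mind is ordinary fetch tuning, where a
small, deliberate latency hit buys throughput. For example (these are
existing consumer properties; the values are purely illustrative):

# Let the broker accumulate up to 64 KB, or wait up to 500 ms, before
# answering a fetch, trading a little latency for fewer, larger fetches
fetch.min.bytes=65536
fetch.max.wait.ms=500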

And for other scenarios, with per-connector offset commit intervals, users
can always just set the interval to zero to get exactly the behavior that
you're describing.
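
As a sketch of what that might look like (the property name here is
hypothetical and would need to be nailed down in the KIP), a connector config
could carry its own interval, with zero meaning "commit after every batch
from poll()":

name=my-source-connector
connector.class=com.example.MySourceConnector
# Hypothetical per-connector override; zero = commit offsets (and end the
# transaction) after every batch returned from SourceTask::poll
offsets.commit.interval.ms=0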

> I'm not sure that setting up a separate Connect cluster is that practical
for many Connect users, especially in larger organizations where one group
manages the cluster and others manage connectors.

I agree; I was mostly responding to what I perceived as the assumption that
exactly-once support could be configured on a per-connector basis, which
hopefully we've clarified by now is not the case.

> Reducing throughput is a technical solution, especially if the only
alternative is that the task doesn't run. But IMO it's an impractical
technical solution as most users will want a real-time solution, and
decreasing connector's throughput is the opposite of this.

Are you proposing that this option not be mentioned to users? I'm afraid
that if we don't document it, it'll just become a workaround that a handful
of people are aware of (or, worse yet, have to discover for themselves),
while most people who would benefit from hearing about it never do. I don't
think we lose out on much by providing this as a
temporary solution to people who are in a pinch and just need to get data
flowing again, especially if they don't have the capability to run their
connector without exactly-once support.

> The KIP says: "It may take longer than the transaction timeout for a task
to flush all of its records to Kafka." Might another option be to reduce
the number of records in the transaction?
Is it possible to eliminate transaction timeouts by reducing the size of
the transactions?

Yes, that is an option. This can be accomplished either by reducing the
offset commit interval (already noted in the KIP) or, if the offset commit
interval is already zero, reconfiguring the connector to produce smaller
batches.
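
For the first of those, the knob already exists at the worker level today
(shown here with an illustrative value); under this KIP, lowering it would
also bound how long a source task's transaction can stay open:

# Worker config: commit source offsets (and, with exactly-once enabled,
# the enclosing transaction) every 5 seconds instead of the 60-second default
offset.flush.interval.ms=5000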



Considering the points that have been raised about transaction boundaries, I
wonder if we can agree on per-poll transaction (and offset commit) boundaries
as the default, while giving users the option to override this. That way we
get the lowest-latency, lowest-memory-footprint behavior out of the box, but
we don't force connector developers to adopt a batching model, which Connect
has never implicitly or explicitly required for high throughput, just to play
nicely with exactly-once support.

Cheers,

Chris

On Mon, May 17, 2021 at 7:24 PM Randall Hauch <rha...@gmail.com> wrote:

> On Mon, May 10, 2021 at 10:26 AM Chris Egerton <chr...@confluent.io.invalid
> >
> wrote:
>
> > RE transaction boundaries:
> >
> > > First, the offset commit interval is currently 60 seconds by default,
> and
> > source connectors can produce a lot of records in that time. Until
> someone
> > operating a Connect cluster changed that to a much lower value, it's
> > possible that source connector performance could be significantly
> impacted.
> > ...
> > > At first blush,
> > controlling transaction boundaries based upon batches seems like it would
> > work better for high throughput connectors.
> >
> > Sorry, what's the concern about performance here? Write throughput for a
> > transactional producer doesn't suffer as the number of records per
> > transaction increases. I'm also worried about source connectors that
> return
> > small batches with high frequency; those would not fare well with a lower
> > offset commit interval.
> >
>
> Recall that with Kafka transactions, a consumer in read_committed mode
> buffers messages that are part of a transaction until that transaction is
> committed or aborted. When a consumer sees an aborted transaction marker
> for that transaction, the consumer discards all buffered messages
> associated with that transaction. When a consumer sees a commit transaction
> marker, it then forwards those buffered messages that are associated with
> that transaction to its application. Note the transaction markers and
> buffered messages are per topic partition.
>
> So I have two concerns with using time-based transaction boundaries.
>
> The first is easier to understand: consumers using read_committed mode may
> consume a lot of memory while buffering records that are part of active
> transactions. In fact, the memory required is a function of the number of
> records in the transaction in the assigned topic partitions. However, the
> only way a Connect user can control the number of records in each
> transaction is by reducing the offset commit interval used for _all
> connectors_. This means that the Connect user cannot directly limit the
> size of the transaction (and therefore the buffering memory required by the
> consumers) but can only limit the maximum duration of the transactions.
> This seems challenging at best, because the size of the transaction is a
> function of the throughput and the offset commit interval.
>
> My second concern is that applications consuming the topics to which a
> source connector writes will perceive those writes as having a potentially
> very high lag w/r/t the connector first seeing in the source system the
> information for those records. The best case is that the source connector
> discovers some information, generates a source record and hands it to
> Connect, and Connect writes that record to the topic just before the offset
> commit window closes and commits the transaction. A worse case is that the
> source connector gives the record to Connect just after the offset commit
> window closes (and the transaction is closed), and the transaction with
> that record will not be committed for another commit interval. This means
> the worst case *perceived lag* could be at least the offset commit
> interval.
>
> So if we agree that it will be difficult for some Connect users to choose a
> one-size-fits-all offset commit interval to work well for source connectors
> with a range of throughputs and different consumer application
> requirements, we may need to consider the ability for each connector to
> control the transaction boundaries, albeit still somewhat indirectly. The
> question then becomes how to specify the boundaries. Record counts alone
> are insufficient, since a transaction could be 1 record short for some
> time, resulting in the entire transaction timing out. Time/duration is also
> not sufficient, since for low perceived lag this should be set as small as
> is feasible (and thus poor for high throughput).
>
> Using a separate transaction for each batch seems like a very worthwhile
> compromise. First, it reuses something that connector implementations
> already employ, and which many connectors have configs to tune the behavior
> to maximize throughput. Second, it requires no additional configuration
> properties or connector-specific behavior. Third, high-throughput
> connectors are likely returning large batches, so write amplification for
> the extra offset record per source partition is likely to have a much
> smaller impact (assuming that most high throughput connectors have high
> records-per-source-partition ratios). Likewise, low-throughput connectors
> will either have very small or infrequent batches that will easily handle
> the higher write amplification (up to 2x: one offset record for every
> record). Regardless of the throughput, infrequent batches would have higher
> lag anyway even without EOS, and EOS transactions coupled to those batches
> would add minimum overhead.
>
>
> >
> > > It's true we already have
> > that constraint now, but transactions and more importantly transaction
> size
> > and latency add a completely different aspect to the calculation of how
> to
> > set the offset commit interval for a Connect cluster.
> > ...
> > > If we're committing transactions every 60 seconds (or even as
> frequently
> > as
> > every 5 seconds), then the _perceived lag_ will be significantly higher
> > with transactions than without.
> >
> > That's a fair point. High-throughput connectors are likely to benefit
> from
> > higher commit intervals so that they have to pause to commit offsets, and
> > produce offset records, less frequently. Low-latency connectors are
> likely
> > to benefit from lower commit intervals in order to shorten the time
> between
> > source records being produced to Kafka and their transactions being
> > committed. There's no reason to assume that there's a one-size-fits-all
> > offset commit interval for an entire cluster once offset commit becomes a
> > requirement for records to be visible to downstream consumers.
> >
> > > Have you considered having the worker create a separate transaction for
> > each batch of records returned by the connector?
> >
> > I had not considered this, but for reasons outlined above regarding
> > performance with high-throughput connectors, I'm hesitant to put it into
> > play by default.
> >
> > Given the conflicting requirements of high-throughput and low-latency
> > connectors with regards to offset commit, I do agree that for some
> > connectors, the most useful default behavior would be to commit offsets
> as
> > soon as possible, and committing once for every batch returned from
> > "SourceTask::poll" is a great way to allow that.
> >
> > The most straightforward way to accommodate these conflicting cases seems
> > to be to just go ahead and allow per-connector offset commit intervals.
> > That way, the user gets to define an approximate upper bound on the time
> > between a record being produced and its transaction being committed--and
> if
> > they want the upper bound to be zero, they can just configure their
> > connector with an offset commit interval of zero. Thoughts?
> >
>
> Yes, per-connector offset commit intervals is one approach that would be
> more explicit, though see my concerns earlier in this email about
> controlling the number of records in a transaction by a time-based config,
> even if set on a per-connector basis.
>
>
> >
> > > It is true that low-throughput
> > connectors would result in higher write amplification (worst case being
> 2x
> > when each batch returns a single record), but for low-throughput
> connectors
> > this seems like this might be an acceptable tradeoff if EOS is really
> > needed, or if not then EOS can be disabled for this connector.
> >
> > Just to be clear--disabling exactly-once support for a connector is
> > technically an option but would require setting up a dedicated
> > "non-exactly-once" Connect cluster, since exactly-once will only be
> > configurable on a per-worker level and not a per-connector level.
> >
>
> I'm not sure that setting up a separate Connect cluster is that practical
> for many Connect users, especially in larger organizations where one group
> manages the cluster and others manage connectors.
>
>
> > RE other comments:
> >
> > > This seems to contradict the earlier quote. Specifically, the first
> quote
> > above states that transaction boundaries are dictated by offset flushes,
> > which is controlled by the `offset.flush.interval.ms` property. However,
> > the second quote says the `offset.flush.timeout.ms` property will be
> > ignored for EOS source tasks. I must be missing something.
> >
> > One of these discusses the offset commit interval property (
> >
> >
> https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.interval.ms
> > ),
> > and the other discusses the offset commit timeout property (
> >
> >
> https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms
> > ).
> > They are separate properties and control separate things; hope this
> helps.
> >
> >
> Okay, that makes sense. But boy is that subtle.
>
>
> > > Does it seem realistic or practical to suggest to reduce the source
> > connector's throughput?
> >
> > I believe so, yes. Obviously this option should be reserved for last, but
> > since the penalty for failure here is that the task is rendered
> completely
> > unusable, a slower task seems preferable to a failed one. I've updated
> this
> > section to both add two more alternatives (of tuning the producer for
> > higher throughput and using a smaller offset commit interval) and to
> > reorder things in descending order of preference.
> >
>
> Reducing throughput is a technical solution, especially if the only
> alternative is that the task doesn't run. But IMO it's an impractical
> technical solution as most users will want a real-time solution, and
> decreasing connector's throughput is the opposite of this.
>
> The KIP says: "It may take longer than the transaction timeout for a task
> to flush all of its records to Kafka." Might another option be to reduce
> the number of records in the transaction?
> Is it possible to eliminate transaction timeouts by reducing the size of
> the transactions?
>
> >
> > I should also make it clear that this should occur very rarely in a setup
> > with a reasonably-high transaction timeout. Today, if a task's producer
> is
> > unable to keep up with the throughput of the records generated by the
> task,
> > this leads to a situation where it becomes impossible to commit offsets
> for
> > that task (due to https://issues.apache.org/jira/browse/KAFKA-12226); in
> > practice, I have never seen this happen with a healthy Kafka cluster and
> a
> > sufficiently-provisioned (enough tasks, uncrowded workers, etc.)
> connector.
> >
> > > Do you mean to use "task A" and "task B" here? Do they imply tasks from
> > the
> > same connector?
> >
> > The scenario outlined here applies for both cases: the tasks could be
> from
> > the same connector, or from different connectors.
> >
> > > If so, then won't the offsets from both of those tasks
> > still be written to the same connector-specific offset topic, and suffer
> > from the potential blocking issue mentioned above? While it's useful to
> > call this out, I'm not sure how that example helps support the motivation
> > for a separate per-connector offset topic. OTOH, if these were tasks from
> > _different_ connectors, then it becomes clear that the offsets from one
> > source connector using a connector-specific offsets topic will never
> block
> > the offsets from another connector using a different connector-specific
> > offsets topic. Thus, having each connector use separate
> connector-specific
> > offset topics at least avoids the problem of one connector's tasks
> blocking
> > the tasks of the other connector just because of transaction commit
> timeout
> > issues.
> >
> > I believe this is already touched on in the third paragraph of the
> > "Per-connector offsets topics"/"Motivation" section:
> >
> > > This way, although tasks of the same connector may still interfere with
> > each other, they will at least not interfere with tasks of other
> > connectors.
>
>
> > > It's implied but not explicitly stated that this will result in
> > duplicating
> > the offsets for EOS source connectors: the workers will continue to track
> > all source offsets in its own offsets topic, and EOS-enabled source
> > connectors will also track their own offsets in connector-specific
> offsets
> > topics. It might be worth making that more obvious, and which will be
> used
> > upon connector restarts.
> >
> > I believe the duplication is already touched on in the "Offset (and
> record)
> > writes" section:
> >
> > > Once an offset commit is complete, if the connector is (implicitly or
> > explicitly) configured with a separate offsets topic, the committed
> offsets
> > will also be written to the worker’s global offsets topic using a
> > non-transactional producer and the worker’s principal.
> >
> > And the precedence of the various topics is described in detail in the
> > "Per-connector offsets topics"/"Smooth migration" section (TL;DR: if an
> > offset is present in a connector's separate offsets topic, it's used; if
> > it's not found, then the global offsets topic is searched as a fallback).
> >
> > > Are you suggesting that the source connector and task implementations
> be
> > responsible for this? Or are you suggesting that the worker will be
> > responsible for creating the offsets topics on behalf of source connector
> > and task implementations (using an admin client per the configurations),
> > just like the worker currently does so for the cluster's offsets topic?
> > Please clarify this in the KIP.
> >
> > Sure, done.
> >
> > > Do you think it's worthwhile adding a parenthetical clause after this
> > statement that says "(either `exactly.once.source.enabled=false` or an
> > older Connect worker that does not have EOS support for source
> > connectors)"? Yes, those are called out above, but IMO this would help
> > clarify exactly what this paragraph means for operators -- and how it is
> > not a burden for them to avoid this condition.
> >
> > Sure, done.
> >
> > > In the "Connector principal permissions" section, the "IdempotentWrite"
> > row
> > says "Kafka cluster targeted by the Connect cluster" for the resource
> name
> > column. Is this right? Should that instead be "Kafka cluster to which the
> > Connector writes"?
> >
> > Good catch, done.
> >
> > > In the "Rolling upgrade(s)" section, should the first and second
> rolling
> > upgrades be completed before enabling EOS on any source connector? If so,
> > should this section call this out in more detail?
> >
> > I'm not sure I understand the question? Exactly-once source support is
> only
> > configurable on a per-worker basis and, when enabled, will apply to all
> > connectors. So the steps provided in the "Rolling upgrade(s)" section are
> > sufficient to get every connector on your cluster running with
> exactly-once
> > enabled, and there is no need or way to enable exactly-once on a
> > per-connector basis.
> >
> > > How does this work with a rolling "downgrade"? The "Heterogeneous
> > clusters"
> > subsection of the "Permitted failure scenarios" section talks about (I
> > think) how setting `exactly.once.source.enabled=false` for one worker
> > effectively breaks the EOS delivery guarantees for the whole Connect
> > cluster. If that is the case, how does this rolling downgrade work? Would
> > it simply require that all source connectors configured to use EOS be
> > changed to not use EOS, and then to do a rolling downgrade of the
> worker's
> > configuration to disable EOS? If this is the case, let's be more explicit
> > in this section on downgrading.
> > > In the "Soft downgrade" section, it sounds like this is effectively
> not a
> > rolling downgrade. Nothing in the "Downgrades" section really implies
> that
> > rolling downgrades are not supported, but at the same time it's not
> > terribly obvious.
> > Overall, it might be worth considering (if you haven't already) modifying
> > the Downgrade section to be a bit more prescriptive in terms of steps
> that
> > should be taken.
> >
> > I've touched up the sections that touch on downgrades, to make it clear
> how
> > to safely perform a rolling downgrade for both "hard" and "soft"
> > downgrades, and what the consequences of performing an unsafe rolling
> > "hard" downgrade are and how to deal with the consequences. Hopefully
> this,
> > coupled with the knowledge that exactly-once is not going to be
> > configurable on a per-connector basis, should make things clear.
> >
> > > Finally, do we need to expose any metrics that are related to maybe the
> > average number of records in each transaction? Or should we rely solely
> > upon existing producer metrics?
> >
> > That's a great point--after perusing
> > https://kafka.apache.org/28/documentation.html#producer_monitoring it
> > doesn't look like there are any metrics on producer transaction size.
> I've
> > updated the KIP to add source task metrics for minimum, maximum, and
> > average transaction size. I think that this, in conjunction with existing
> > metrics for poll time, commit time, batch size (per call to
> > "SourceTask::poll"), and active record count, should cover our bases.
> >
> >
> Thanks again!
>
> Randall
>
