Hi,

my two cents on this. While it would perfectly possible to use consumer group in KafkaIO, it has its own issues. The most visible would be, that using subscriptions might introduce unnecessary duplicates in downstream processing. The reason for this is that consumer in a consumer group might be reassigned partitions and/or being reset to a different offset based on conditions that are out of control of the consumer itself. This might lead to an inability to successfully commit offset of a bundle after it has been sent downstream, while the processed-but-not-yet-committed input element might be reprocessed by a different worker due to partition rebalance. This add unnecessary complexity with questionable benefits (observability of lag in a consumer group and possible automatic discovery of new partitions in a topic).

For these reasons I'd say, that it would be possible to introduce a different (e.g. KafkaConsumerGroupIO) IO, which could be added to Beam itself or (perhaps) some extension, but it makes little sense to introduce this into KafkaIO itself.

Hope this helps,

 Jan

On 10/18/23 05:49, Shaojie Wu wrote:
Can't agree with Shahar Frank more

On 2023/04/19 18:17:15 Shahar Frank wrote:
Hi Daniel,

I think I've already answered these in a previous email but let me answer
them again.

I was specifically responding to quoted points from your last email. I
really don't understand why you, as a user, care if the implementation of
the framework is using consumer groups or not as long as it has the
throughput you need and is correct. If there is something specific this
would be useful for, like monitoring or metrics, it seems a reasonable
feature request to me to ask to reflect the progress state in a kafka
consumer group, but not to use the underlying assignment mechanism for the
reasons stated above.

I do care for a couple of reasons:
1) Introducing risk with a technology that non one knows in the company vs.
a technology people know and trust (i.e. Kafka Consumer Groups)
2) A multitude of alerting, monitoring and other observability tools that
are using consumer groups will not be usable and new solutions would be
required
3) Expert knowhow on managing (and sometimes fixing) issues with Kafka in
the company will become useless - and this in turn introduces risk to
projects

If you want to run in a single machine application mode, you can try
setting the `flinkMaster` parameter to `[local]`, which should launch an
inline flink runner just for your pipeline. If you want to have a scaling
out cluster per-application, you can launch a repeatable flink cluster with
kubernetes on a per-application basis pretty easily.

I do agree that a Flink cluster is a great solution and have maintained a
few in my time.
Sadly in our use case I have to consider constraints set by security and
platform teams and that will take time.
By the time these follow through it is very likely that the decision to use
Beam would have fallen in favor of other solutions (i.e. Kafka Streams,
Camel and others) and this would be a shame in my view. It is very unlikely
that once taken this decision would be reversed for a long time.

Given that a Flink cluster is not an option for me at this point I have
been trying to push a solution where instances of a Beam pipeline are run
"disconnected" using a DIrectRunner or (better even) a FlinkRunner in local
standalone mode (as you're suggesting) - and like you suggested we are
running those using a K8s deployment which allows us to scale up a required.
The issue is if more than one pod attempts to run the pipeline - they will
not split the partitions between them but rather each would consume ALL
partitions and the output would include as many duplications as the number
of pods. This solution will then not be able to scale up horizontally.

That is exactly why I'm trying to suggest using consumer groups.
In this attempt I created - here
<https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java>
-  I've already shown it is possible (albeit I admit with limitations such
as you described) to use consumer groups and effectively allow our use case
to run on a scaled up K8s deployment of DirectRunners.

And again finally my question is why should Kafka be treated differently
from other messaging systems like SQS and PubSub for which it seems Beam
does not attempt to manage the distribution strategy as well the mechanism
for managing processed (committed) messages?

If Beam is able to perform as well with them managing these couldn't the
same be applied to Kafka?

Cheers,
Shahar.

------------------------------

Shahar Frank

srf...@gmail.com

+447799561438

------------------------------





On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dpcoll...@google.com> wrote:

Hello,

I was specifically responding to quoted points from your last email. I
really don't understand why you, as a user, care if the implementation of
the framework is using consumer groups or not as long as it has the
throughput you need and is correct. If there is something specific this
would be useful for, like monitoring or metrics, it seems a reasonable
feature request to me to ask to reflect the progress state in a kafka
consumer group, but not to use the underlying assignment mechanism for the
reasons stated above.

Per: "why Beam should recommend using a distributed processing framework"

If you want to run in a single machine application mode, you can try
setting the `flinkMaster` parameter to `[local]`, which should launch an
inline flink runner just for your pipeline. If you want to have a scaling
out cluster per-application, you can launch a repeatable flink cluster with
kubernetes on a per-application basis pretty easily.

-Daniel

On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <srf...@gmail.com> wrote:

Hi Daniel,

I think you missed my last email that deals exactly with what you just
commented.

I can send it again if you can't find it

Shahar.


On Wed, Apr 19, 2023, 13:07 Daniel Collins <dpcoll...@google.com> wrote:

The real question I feel is why should there not be an option with
Beam to use a recommended best practice (by another apache project in fact)
when connecting to an external system?

You ignored my previous answer. This is not a "best practice" for
streaming frameworks, only for applications, and is not used by other
frameworks *including kafka streams*.

The same as when connecting to SQS and PubSub should also be
implemented with Kafka I think.

This makes it much harder to implement a consistent watermark and
involves either expensive secondary processing (Pub/Sub has a whole second
tracking subscription) or incorrectness of the watermark bounds when
reading from backlog. This makes windowing more likely to be incorrect.

the user should be allowed the option of using a mechanism that is
part of the system being connected if willing to accept the implication it
has.

This requires a complete rewrite of how KafkaIO operates, so its not as
easy as "flip a switch".

  And then the real question behind that would be - is there anything
preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?

You haven't answered my original question: Why do you care if it uses
consumer groups or not?

On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <srf...@gmail.com> wrote:

Thanks Kenn,

I agree with your comments fully.
I would rather use a Flink cluster even with the simple use case we
have now.

Sadly sometimes we have other constraints - especially in larger
corporations like the one I work for - which make it harder to create and
maintain these without real reasons.
My intention is to introduce Apache Beam right now to allow for future
growth opportunities such as you described but to do that I have to
convince others that it is the best thing to do right now.

It so happens that for the use case we are facing now perhaps Apache
Beam is too much and it looks like Kafka Streams would allow us to avoid
having to maintain another infrastructure cluster such as Flink.

I would prefer to be able to propose a way we can use Beam right now
without the added overhead of a Flink/Spark cluster so that I can convince
the teams that it is a viable option right now.
The alternative of switching to Beam once more requirements arrive
would be much less preferable as this is likely to never gather enough
momentum for a full refactoring.

Finally I feel the question that really needs to be asked is not why
Beam should recommend using a distributed processing framework which
totally makes sense and not even why it may require that in some use cases.

The real question I feel is why should there not be an option with Beam
to use a recommended best practice (by another apache project in fact) when
connecting to an external system?
The same as when connecting to SQS and PubSub should also be
implemented with Kafka I think.
If we think the existing connector has advantages in some use cases
then by all means it should still exist as an option however I feel the
user should be allowed the option of using a mechanism that is part of the
system being connected if willing to accept the implication it has.

And then the real question behind that would be - is there anything
preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?

Cheers,
Shahar.

------------------------------

Shahar Frank

srf...@gmail.com

+447799561438 <+44%207799%20561438>

------------------------------





On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <k...@apache.org> wrote:

Interesting discussion! I don't have a lot of expertise in some of
these details but I wanted to just add one little comment.

On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <srf...@gmail.com>
wrote:

Regarding horizontal scaling I may have not been clear about what I
mean.
With Kafka Streams I can just run the same app (e.g. with a K8s
Deployment) on as many pods I need and with Kafka Consumer Groups managing
the distribution of data all would work well.
Moreover I can create more pods than the number of partitions and
keep them idle so that if/when some crash others pick the slack quicker. And
this would all be managed by the Kafka consumer group coordinator.
If I do the same with an Apache Beam application - for example with a
Direct Runner or a Flink Runner running in "local" mode - each instance
will consume the entire topic as it is unaware of the other instances. To
work I will be required to use something like a Flink runner with a full
fledged Flink cluster. This is a disadvantage for beam in simpler use cases
where maintaining such an additional cluster is not required for the actual
functionality (e.g. just for filtering) and incurs costs not everyone is
willing to make.

Horizontal scaling is built into Beam. It all occurs within a single
pipeline. You should not try to scale up by running multiple pipelines
consuming from the same consumer group. Running your Beam pipeline on a
Flink cluster (or any other distributed runner) is the intended way to
achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
most" sophisticated) sharding and work balancing model, for distributing
shards of work across workers. So it is by design that Beam is aware of the
sharding, but also does its own thing above and beyond. It is true that if
you have a very simple use case then Beam could be more than you need. Beam
is very general purpose. Of course, simple uses grow into more complex uses
:-) and you might end up stuck and/or porting to a distributed processing
engine after all. It all depends on what you are trying to do and what you
might do in the future!

Kenn



Kafka Streams would not run on a Kafka cluster that is true - I was
not saying it would. It might be run on a K8s cluster which is also how I
might be running K8s itself and any Flink/Spark cluster I might need. While
Flink can run in a standalone as well as cluster mode... as I said earlier
in this use case when Flink is used in standalone mode the use case fails -
which is my issue, as the same will work with Kafka Streams.

Kafka Streams can only connect to Kafka - exactly why I would prefer
to use Beam but to do that I need to show that we have no disadvantages for
the initial use case.
I'm also very aware of the other benefits of Beam - being runner
agnostic, language agnostic, source and sink agostic etc. which is why I
would very much like to use it right now.
Sadly right now if we are unable to scale horizontally without
maintaining another cluster - i.e. Flink or Spark - this is a major
disadvantage which might drive us to use Kafka Streams instead.

I have opened a bug in Github
<https://github.com/apache/beam/issues/25978>for the issue. I can
edit that to be more specific to documentation however I feel this would be
a huge miss for the project.
Also happy to open the same in JIRA if you can point me to where in
JIRA I can do that.
I couldn't find that on the contact-us
<https://beam.apache.org/community/contact-us/> page - it actually
points you to create an issue in Github which is what I did.

Finally I've been looking into other stream consumers in Beam IO.

I might be missing something but I think SqsIO seems to not be
managing the same - but rather expects SQS to manage data distribution
between consumers:

https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660

I think same goes for PubsubIO which I can't see where it tries to
set any control over which part of the stream would be managed by which
consumer but lets the Pubsub service do that instead:

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726

I would ask why should Kafka not be considered a "streaming service"
just like SQS and Pubsub and allow it to manage data distribution just like
they would?

Cheers,
Shahar.

------------------------------

Shahar Frank

srf...@gmail.com

+447799561438 <+44%207799%20561438>

------------------------------






On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dpcoll...@google.com>
wrote:

the best practice recommended to consume for Kafka
This is when using the consumer API directly- when using a framework
like beam or even kafka streams (which actually doesn't use the consumer
group assignment mechanism, see here
https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
you should rely on the framework's partition to task assignment mechanism.

all observability tools around Kafka use consumer group lag which
is going to be unavailable if consumer groups are not being used

This is a valid feature request, to reflect the underlying progress
in offset commits, filing a feature request on JIRA would be the best way
to get this prioritized.

Kafka Streams allows me to horizontally scale by adding as many
instances of my application as I need and relies on Kafka to manage
distribution by using consumer groups.

No it doesn't, see above. Beam allows you to scale out horizontally
as well, but neither allows you to scale the source horizontally beyond the
number of partitions on the topic.

I'm required to maintain another distributed processing cluster
like Spark or Flink (on top of a Kafka cluster I already have)

Kafka streams does not run on your kafka cluster. It is a separate
runtime that you need to turn up and run jobs for separately, same as
flink. The only difference is that, effectively, you can only run kafka
streams in application mode while flink can also be run in session/cluster
mode.

The main downside of Kafka streams is that it can only be used
talking with kafka. If you ever want to read batch data or from/to another
streaming system you cannot reuse your existing code/architecture and need
to rewrite everything. One advantage of decoupled frameworks like beam (or
flink or spark) is that the same pipeline and code can be reused for
various data sources and sinks, and they come with a library of prebuilt
ones.

documentation
If the documentation misleadingly suggests that you can set group.id,
and doing so does not make upstream offset commits to allow you to access
metrics, please file a bug on JIRA so either the documentation or
implementation can be corrected.

-Daniel

On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <srf...@gmail.com>
wrote:

Thanks for the information Daniel,

For one - Kafka Consumer Groups is the best practice recommended to
consume for Kafka AFAIK and I would prefer to be able to use that.
Also all observability tools around Kafka use consumer group lag
which is going to be unavailable if consumer groups are not being used.

Finally in my use case I'm asked to evaluate using Apache Beam vs.
Kafka Streams.
Kafka Streams allows me to horizontally scale by adding as many
instances of my application as I need and relies on Kafka to manage
distribution by using consumer groups.
With Apache Beam I'm required to maintain another distributed
processing cluster like Spark or Flink (on top of a Kafka cluster I already
have) to be able to do the same.
To be clear in this use case there is no need for an additional
cluster except for consumer groups not being used.
This constitutes a disadvantage over Kafka Streams and other
solutions that use consumer groups.

Furthermore if this use case is not supported I would imagine the
documentation would mention that or at least not imply to the contrary.
In the latest version of the documentation for KafkaIO
<https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html>
 it
shows an example for configuring to use a consumer group while in fact this
settings will not be doing anything of the sort:
[image: image.png]
And:
[image: image.png]

It seems like this has already been raised in the past - e.g. here
<https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id>
 -
so I'm probably not the first person to be confused about that.

Cheers,
Shahar.

------------------------------

Shahar Frank

srf...@gmail.com

+447799561438 <+44%207799%20561438>

------------------------------





On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
dev@beam.apache.org> wrote:

Why do you want to use a consumer group? If you have consumers in
other jobs, your beam job will fail to receive all messages it should for
the topic.

It seems the code attempts to circumvent the partition
assignment mechanism provided by Kafka to use it's own.

All beam I/Os for partitioned sources do this. They use access to
the partitioning structure of the underlying system to track their progress
through each partition and provide feedback for scaling, as well as
tracking and enforcing exactly once processing semantics. In fact, most
interops with streaming data processing systems do this, you can see the
documentation of the flink kafka interop (
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
that it does not use or respect the partition assignments.

By doing that it prevents the user from using consumer groups.
Again, why do you (as a user) want to use consumer groups? What
value does it provide you?

-Daniel

On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <srf...@gmail.com>
wrote:

Hi All,

Posting here as suggested here
<https://github.com/apache/beam/issues/25978#issuecomment-1508530483>
.

I'm using KafkaIO to consume events from a Kafka topic.
I've added "group.id" to the consumer properties.
When running the pipeline I can see this value sent to Kafka in
the consumer properties.
The consumers created by KafkaIO fail to join the consumer group.
Looking into the code I can see that nowhere is the consumer
"subscribing" to the topic which is how KafkaConsumer should join a
consumer group. It seems the code attempts to circumvent the partition
assignment mechanism provided by Kafka to use it's own.
By doing that it prevents the user from using consumer groups.
Is that by intention? Is there any reason why the decision to
avoid using consumer groups has been taken?
I would love to see any documentation about that if possible
please.

Cheers,
Shahar.





Reply via email to