Hi, my two cents on this. While it would be perfectly possible to use consumer groups in KafkaIO, doing so has its own issues. The most visible would be that using subscriptions might introduce unnecessary duplicates in downstream processing. The reason for this is that a consumer in a consumer group might be reassigned partitions and/or be reset to a different offset based on conditions that are out of the consumer's own control. This might lead to an inability to successfully commit the offset of a bundle after it has been sent downstream, while the processed-but-not-yet-committed input element might be reprocessed by a different worker due to a partition rebalance. This adds unnecessary complexity with questionable benefits (observability of lag in a consumer group and possibly automatic discovery of new partitions in a topic).
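As a toy illustration of the failure mode described above (this is not Beam or Kafka client code; the offsets are made up), a rebalance that revokes a partition after records were emitted downstream but before their offset was committed forces the new assignee to reprocess the same records:

```java
import java.util.ArrayList;
import java.util.List;

public class RebalanceDuplicates {
    // Simulates one partition being handed from worker A to worker B by a
    // rebalance after A emitted records downstream but before A committed.
    static List<Long> simulate() {
        List<Long> downstream = new ArrayList<>();
        long committedOffset = 100; // last offset committed to the group

        // Worker A reads offsets 100..104 and emits them downstream...
        for (long o = committedOffset; o < 105; o++) downstream.add(o);
        // ...but the rebalance revokes the partition before A commits,
        // so committedOffset stays at 100.

        // Worker B is assigned the partition and resumes from the last
        // committed offset, reprocessing 100..104.
        for (long o = committedOffset; o < 105; o++) downstream.add(o);
        return downstream;
    }

    public static void main(String[] args) {
        // Offsets 100..104 appear twice downstream.
        System.out.println(RebalanceDuplicates.simulate());
    }
}
```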
For these reasons I'd say that it would be possible to introduce a different IO (e.g. KafkaConsumerGroupIO), which could be added to Beam itself or (perhaps) some extension, but it makes little sense to introduce this into KafkaIO itself.
Hope this helps,

Jan

On 10/18/23 05:49, Shaojie Wu wrote:
Can't agree with Shahar Frank more.

On 2023/04/19 18:17:15 Shahar Frank wrote:

Hi Daniel,

I think I've already answered these in a previous email but let me answer them again.

> I was specifically responding to quoted points from your last email. I really don't understand why you, as a user, care if the implementation of the framework is using consumer groups or not as long as it has the throughput you need and is correct. If there is something specific this would be useful for, like monitoring or metrics, it seems a reasonable feature request to me to ask to reflect the progress state in a kafka consumer group, but not to use the underlying assignment mechanism for the reasons stated above.

I do care for a couple of reasons:
1) Introducing risk with a technology that no one knows in the company vs. a technology people know and trust (i.e. Kafka consumer groups)
2) A multitude of alerting, monitoring and other observability tools that use consumer groups will not be usable, and new solutions would be required
3) Expert know-how on managing (and sometimes fixing) Kafka issues in the company will become useless - and this in turn introduces risk to projects

> If you want to run in a single machine application mode, you can try setting the `flinkMaster` parameter to `[local]`, which should launch an inline flink runner just for your pipeline. If you want to have a scaling out cluster per-application, you can launch a repeatable flink cluster with kubernetes on a per-application basis pretty easily.

I do agree that a Flink cluster is a great solution and have maintained a few in my time. Sadly, in our use case I have to consider constraints set by security and platform teams, and that will take time. By the time these follow through, it is very likely that the decision to use Beam would have fallen in favor of other solutions (i.e. Kafka Streams, Camel and others), and this would be a shame in my view.
It is very unlikely that once taken this decision would be reversed for a long time.

Given that a Flink cluster is not an option for me at this point, I have been trying to push a solution where instances of a Beam pipeline are run "disconnected" using a DirectRunner or (better even) a FlinkRunner in local standalone mode (as you're suggesting) - and like you suggested we are running those using a K8s deployment which allows us to scale up as required. The issue is that if more than one pod attempts to run the pipeline, they will not split the partitions between them but rather each would consume ALL partitions, and the output would include as many duplications as the number of pods. This solution will then not be able to scale up horizontally. That is exactly why I'm trying to suggest using consumer groups. In this attempt I created - here <https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java> - I've already shown it is possible (albeit, I admit, with limitations such as you described) to use consumer groups and effectively allow our use case to run on a scaled-up K8s deployment of DirectRunners.

And again, finally, my question is why should Kafka be treated differently from other messaging systems like SQS and PubSub, for which it seems Beam does not attempt to manage the distribution strategy, as well as the mechanism for managing processed (committed) messages? If Beam is able to perform as well with them managing these, couldn't the same be applied to Kafka?

Cheers,
Shahar.

------------------------------
Shahar Frank
srf...@gmail.com
+447799561438
------------------------------

On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dpcoll...@google.com> wrote:

Hello,

I was specifically responding to quoted points from your last email. I really don't understand why you, as a user, care if the implementation of the framework is using consumer groups or not as long as it has the throughput you need and is correct.
If there is something specific this would be useful for, like monitoring or metrics, it seems a reasonable feature request to me to ask to reflect the progress state in a kafka consumer group, but not to use the underlying assignment mechanism for the reasons stated above.

Per: "why Beam should recommend using a distributed processing framework"

If you want to run in a single machine application mode, you can try setting the `flinkMaster` parameter to `[local]`, which should launch an inline flink runner just for your pipeline. If you want to have a scaling out cluster per-application, you can launch a repeatable flink cluster with kubernetes on a per-application basis pretty easily.

-Daniel

On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <srf...@gmail.com> wrote:

Hi Daniel,

I think you missed my last email that deals exactly with what you just commented. I can send it again if you can't find it.

Shahar.

On Wed, Apr 19, 2023, 13:07 Daniel Collins <dpcoll...@google.com> wrote:

> The real question I feel is why should there not be an option with Beam to use a recommended best practice (by another apache project in fact) when connecting to an external system?

You ignored my previous answer. This is not a "best practice" for streaming frameworks, only for applications, and is not used by other frameworks *including kafka streams*.

> The same as when connecting to SQS and PubSub should also be implemented with Kafka I think.

This makes it much harder to implement a consistent watermark and involves either expensive secondary processing (Pub/Sub has a whole second tracking subscription) or incorrectness of the watermark bounds when reading from backlog. This makes windowing more likely to be incorrect.

> the user should be allowed the option of using a mechanism that is part of the system being connected if willing to accept the implication it has.
This requires a complete rewrite of how KafkaIO operates, so it's not as easy as "flip a switch".

> And then the real question behind that would be - is there anything preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?

You haven't answered my original question: why do you care if it uses consumer groups or not?

On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <srf...@gmail.com> wrote:

Thanks Kenn,

I agree with your comments fully. I would rather use a Flink cluster even with the simple use case we have now. Sadly, sometimes we have other constraints - especially in larger corporations like the one I work for - which make it harder to create and maintain these without real reasons. My intention is to introduce Apache Beam right now to allow for future growth opportunities such as you described, but to do that I have to convince others that it is the best thing to do right now. It so happens that for the use case we are facing now, perhaps Apache Beam is too much, and it looks like Kafka Streams would allow us to avoid having to maintain another infrastructure cluster such as Flink. I would prefer to be able to propose a way we can use Beam right now without the added overhead of a Flink/Spark cluster, so that I can convince the teams that it is a viable option right now. The alternative of switching to Beam once more requirements arrive would be much less preferable, as this is likely to never gather enough momentum for a full refactoring.

Finally, I feel the question that really needs to be asked is not why Beam should recommend using a distributed processing framework, which totally makes sense, and not even why it may require that in some use cases. The real question I feel is why should there not be an option with Beam to use a recommended best practice (by another apache project in fact) when connecting to an external system? The same as when connecting to SQS and PubSub should also be implemented with Kafka, I think.
If we think the existing connector has advantages in some use cases, then by all means it should still exist as an option; however, I feel the user should be allowed the option of using a mechanism that is part of the system being connected, if willing to accept the implications it has. And then the real question behind that would be - is there anything preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?

Cheers,
Shahar.

------------------------------
Shahar Frank
srf...@gmail.com
+447799561438
------------------------------

On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <k...@apache.org> wrote:

Interesting discussion! I don't have a lot of expertise in some of these details but I wanted to just add one little comment.

On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <srf...@gmail.com> wrote:

> Regarding horizontal scaling I may have not been clear about what I mean. With Kafka Streams I can just run the same app (e.g. with a K8s Deployment) on as many pods as I need, and with Kafka Consumer Groups managing the distribution of data all would work well. Moreover, I can create more pods than the number of partitions and keep them idle so that if/when some crash, others pick up the slack quicker. And this would all be managed by the Kafka consumer group coordinator. If I do the same with an Apache Beam application - for example with a Direct Runner or a Flink Runner running in "local" mode - each instance will consume the entire topic as it is unaware of the other instances. To work, I will be required to use something like a Flink runner with a full-fledged Flink cluster. This is a disadvantage for Beam in simpler use cases where maintaining such an additional cluster is not required for the actual functionality (e.g. just for filtering) and incurs costs not everyone is willing to make.

Horizontal scaling is built into Beam. It all occurs within a single pipeline.
You should not try to scale up by running multiple pipelines consuming from the same consumer group. Running your Beam pipeline on a Flink cluster (or any other distributed runner) is the intended way to achieve horizontal scaling. Beam has a very sophisticated (perhaps "the most" sophisticated) sharding and work balancing model for distributing shards of work across workers. So it is by design that Beam is aware of the sharding, but also does its own thing above and beyond.

It is true that if you have a very simple use case then Beam could be more than you need. Beam is very general purpose. Of course, simple uses grow into more complex uses :-) and you might end up stuck and/or porting to a distributed processing engine after all. It all depends on what you are trying to do and what you might do in the future!

Kenn

Kafka Streams would not run on a Kafka cluster, that is true - I was not saying it would. It might be run on a K8s cluster, which is also how I might be running K8s itself and any Flink/Spark cluster I might need. While Flink can run in a standalone as well as a cluster mode... as I said earlier, in this use case when Flink is used in standalone mode the use case fails - which is my issue, as the same will work with Kafka Streams. Kafka Streams can only connect to Kafka - exactly why I would prefer to use Beam, but to do that I need to show that we have no disadvantages for the initial use case. I'm also very aware of the other benefits of Beam - being runner agnostic, language agnostic, source and sink agnostic etc. - which is why I would very much like to use it right now. Sadly, right now, if we are unable to scale horizontally without maintaining another cluster - i.e. Flink or Spark - this is a major disadvantage which might drive us to use Kafka Streams instead. I have opened a bug in Github <https://github.com/apache/beam/issues/25978> for the issue.
I can edit that to be more specific to documentation, however I feel this would be a huge miss for the project. Also happy to open the same in JIRA if you can point me to where in JIRA I can do that. I couldn't find that on the contact-us <https://beam.apache.org/community/contact-us/> page - it actually points you to create an issue in Github, which is what I did.

Finally, I've been looking into other stream consumers in Beam IO. I might be missing something, but I think SqsIO seems to not be managing the same - but rather expects SQS to manage data distribution between consumers:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660

I think the same goes for PubsubIO, where I can't see it trying to set any control over which part of the stream would be managed by which consumer, but rather letting the Pubsub service do that instead:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726

I would ask why should Kafka not be considered a "streaming service" just like SQS and Pubsub, and allow it to manage data distribution just like they would?

Cheers,
Shahar.
------------------------------
Shahar Frank
srf...@gmail.com
+447799561438
------------------------------

On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dpcoll...@google.com> wrote:

> the best practice recommended to consume for Kafka

This is when using the consumer API directly - when using a framework like beam, or even kafka streams (which actually doesn't use the consumer group assignment mechanism, see here https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks), you should rely on the framework's partition-to-task assignment mechanism.

> all observability tools around Kafka use consumer group lag which is going to be unavailable if consumer groups are not being used

This is a valid feature request, to reflect the underlying progress in offset commits; filing a feature request on JIRA would be the best way to get this prioritized.

> Kafka Streams allows me to horizontally scale by adding as many instances of my application as I need and relies on Kafka to manage distribution by using consumer groups.

No it doesn't, see above. Beam allows you to scale out horizontally as well, but neither allows you to scale the source horizontally beyond the number of partitions on the topic.

> I'm required to maintain another distributed processing cluster like Spark or Flink (on top of a Kafka cluster I already have)

Kafka streams does not run on your kafka cluster. It is a separate runtime that you need to turn up and run jobs for separately, same as flink. The only difference is that, effectively, you can only run kafka streams in application mode while flink can also be run in session/cluster mode. The main downside of Kafka streams is that it can only be used talking with kafka. If you ever want to read batch data or from/to another streaming system, you cannot reuse your existing code/architecture and need to rewrite everything.
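For context on the consumer-group lag metric mentioned above: monitoring tools derive lag as the log-end offset minus the last committed offset, per partition, which is why the metric disappears when a framework never commits offsets to the group. A minimal sketch of the calculation (the partition ids and offsets below are made up):

```java
import java.util.Map;

public class ConsumerLag {
    // Lag per partition = log-end offset - last committed offset,
    // summed across partitions. A partition with no committed offset
    // is treated here as fully lagging from offset 0.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            lag += e.getValue() - committed.getOrDefault(e.getKey(), 0L);
        }
        return lag;
    }

    public static void main(String[] args) {
        // Two partitions: partition 0 is 300 records behind, partition 1 is caught up.
        long lag = totalLag(Map.of(0, 1500L, 1, 900L), Map.of(0, 1200L, 1, 900L));
        System.out.println(lag); // prints 300
    }
}
```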
One advantage of decoupled frameworks like beam (or flink or spark) is that the same pipeline and code can be reused for various data sources and sinks, and they come with a library of prebuilt ones.

> documentation

If the documentation misleadingly suggests that you can set group.id, and doing so does not make upstream offset commits to allow you to access metrics, please file a bug on JIRA so either the documentation or implementation can be corrected.

-Daniel

On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <srf...@gmail.com> wrote:

Thanks for the information Daniel,

For one - Kafka consumer groups are the best practice recommended for consuming from Kafka, AFAIK, and I would prefer to be able to use that. Also, all observability tools around Kafka use consumer group lag, which is going to be unavailable if consumer groups are not being used.

Finally, in my use case I'm asked to evaluate using Apache Beam vs. Kafka Streams. Kafka Streams allows me to horizontally scale by adding as many instances of my application as I need, and relies on Kafka to manage distribution by using consumer groups. With Apache Beam I'm required to maintain another distributed processing cluster like Spark or Flink (on top of a Kafka cluster I already have) to be able to do the same. To be clear, in this use case there is no need for an additional cluster except for consumer groups not being used. This constitutes a disadvantage over Kafka Streams and other solutions that use consumer groups.

Furthermore, if this use case is not supported, I would imagine the documentation would mention that, or at least not imply the contrary. In the latest version of the documentation for KafkaIO <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it shows an example of configuring a consumer group, while in fact this setting will not be doing anything of the sort:

[image: image.png]

And:

[image: image.png]

It seems like this has already been raised in the past - e.g.
here <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> - so I'm probably not the first person to be confused about that.

Cheers,
Shahar.

------------------------------
Shahar Frank
srf...@gmail.com
+447799561438
------------------------------

On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <dev@beam.apache.org> wrote:

Why do you want to use a consumer group? If you have consumers in other jobs, your beam job will fail to receive all messages it should for the topic.

> It seems the code attempts to circumvent the partition assignment mechanism provided by Kafka to use its own.

All beam I/Os for partitioned sources do this. They use access to the partitioning structure of the underlying system to track their progress through each partition and provide feedback for scaling, as well as tracking and enforcing exactly-once processing semantics. In fact, most interops with streaming data processing systems do this; you can see in the documentation of the flink kafka interop (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene) that it does not use or respect the partition assignments.

> By doing that it prevents the user from using consumer groups.

Again, why do you (as a user) want to use consumer groups? What value does it provide you?

-Daniel

On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <srf...@gmail.com> wrote:

Hi All,

Posting here as suggested here <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.

I'm using KafkaIO to consume events from a Kafka topic. I've added "group.id" to the consumer properties. When running the pipeline I can see this value sent to Kafka in the consumer properties. The consumers created by KafkaIO fail to join the consumer group. Looking into the code I can see that nowhere does the consumer "subscribe" to the topic, which is how a KafkaConsumer joins a consumer group.
It seems the code attempts to circumvent the partition assignment mechanism provided by Kafka to use its own. By doing that it prevents the user from using consumer groups. Is that by intention? Is there any reason why the decision to avoid using consumer groups was taken? I would love to see any documentation about that if possible, please.

Cheers,
Shahar.
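The framework-managed assignment discussed in this thread can be sketched as a static split of partitions across reader instances (a toy round-robin model, not the actual KafkaIO code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StaticAssignment {
    // Round-robin partition-to-split assignment: split s claims every
    // partition p where p % numSplits == s. Deterministic, with no group
    // coordinator involved - which is why setting group.id has no effect
    // on which records each reader consumes.
    static Map<Integer, List<Integer>> assign(int numPartitions, int numSplits) {
        Map<Integer, List<Integer>> splits = new TreeMap<>();
        for (int s = 0; s < numSplits; s++) splits.put(s, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) splits.get(p % numSplits).add(p);
        return splits;
    }

    public static void main(String[] args) {
        // 6 partitions over 2 reader splits.
        System.out.println(StaticAssignment.assign(6, 2)); // prints {0=[0, 2, 4], 1=[1, 3, 5]}
    }
}
```

Because every reader computes its share from the topic metadata alone, two independent pipelines each create their own full set of splits, which is the duplicate-consumption behavior described earlier in the thread when several DirectRunner pods run the same pipeline.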