Can't agree with Shahar Frank more
On 2023/04/19 18:17:15 Shahar Frank wrote:
> Hi Daniel,
>
> I think I've already answered these in a previous email, but let me answer them again.
>
> > I was specifically responding to quoted points from your last email. I really don't understand why you, as a user, care if the implementation of the framework is using consumer groups or not as long as it has the throughput you need and is correct. If there is something specific this would be useful for, like monitoring or metrics, it seems a reasonable feature request to me to ask to reflect the progress state in a kafka consumer group, but not to use the underlying assignment mechanism for the reasons stated above.
>
> I do care, for a few reasons:
> 1) Introducing risk with a technology that no one in the company knows vs. a technology people know and trust (i.e. Kafka Consumer Groups)
> 2) A multitude of alerting, monitoring and other observability tools that use consumer groups will not be usable, and new solutions would be required
> 3) Expert know-how on managing (and sometimes fixing) issues with Kafka in the company will become useless - and this in turn introduces risk to projects
>
> > If you want to run in a single machine application mode, you can try setting the `flinkMaster` parameter to `[local]`, which should launch an inline flink runner just for your pipeline. If you want to have a scale-out cluster per-application, you can launch a repeatable flink cluster with kubernetes on a per-application basis pretty easily.
>
> I do agree that a Flink cluster is a great solution, and I have maintained a few in my time.
> Sadly, in our use case I have to consider constraints set by security and platform teams, and that will take time.
> By the time these are worked through, it is very likely that the decision to use Beam would have fallen in favor of other solutions (e.g. Kafka Streams, Camel and others), and this would be a shame in my view.
> It is very unlikely that, once taken, this decision would be reversed for a long time.
>
> Given that a Flink cluster is not an option for me at this point, I have been trying to push a solution where instances of a Beam pipeline are run "disconnected" using a DirectRunner or (even better) a FlinkRunner in local standalone mode (as you're suggesting) - and, as you suggested, we are running those using a K8s deployment, which allows us to scale up as required.
> The issue is that if more than one pod runs the pipeline, they will not split the partitions between them; rather, each will consume ALL partitions, and the output will include as many duplicates as there are pods. This solution will then not be able to scale horizontally.
>
> That is exactly why I'm trying to suggest using consumer groups.
> In this attempt I created - here <https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java> - I've already shown that it is possible (albeit, I admit, with limitations such as those you described) to use consumer groups and effectively allow our use case to run on a scaled-up K8s deployment of DirectRunners.
>
> And again, finally, my question is: why should Kafka be treated differently from other messaging systems like SQS and PubSub, for which it seems Beam does not attempt to manage the distribution strategy as well as the mechanism for managing processed (committed) messages?
>
> If Beam is able to perform just as well while letting those systems manage this, couldn't the same be applied to Kafka?
>
> Cheers,
> Shahar.
>
> ------------------------------
> Shahar Frank
> srf...@gmail.com
> +447799561438
> ------------------------------
>
> On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dpcoll...@google.com> wrote:
>
> > Hello,
> >
> > I was specifically responding to quoted points from your last email.
> > I really don't understand why you, as a user, care if the implementation of the framework is using consumer groups or not as long as it has the throughput you need and is correct. If there is something specific this would be useful for, like monitoring or metrics, it seems a reasonable feature request to me to ask to reflect the progress state in a kafka consumer group, but not to use the underlying assignment mechanism for the reasons stated above.
> >
> > Per: "why Beam should recommend using a distributed processing framework"
> >
> > If you want to run in a single machine application mode, you can try setting the `flinkMaster` parameter to `[local]`, which should launch an inline flink runner just for your pipeline. If you want to have a scale-out cluster per-application, you can launch a repeatable flink cluster with kubernetes on a per-application basis pretty easily.
> >
> > -Daniel
> >
> > On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <srf...@gmail.com> wrote:
> >
> >> Hi Daniel,
> >>
> >> I think you missed my last email, which deals exactly with what you just commented.
> >>
> >> I can send it again if you can't find it.
> >>
> >> Shahar.
> >>
> >> On Wed, Apr 19, 2023, 13:07 Daniel Collins <dpcoll...@google.com> wrote:
> >>
> >>> > The real question I feel is why should there not be an option with Beam to use a recommended best practice (by another Apache project, in fact) when connecting to an external system?
> >>>
> >>> You ignored my previous answer. This is not a "best practice" for streaming frameworks, only for applications, and is not used by other frameworks *including kafka streams*.
> >>>
> >>> > The same approach used when connecting to SQS and PubSub should also be implemented with Kafka, I think.
> >>>
> >>> This makes it much harder to implement a consistent watermark and involves either expensive secondary processing (Pub/Sub has a whole second tracking subscription) or incorrectness of the watermark bounds when reading from backlog. This makes windowing more likely to be incorrect.
> >>>
> >>> > the user should be allowed the option of using a mechanism that is part of the system being connected if willing to accept the implication it has.
> >>>
> >>> This requires a complete rewrite of how KafkaIO operates, so it's not as easy as "flip a switch".
> >>>
> >>> > And then the real question behind that would be - is there anything preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
> >>>
> >>> You haven't answered my original question: Why do you care if it uses consumer groups or not?
> >>>
> >>> On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <srf...@gmail.com> wrote:
> >>>
> >>>> Thanks Kenn,
> >>>>
> >>>> I agree with your comments fully.
> >>>> I would rather use a Flink cluster even with the simple use case we have now.
> >>>>
> >>>> Sadly, sometimes we have other constraints - especially in larger corporations like the one I work for - which make it harder to create and maintain these without real reasons.
> >>>> My intention is to introduce Apache Beam right now to allow for future growth opportunities such as you described, but to do that I have to convince others that it is the best thing to do right now.
> >>>>
> >>>> It so happens that for the use case we are facing now, perhaps Apache Beam is too much, and it looks like Kafka Streams would allow us to avoid having to maintain another infrastructure cluster such as Flink.
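Daniel's watermark point can be illustrated with a toy model. This is a hypothetical simplification, not KafkaIO's implementation, and the names `PartitionState`, `watermark` and `would_be_late` are illustrative only. Each partition's low mark is the event time of its oldest unread record, and a source watermark is the minimum low mark across the partitions a reader tracks. When a group coordinator hands a reader only some of the partitions, a watermark computed from that subset can run ahead of a backlogged partition it does not see, so that partition's records would later show up as late data.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionState:
    """Toy per-partition progress tracker (hypothetical, not KafkaIO's real class)."""
    pending: list[int] = field(default_factory=list)  # event times not yet emitted, oldest first
    last_emitted: int = 0  # event time of the most recently emitted record

    def low_mark(self) -> int:
        # Nothing older than the oldest pending record can still come from this partition.
        return self.pending[0] if self.pending else self.last_emitted


def watermark(partitions: dict[int, PartitionState], tracked: set[int]) -> int:
    """Source watermark: the minimum low mark over the partitions this reader tracks."""
    return min(partitions[p].low_mark() for p in tracked)


def would_be_late(partitions: dict[int, PartitionState], tracked: set[int]) -> list[int]:
    """Event times, from untracked partitions, that fall behind this reader's watermark."""
    wm = watermark(partitions, tracked)
    return sorted(
        t
        for p, state in partitions.items()
        if p not in tracked
        for t in state.pending
        if t < wm
    )


# Partition 1 is reading from backlog; partitions 0 and 2 are nearly caught up.
topic = {
    0: PartitionState(pending=[100, 105], last_emitted=95),
    1: PartitionState(pending=[10, 20], last_emitted=5),
    2: PartitionState(pending=[110], last_emitted=100),
}

print(watermark(topic, {0, 1, 2}))   # 10: held back by the backlogged partition
print(watermark(topic, {0, 2}))      # 100: a reader that only sees partitions 0 and 2
print(would_be_late(topic, {0, 2}))  # [10, 20]: records that would arrive behind it
```

In this model, a reader that owns a fixed, known set of partitions can always take the minimum over all of them, which is the kind of consistency the watermark argument relies on.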
> >>>>
> >>>> I would prefer to be able to propose a way we can use Beam right now without the added overhead of a Flink/Spark cluster, so that I can convince the teams that it is a viable option right now.
> >>>> The alternative of switching to Beam once more requirements arrive would be much less preferable, as this is likely never to gather enough momentum for a full refactoring.
> >>>>
> >>>> Finally, I feel the question that really needs to be asked is not why Beam should recommend using a distributed processing framework, which totally makes sense, and not even why it may require that in some use cases.
> >>>>
> >>>> The real question I feel is why should there not be an option with Beam to use a recommended best practice (by another Apache project, in fact) when connecting to an external system?
> >>>> The same approach used when connecting to SQS and PubSub should also be implemented with Kafka, I think.
> >>>> If we think the existing connector has advantages in some use cases, then by all means it should still exist as an option; however, I feel the user should be allowed the option of using a mechanism that is part of the system being connected, if willing to accept the implications it has.
> >>>>
> >>>> And then the real question behind that would be - is there anything preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
> >>>>
> >>>> Cheers,
> >>>> Shahar.
> >>>>
> >>>> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <k...@apache.org> wrote:
> >>>>
> >>>>> Interesting discussion! I don't have a lot of expertise in some of these details, but I wanted to just add one little comment.
> >>>>>
> >>>>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <srf...@gmail.com> wrote:
> >>>>>
> >>>>>> Regarding horizontal scaling, I may not have been clear about what I mean.
> >>>>>> With Kafka Streams I can just run the same app (e.g. with a K8s Deployment) on as many pods as I need, and with Kafka Consumer Groups managing the distribution of data, all would work well.
> >>>>>> Moreover, I can create more pods than the number of partitions and keep them idle so that if/when some crash, others pick up the slack quicker. And this would all be managed by the Kafka consumer group coordinator.
> >>>>>> If I do the same with an Apache Beam application - for example with a Direct Runner or a Flink Runner running in "local" mode - each instance will consume the entire topic, as it is unaware of the other instances. To make this work I will be required to use something like a Flink runner with a full-fledged Flink cluster. This is a disadvantage for Beam in simpler use cases where such an additional cluster is not required for the actual functionality (e.g. just for filtering) and incurs costs not everyone is willing to bear.
> >>>>>
> >>>>> Horizontal scaling is built into Beam. It all occurs within a single pipeline. You should not try to scale up by running multiple pipelines consuming from the same consumer group. Running your Beam pipeline on a Flink cluster (or any other distributed runner) is the intended way to achieve horizontal scaling. Beam has a very sophisticated (perhaps "the most" sophisticated) sharding and work balancing model for distributing shards of work across workers. So it is by design that Beam is aware of the sharding, but also does its own thing above and beyond.
> >>>>> It is true that if you have a very simple use case, then Beam could be more than you need. Beam is very general purpose. Of course, simple uses grow into more complex uses :-) and you might end up stuck and/or porting to a distributed processing engine after all. It all depends on what you are trying to do and what you might do in the future!
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>>>
> >>>>>> It is true that Kafka Streams would not run on a Kafka cluster - I was not saying it would. It might be run on a K8s cluster, which is also how I might be running K8s itself and any Flink/Spark cluster I might need. Flink can run in standalone as well as cluster mode, but as I said earlier, in this use case the setup fails when Flink is used in standalone mode - which is my issue, as the same will work with Kafka Streams.
> >>>>>>
> >>>>>> Kafka Streams can only connect to Kafka - exactly why I would prefer to use Beam, but to do that I need to show that we have no disadvantages for the initial use case.
> >>>>>> I'm also very aware of the other benefits of Beam - being runner agnostic, language agnostic, source and sink agnostic, etc. - which is why I would very much like to use it right now.
> >>>>>> Sadly, right now, if we are unable to scale horizontally without maintaining another cluster - i.e. Flink or Spark - this is a major disadvantage which might drive us to use Kafka Streams instead.
> >>>>>>
> >>>>>> I have opened a bug in GitHub <https://github.com/apache/beam/issues/25978> for the issue. I can edit it to be more specific to documentation; however, I feel this would be a huge miss for the project.
> >>>>>> I'm also happy to open the same in JIRA if you can point me to where in JIRA I can do that.
> >>>>>> I couldn't find that on the contact-us <https://beam.apache.org/community/contact-us/> page - it actually points you to create an issue in GitHub, which is what I did.
> >>>>>>
> >>>>>> Finally, I've been looking into other stream consumers in Beam IO.
> >>>>>>
> >>>>>> I might be missing something, but I think SqsIO does not manage the same, and rather expects SQS to manage data distribution between consumers:
> >>>>>>
> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
> >>>>>>
> >>>>>> I think the same goes for PubsubIO: I can't see where it tries to exert any control over which part of the stream would be handled by which consumer; it lets the Pubsub service do that instead:
> >>>>>>
> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
> >>>>>>
> >>>>>> I would ask why Kafka should not be considered a "streaming service" just like SQS and Pubsub, and be allowed to manage data distribution just as they do.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Shahar.
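The duplication Shahar describes, and the idle standby pods he mentions when there are more pods than partitions, can be reproduced with a small simulation. This is a hypothetical model, not Beam or Kafka code: `round_robin_assignment` is a simplified stand-in for Kafka's built-in partition assignors, it ignores rebalancing entirely, and "pods" are just names.

```python
from collections import Counter


def independent_readers(partitions: dict[int, list[str]], pod_count: int) -> Counter:
    """Each pod runs its own pipeline and, unaware of the others, reads every partition."""
    delivered: Counter = Counter()
    for _ in range(pod_count):
        for records in partitions.values():
            delivered.update(records)
    return delivered


def round_robin_assignment(partition_ids: list[int], members: list[str]) -> dict[str, list[int]]:
    """Toy group assignment: every partition goes to exactly one member, round-robin.

    Members beyond the partition count receive nothing and act as idle standbys.
    """
    assignment: dict[str, list[int]] = {m: [] for m in members}
    for i, p in enumerate(sorted(partition_ids)):
        assignment[members[i % len(members)]].append(p)
    return assignment


def group_readers(partitions: dict[int, list[str]], members: list[str]) -> Counter:
    """Each member reads only the partitions the toy assignment gave it."""
    delivered: Counter = Counter()
    for owned in round_robin_assignment(list(partitions), members).values():
        for p in owned:
            delivered.update(partitions[p])
    return delivered


topic = {0: ["a", "b"], 1: ["c", "d"], 2: ["e", "f"]}
pods = [f"pod-{i}" for i in range(4)]

print(independent_readers(topic, len(pods))["a"])  # 4: every record delivered once per pod
print(group_readers(topic, pods)["a"])             # 1: every record delivered exactly once
print(round_robin_assignment(list(topic), pods))   # pod-3 gets no partitions
```

The same model also shows the ceiling on source parallelism: with three partitions, a fourth member has nothing to read until another member leaves.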
> >>>>>>
> >>>>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dpcoll...@google.com> wrote:
> >>>>>>
> >>>>>>> > the best practice recommended for consuming from Kafka
> >>>>>>>
> >>>>>>> This is when using the consumer API directly - when using a framework like beam or even kafka streams (which actually doesn't use the consumer group assignment mechanism, see here https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks) you should rely on the framework's partition to task assignment mechanism.
> >>>>>>>
> >>>>>>> > all observability tools around Kafka use consumer group lag which is going to be unavailable if consumer groups are not being used
> >>>>>>>
> >>>>>>> This is a valid feature request (reflecting the underlying progress in offset commits); filing a feature request on JIRA would be the best way to get this prioritized.
> >>>>>>>
> >>>>>>> > Kafka Streams allows me to horizontally scale by adding as many instances of my application as I need and relies on Kafka to manage distribution by using consumer groups.
> >>>>>>>
> >>>>>>> No, it doesn't; see above. Beam allows you to scale out horizontally as well, but neither allows you to scale the source horizontally beyond the number of partitions on the topic.
> >>>>>>>
> >>>>>>> > I'm required to maintain another distributed processing cluster like Spark or Flink (on top of a Kafka cluster I already have)
> >>>>>>>
> >>>>>>> Kafka streams does not run on your kafka cluster.
> >>>>>>> It is a separate runtime that you need to turn up and run jobs for separately, same as flink. The only difference is that, effectively, you can only run kafka streams in application mode, while flink can also be run in session/cluster mode.
> >>>>>>>
> >>>>>>> The main downside of Kafka streams is that it can only be used with kafka. If you ever want to read batch data or read from/write to another streaming system, you cannot reuse your existing code/architecture and need to rewrite everything. One advantage of decoupled frameworks like beam (or flink or spark) is that the same pipeline and code can be reused for various data sources and sinks, and they come with a library of prebuilt ones.
> >>>>>>>
> >>>>>>> > documentation
> >>>>>>>
> >>>>>>> If the documentation misleadingly suggests that you can set group.id, and doing so does not make upstream offset commits that allow you to access metrics, please file a bug on JIRA so either the documentation or the implementation can be corrected.
> >>>>>>>
> >>>>>>> -Daniel
> >>>>>>>
> >>>>>>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <srf...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the information, Daniel.
> >>>>>>>>
> >>>>>>>> For one - Kafka Consumer Groups are the best practice recommended for consuming from Kafka AFAIK, and I would prefer to be able to use that.
> >>>>>>>> Also, all observability tools around Kafka use consumer group lag, which is going to be unavailable if consumer groups are not being used.
> >>>>>>>>
> >>>>>>>> Finally, in my use case I'm asked to evaluate using Apache Beam vs. Kafka Streams.
> >>>>>>>> Kafka Streams allows me to scale horizontally by adding as many instances of my application as I need, and relies on Kafka to manage distribution by using consumer groups.
> >>>>>>>> With Apache Beam I'm required to maintain another distributed processing cluster like Spark or Flink (on top of a Kafka cluster I already have) to be able to do the same.
> >>>>>>>> To be clear, in this use case there is no need for an additional cluster other than the fact that consumer groups are not being used.
> >>>>>>>> This constitutes a disadvantage compared with Kafka Streams and other solutions that use consumer groups.
> >>>>>>>>
> >>>>>>>> Furthermore, if this use case is not supported, I would expect the documentation to mention that, or at least not to imply the contrary.
> >>>>>>>> The latest version of the documentation for KafkaIO <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> shows an example of configuring a consumer group, while in fact this setting will not do anything of the sort:
> >>>>>>>> [screenshots of the KafkaIO documentation not preserved]
> >>>>>>>>
> >>>>>>>> It seems like this has already been raised in the past - e.g. here <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> - so I'm probably not the first person to be confused about that.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Shahar.
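Consumer group lag, which comes up in this exchange as the main observability concern, only depends on committed offsets, not on which mechanism assigned the partitions. The sketch below computes the per-partition figure that lag dashboards typically show, using made-up offsets; real tools would fetch log-end and committed offsets from the brokers (for example through the Kafka admin client), which is not shown. The names `LagReport`, `partition_lag` and `group_lag` are hypothetical. On the Beam side, KafkaIO has a `commitOffsetsInFinalize()` option that commits finalized offsets for the configured `group.id`; its exact semantics are worth checking in the KafkaIO Javadoc for your Beam version.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LagReport:
    per_partition: dict[int, Optional[int]]  # None = no committed offset for this group
    total: int                               # sum over partitions whose lag is known
    unknown_partitions: list[int]


def partition_lag(log_end_offset: int, committed: Optional[int]) -> Optional[int]:
    """Lag for one partition: log-end offset minus the group's committed offset.

    Kafka's committed offset is the next offset the group will read, so the
    difference is the number of records written but not yet consumed.
    """
    if committed is None:
        return None
    return max(0, log_end_offset - committed)


def group_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> LagReport:
    per_partition = {p: partition_lag(end, committed.get(p)) for p, end in end_offsets.items()}
    return LagReport(
        per_partition=per_partition,
        total=sum(lag for lag in per_partition.values() if lag is not None),
        unknown_partitions=sorted(p for p, lag in per_partition.items() if lag is None),
    )


# Partition 2 has never had an offset committed under this group.id.
report = group_lag(end_offsets={0: 1200, 1: 800, 2: 950}, committed={0: 1150, 1: 800})
print(report.per_partition)       # {0: 50, 1: 0, 2: None}
print(report.total)               # 50
print(report.unknown_partitions)  # [2]
```

A reader that never commits under its `group.id` leaves every partition in the "unknown" bucket, which is why lag-based alerting goes blind in that case.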
> >>>>>>>>
> >>>>>>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <dev@beam.apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Why do you want to use a consumer group? If you have consumers in other jobs, your beam job will fail to receive all messages it should for the topic.
> >>>>>>>>>
> >>>>>>>>> > It seems the code attempts to circumvent the partition assignment mechanism provided by Kafka to use its own.
> >>>>>>>>>
> >>>>>>>>> All beam I/Os for partitioned sources do this. They use access to the partitioning structure of the underlying system to track their progress through each partition and provide feedback for scaling, as well as tracking and enforcing exactly-once processing semantics. In fact, most interops with streaming data processing systems do this; you can see in the documentation of the flink kafka interop (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene) that it does not use or respect the partition assignments.
> >>>>>>>>>
> >>>>>>>>> > By doing that it prevents the user from using consumer groups.
> >>>>>>>>>
> >>>>>>>>> Again, why do you (as a user) want to use consumer groups? What value does it provide you?
> >>>>>>>>>
> >>>>>>>>> -Daniel
> >>>>>>>>>
> >>>>>>>>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <srf...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi All,
> >>>>>>>>>>
> >>>>>>>>>> Posting here as suggested here <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.
> >>>>>>>>>>
> >>>>>>>>>> I'm using KafkaIO to consume events from a Kafka topic.
> >>>>>>>>>> I've added "group.id" to the consumer properties.
> >>>>>>>>>> When running the pipeline I can see this value sent to Kafka in the consumer properties.
> >>>>>>>>>> The consumers created by KafkaIO fail to join the consumer group.
> >>>>>>>>>> Looking into the code, I can see that nowhere is the consumer "subscribing" to the topic, which is how a KafkaConsumer should join a consumer group. It seems the code attempts to circumvent the partition assignment mechanism provided by Kafka to use its own.
> >>>>>>>>>> By doing that it prevents the user from using consumer groups.
> >>>>>>>>>> Is that intentional? Is there any reason why the decision to avoid using consumer groups has been taken?
> >>>>>>>>>> I would love to see any documentation about that, if possible, please.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Shahar.
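Shahar's observation that KafkaIO's consumers do not subscribe, and Daniel's description of partitioned sources that track their own per-partition progress, describe the same design. The sketch below is a hypothetical, heavily simplified model of it (the `Checkpoint` and `AssignedReader` names are illustrative, not KafkaIO's classes): a reader with an explicit partition list, in the spirit of `KafkaConsumer.assign()` rather than `subscribe()`, whose positions are saved in a checkpoint and restored after a restart. It ignores what real runners must also do for exactly-once output, such as persisting the checkpoint atomically with the results.

```python
from dataclasses import dataclass, field


@dataclass
class Checkpoint:
    """Hypothetical checkpoint: next offset to read for each explicitly assigned partition."""
    next_offsets: dict[int, int] = field(default_factory=dict)


class AssignedReader:
    """Toy reader that owns a fixed set of partitions: no group coordinator decides
    what it reads, and its position lives in the checkpoint, not in Kafka."""

    def __init__(self, log: dict[int, list[str]], partitions: list[int], checkpoint: Checkpoint):
        self.log = log
        # Resume each partition from the checkpoint, or from offset 0 if it has none.
        self.positions = {p: checkpoint.next_offsets.get(p, 0) for p in partitions}

    def poll(self, max_records: int) -> list[str]:
        """Read up to max_records, draining partitions in order for simplicity."""
        batch: list[str] = []
        for p in sorted(self.positions):
            while len(batch) < max_records and self.positions[p] < len(self.log[p]):
                batch.append(self.log[p][self.positions[p]])
                self.positions[p] += 1
        return batch

    def checkpoint(self) -> Checkpoint:
        return Checkpoint(dict(self.positions))


log = {0: ["a0", "a1", "a2"], 1: ["b0", "b1"]}

reader = AssignedReader(log, partitions=[0, 1], checkpoint=Checkpoint())
first = reader.poll(max_records=3)
saved = reader.checkpoint()

# Simulate a restart: a new reader restored from the checkpoint continues
# each partition exactly where the previous one stopped.
restarted = AssignedReader(log, partitions=[0, 1], checkpoint=saved)
second = restarted.poll(max_records=10)

print(first)               # ['a0', 'a1', 'a2']
print(saved.next_offsets)  # {0: 3, 1: 0}
print(second)              # ['b0', 'b1']
```

Because progress is keyed by partition inside the framework's own state, nothing in this model depends on a group coordinator; the trade-off, as the thread notes, is that group-based tooling sees none of it unless offsets are also committed back to Kafka.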