Can't agree with Shahar Frank more
On 2023/04/19 18:17:15, Shahar Frank wrote:

> Hi Daniel,
>
> I think I've already answered these in a previous email, but let me answer
> them again.
>
>> I was specifically responding to quoted points from your last email. I
>> really don't understand why you, as a user, care if the implementation of
>> the framework is using consumer groups or not as long as it has the
>> throughput you need and is correct. If there is something specific this
>> would be useful for, like monitoring or metrics, it seems a reasonable
>> feature request to me to ask to reflect the progress state in a kafka
>> consumer group, but not to use the underlying assignment mechanism for the
>> reasons stated above.
>
> I do care, for a couple of reasons:
>
> 1) Introducing risk with a technology that no one in the company knows, vs.
> a technology people know and trust (i.e. Kafka consumer groups).
> 2) A multitude of alerting, monitoring and other observability tools that
> use consumer groups will not be usable, and new solutions would be
> required.
> 3) Expert know-how on managing (and sometimes fixing) issues with Kafka in
> the company will become useless - and this in turn introduces risk to
> projects.
>
>> If you want to run in a single machine application mode, you can try
>> setting the `flinkMaster` parameter to `[local]`, which should launch an
>> inline flink runner just for your pipeline. If you want to have a scaling
>> out cluster per-application, you can launch a repeatable flink cluster with
>> kubernetes on a per-application basis pretty easily.
>
> I do agree that a Flink cluster is a great solution, and I have maintained
> a few in my time.
> Sadly, in our use case I have to consider constraints set by security and
> platform teams, and that will take time.
> By the time these follow through, it is very likely that the decision to
> use Beam would have fallen in favor of other solutions (i.e. Kafka Streams,
> Camel and others), and this would be a shame in my view.
> It is very unlikely that, once taken, this decision would be reversed for
> a long time.
>
> Given that a Flink cluster is not an option for me at this point, I have
> been trying to push a solution where instances of a Beam pipeline are run
> "disconnected" using a DirectRunner or (better even) a FlinkRunner in local
> standalone mode (as you're suggesting) - and, as you suggested, we are
> running those using a K8s Deployment, which allows us to scale up as
> required.
> The issue is that if more than one pod attempts to run the pipeline, they
> will not split the partitions between them; rather, each will consume ALL
> partitions, and the output will include as many duplicates as the number
> of pods. This solution is therefore unable to scale horizontally.
>
> That is exactly why I'm trying to suggest using consumer groups.
> In this attempt I created - here
> <https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java>
> - I've already shown it is possible (albeit, I admit, with limitations such
> as you described) to use consumer groups and effectively allow our use case
> to run on a scaled-up K8s deployment of DirectRunners.
>
> And again, finally, my question is: why should Kafka be treated differently
> from other messaging systems like SQS and PubSub, for which it seems Beam
> does not attempt to manage the distribution strategy, nor the mechanism
> for managing processed (committed) messages?
>
> If Beam is able to perform as well with them managing these, couldn't the
> same be applied to Kafka?
>
> Cheers,
> Shahar.
>
> ------------------------------
> Shahar Frank
> srf...@gmail.com
> +447799561438
> ------------------------------
>
> On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dpcoll...@google.com> wrote:
>
>> Hello,
>>
>> I was specifically responding to quoted points from your last email.
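[To make the failure mode described above concrete, here is a minimal sketch in plain Java - illustrative only, not Kafka client or Beam code; all names are made up. `assignWithGroup` mimics a range-style consumer-group assignment where each partition is owned by exactly one member; `assignWithoutGroup` mimics N independent pipelines (e.g. separate DirectRunner pods), where every pod reads every partition and the output is duplicated.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only - plain Java, not Kafka client or Beam code.
class GroupAssignmentSketch {

    // Range-style group assignment: partitions are split into contiguous
    // chunks, one chunk per member, so each partition has exactly one owner.
    static Map<String, List<Integer>> assignWithGroup(int partitions, List<String> members) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int per = partitions / members.size();
        int extra = partitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            List<Integer> mine = new ArrayList<>();
            int take = per + (i < extra ? 1 : 0);
            for (int j = 0; j < take; j++) {
                mine.add(next++);
            }
            out.put(members.get(i), mine);
        }
        return out;
    }

    // No group coordination: every consumer sees every partition, so output
    // is duplicated once per pod.
    static Map<String, List<Integer>> assignWithoutGroup(int partitions, List<String> members) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String member : members) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                all.add(p);
            }
            out.put(member, all);
        }
        return out;
    }
}
```

[With a group, 6 partitions over 3 members gives each member 2 disjoint partitions; without one, 2 pods each get all 6.]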
>> I really don't understand why you, as a user, care if the implementation
>> of the framework is using consumer groups or not, as long as it has the
>> throughput you need and is correct. If there is something specific this
>> would be useful for, like monitoring or metrics, it seems a reasonable
>> feature request to me to ask to reflect the progress state in a Kafka
>> consumer group, but not to use the underlying assignment mechanism, for
>> the reasons stated above.
>>
>> Per: "why Beam should recommend using a distributed processing framework"
>>
>> If you want to run in a single-machine application mode, you can try
>> setting the `flinkMaster` parameter to `[local]`, which should launch an
>> inline Flink runner just for your pipeline. If you want to have a
>> scaling-out cluster per application, you can launch a repeatable Flink
>> cluster with Kubernetes on a per-application basis pretty easily.
>>
>> -Daniel
>>
>> On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <srf...@gmail.com> wrote:
>>
>>> Hi Daniel,
>>>
>>> I think you missed my last email, which deals exactly with what you just
>>> commented on.
>>>
>>> I can send it again if you can't find it.
>>>
>>> Shahar.
>>>
>>> On Wed, Apr 19, 2023, 13:07 Daniel Collins <dpcoll...@google.com> wrote:
>>>
>>>>> The real question I feel is why should there not be an option with
>>>>> Beam to use a recommended best practice (by another apache project in
>>>>> fact) when connecting to an external system?
>>>>
>>>> You ignored my previous answer. This is not a "best practice" for
>>>> streaming frameworks, only for applications, and it is not used by other
>>>> frameworks *including Kafka Streams*.
>>>>
>>>>> The same as when connecting to SQS and PubSub should also be
>>>>> implemented with Kafka I think.
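[For reference, the `flinkMaster` option mentioned above is a pipeline option of Beam's Flink runner; `[local]` runs an embedded Flink mini-cluster inside the submitting process. A sketch of the invocation - `my-pipeline.jar` and `com.example.MyPipeline` are placeholders for your own artifact and main class:]

```
java -cp my-pipeline.jar com.example.MyPipeline \
  --runner=FlinkRunner \
  --flinkMaster=[local]
```

[Per the Flink runner docs, `flinkMaster` also accepts `[auto]` or a `host:port` address of an existing cluster.]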
>>>>
>>>> This makes it much harder to implement a consistent watermark and
>>>> involves either expensive secondary processing (Pub/Sub has a whole
>>>> second tracking subscription) or incorrectness of the watermark bounds
>>>> when reading from backlog. This makes windowing more likely to be
>>>> incorrect.
>>>>
>>>>> the user should be allowed the option of using a mechanism that is
>>>>> part of the system being connected if willing to accept the
>>>>> implications it has.
>>>>
>>>> This requires a complete rewrite of how KafkaIO operates, so it's not as
>>>> easy as "flip a switch".
>>>>
>>>>> And then the real question behind that would be - is there anything
>>>>> preventing Beam from using Apache Kafka consumer groups in KafkaIO?
>>>>
>>>> You haven't answered my original question: why do you care if it uses
>>>> consumer groups or not?
>>>>
>>>> On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <srf...@gmail.com> wrote:
>>>>
>>>>> Thanks Kenn,
>>>>>
>>>>> I agree with your comments fully.
>>>>> I would rather use a Flink cluster, even with the simple use case we
>>>>> have now.
>>>>>
>>>>> Sadly, sometimes we have other constraints - especially in larger
>>>>> corporations like the one I work for - which make it harder to create
>>>>> and maintain these without real reasons.
>>>>> My intention is to introduce Apache Beam right now to allow for future
>>>>> growth opportunities such as you described, but to do that I have to
>>>>> convince others that it is the best thing to do right now.
>>>>>
>>>>> It so happens that for the use case we are facing now, perhaps Apache
>>>>> Beam is too much, and it looks like Kafka Streams would allow us to
>>>>> avoid having to maintain another infrastructure cluster such as Flink.
>>>>>
>>>>> I would prefer to be able to propose a way we can use Beam right now,
>>>>> without the added overhead of a Flink/Spark cluster, so that I can
>>>>> convince the teams that it is a viable option right now.
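[The watermark point above can be sketched in plain Java - illustrative only, not the KafkaIO implementation. A source-level watermark is typically the minimum of the latest event timestamps observed on each partition the reader owns; if ownership is decided by an external coordinator instead of the framework, the reader cannot know which partitions belong in that minimum, which is one reason a consistent watermark gets harder.]

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch only - not the KafkaIO implementation.
class WatermarkSketch {

    // latestTimestampPerPartition: latest observed event time (epoch millis)
    // keyed by partition number, for the partitions this reader owns.
    // The watermark cannot advance past the slowest owned partition.
    static long watermark(Map<Integer, Long> latestTimestampPerPartition) {
        if (latestTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE; // nothing observed yet
        }
        return Collections.min(latestTimestampPerPartition.values());
    }
}
```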
>>>>> The alternative of switching to Beam once more requirements arrive
>>>>> would be much less preferable, as this is likely to never gather
>>>>> enough momentum for a full refactoring.
>>>>>
>>>>> Finally, I feel the question that really needs to be asked is not why
>>>>> Beam should recommend using a distributed processing framework - which
>>>>> totally makes sense - and not even why it may require that in some use
>>>>> cases.
>>>>>
>>>>> The real question I feel is: why should there not be an option with
>>>>> Beam to use a recommended best practice (by another Apache project, in
>>>>> fact) when connecting to an external system?
>>>>> The same as when connecting to SQS and PubSub should also be
>>>>> implemented with Kafka, I think.
>>>>> If we think the existing connector has advantages in some use cases,
>>>>> then by all means it should still exist as an option; however, I feel
>>>>> the user should be allowed the option of using a mechanism that is
>>>>> part of the system being connected, if willing to accept the
>>>>> implications it has.
>>>>>
>>>>> And then the real question behind that would be - is there anything
>>>>> preventing Beam from using Apache Kafka consumer groups in KafkaIO?
>>>>>
>>>>> Cheers,
>>>>> Shahar.
>>>>>
>>>>> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> Interesting discussion! I don't have a lot of expertise in some of
>>>>>> these details, but I wanted to just add one little comment.
>>>>>>
>>>>>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <srf...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Regarding horizontal scaling, I may not have been clear about what I
>>>>>>> mean.
>>>>>>> With Kafka Streams I can just run the same app (e.g. with a K8s
>>>>>>> Deployment) on as many pods as I need, and with Kafka consumer
>>>>>>> groups managing the distribution of data, all would work well.
>>>>>>> Moreover, I can create more pods than the number of partitions and
>>>>>>> keep them idle, so that if/when some crash, others pick up the slack
>>>>>>> quicker. And this would all be managed by the Kafka consumer group
>>>>>>> coordinator.
>>>>>>> If I do the same with an Apache Beam application - for example with
>>>>>>> a Direct Runner or a Flink Runner running in "local" mode - each
>>>>>>> instance will consume the entire topic, as it is unaware of the
>>>>>>> other instances. To work, I would be required to use something like
>>>>>>> a Flink runner with a full-fledged Flink cluster. This is a
>>>>>>> disadvantage for Beam in simpler use cases, where maintaining such
>>>>>>> an additional cluster is not required for the actual functionality
>>>>>>> (e.g. just for filtering) and incurs costs not everyone is willing
>>>>>>> to pay.
>>>>>>
>>>>>> Horizontal scaling is built into Beam. It all occurs within a single
>>>>>> pipeline. You should not try to scale up by running multiple
>>>>>> pipelines consuming from the same consumer group. Running your Beam
>>>>>> pipeline on a Flink cluster (or any other distributed runner) is the
>>>>>> intended way to achieve horizontal scaling. Beam has a very
>>>>>> sophisticated (perhaps "the most" sophisticated) sharding and
>>>>>> work-balancing model for distributing shards of work across workers.
>>>>>> So it is by design that Beam is aware of the sharding, but also does
>>>>>> its own thing above and beyond. It is true that if you have a very
>>>>>> simple use case, then Beam could be more than you need. Beam is very
>>>>>> general purpose. Of course, simple uses grow into more complex uses
>>>>>> :-) and you might end up stuck and/or porting to a distributed
>>>>>> processing engine after all. It all depends on what you are trying to
>>>>>> do and what you might do in the future!
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>> Kafka Streams would not run on a Kafka cluster, that is true - I was
>>>>>>> not saying it would. It might be run on a K8s cluster, which is also
>>>>>>> how I might be running Kafka itself and any Flink/Spark cluster I
>>>>>>> might need. While Flink can run in standalone as well as cluster
>>>>>>> mode... as I said earlier, in this use case, when Flink is used in
>>>>>>> standalone mode, the use case fails - which is my issue, as the same
>>>>>>> will work with Kafka Streams.
>>>>>>>
>>>>>>> Kafka Streams can only connect to Kafka - exactly why I would prefer
>>>>>>> to use Beam; but to do that I need to show that we have no
>>>>>>> disadvantages for the initial use case.
>>>>>>> I'm also very aware of the other benefits of Beam - being runner
>>>>>>> agnostic, language agnostic, source and sink agnostic, etc. - which
>>>>>>> is why I would very much like to use it right now.
>>>>>>> Sadly, right now, if we are unable to scale horizontally without
>>>>>>> maintaining another cluster - i.e. Flink or Spark - this is a major
>>>>>>> disadvantage, which might drive us to use Kafka Streams instead.
>>>>>>>
>>>>>>> I have opened a bug in GitHub
>>>>>>> <https://github.com/apache/beam/issues/25978> for the issue. I can
>>>>>>> edit that to be more specific to documentation; however, I feel this
>>>>>>> would be a huge miss for the project.
>>>>>>> Also happy to open the same in JIRA if you can point me to where in
>>>>>>> JIRA I can do that.
>>>>>>> I couldn't find that on the contact-us
>>>>>>> <https://beam.apache.org/community/contact-us/> page - it actually
>>>>>>> points you to create an issue in GitHub, which is what I did.
>>>>>>>
>>>>>>> Finally, I've been looking into other stream consumers in Beam IO.
>>>>>>> I might be missing something, but I think SqsIO seems not to be
>>>>>>> managing the same - rather, it expects SQS to manage data
>>>>>>> distribution between consumers:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>>>>>>>
>>>>>>> I think the same goes for PubsubIO, where I can't see that it tries
>>>>>>> to set any control over which part of the stream is managed by which
>>>>>>> consumer, but lets the Pubsub service do that instead:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>>>>>>>
>>>>>>> I would ask: why should Kafka not be considered a "streaming
>>>>>>> service" just like SQS and Pubsub, and be allowed to manage data
>>>>>>> distribution just like they do?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Shahar.
>>>>>>>
>>>>>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dpcoll...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>> the best practice recommended to consume for Kafka
>>>>>>>>
>>>>>>>> This is when using the consumer API directly - when using a
>>>>>>>> framework like Beam, or even Kafka Streams (which actually doesn't
>>>>>>>> use the consumer group assignment mechanism, see
>>>>>>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks),
>>>>>>>> you should rely on the framework's partition-to-task assignment
>>>>>>>> mechanism.
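[The "partition-to-task assignment mechanism" mentioned above can be sketched in plain Java - illustrative only, not the actual Kafka Streams or Beam code. The framework deterministically shards partitions across its own N parallel tasks (here: round-robin by partition index), so each partition is read by exactly one task without involving the Kafka group coordinator.]

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only - not actual Kafka Streams or Beam code.
class TaskAssignmentSketch {

    // Returns one list of partition indices per task; partition p goes to
    // task p % numTasks, so the assignment is disjoint and deterministic.
    static List<List<Integer>> assign(int numPartitions, int numTasks) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            tasks.get(p % numTasks).add(p);
        }
        return tasks;
    }
}
```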
>>>>>>>>
>>>>>>>>> all observability tools around Kafka use consumer group lag, which
>>>>>>>>> is going to be unavailable if consumer groups are not being used
>>>>>>>>
>>>>>>>> This is a valid feature request - to reflect the underlying
>>>>>>>> progress in offset commits. Filing a feature request on JIRA would
>>>>>>>> be the best way to get this prioritized.
>>>>>>>>
>>>>>>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>>>>>>> instances of my application as I need and relies on Kafka to
>>>>>>>>> manage distribution by using consumer groups.
>>>>>>>>
>>>>>>>> No it doesn't, see above. Beam al [message truncated...]
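[For context on the lag point above, a sketch in plain Java - illustrative only, not a Kafka API. The "consumer group lag" that Kafka observability tools report is, per partition, the log-end offset minus the group's last committed offset, summed across partitions; if no offsets are committed to a consumer group, there is nothing for such tools to read.]

```java
import java.util.Map;

// Illustrative sketch only - not a Kafka API.
class LagSketch {

    // logEndOffsets: newest offset per partition on the broker.
    // committedOffsets: the group's last committed offset per partition.
    static long totalLag(Map<Integer, Long> logEndOffsets,
                         Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - committed);
        }
        return lag;
    }
}
```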