I couldn't agree more with Shahar Frank.

On 2023/04/19 18:17:15 Shahar Frank wrote:
> Hi Daniel,
> 
> I think I've already answered these in a previous email but let me answer
> them again.
> 
> > I was specifically responding to quoted points from your last email. I
> > really don't understand why you, as a user, care if the implementation of
> > the framework is using consumer groups or not as long as it has the
> > throughput you need and is correct. If there is something specific this
> > would be useful for, like monitoring or metrics, it seems a reasonable
> > feature request to me to ask to reflect the progress state in a kafka
> > consumer group, but not to use the underlying assignment mechanism for the
> > reasons stated above.
> >
> 
> I do care for a couple of reasons:
> 1) Introducing risk with a technology that no one knows in the company vs.
> a technology people know and trust (i.e. Kafka Consumer Groups)
> 2) A multitude of alerting, monitoring and other observability tools that
> are using consumer groups will not be usable and new solutions would be
> required
> 3) Expert know-how on managing (and sometimes fixing) issues with Kafka in
> the company will become useless - and this in turn introduces risk to
> projects
> 
> > If you want to run in a single machine application mode, you can try
> > setting the `flinkMaster` parameter to `[local]`, which should launch an
> > inline flink runner just for your pipeline. If you want to have a scaling
> > out cluster per-application, you can launch a repeatable flink cluster with
> > kubernetes on a per-application basis pretty easily.
> 
> 
> I do agree that a Flink cluster is a great solution and have maintained a
> few in my time.
> Sadly in our use case I have to consider constraints set by security and
> platform teams and that will take time.
> By the time these follow through it is very likely that the decision to use
> Beam would have fallen in favor of other solutions (i.e. Kafka Streams,
> Camel and others) and this would be a shame in my view. It is very unlikely
> that once taken this decision would be reversed for a long time.
> 
> Given that a Flink cluster is not an option for me at this point I have
> been trying to push a solution where instances of a Beam pipeline are run
> "disconnected" using a DirectRunner or (better even) a FlinkRunner in local
> standalone mode (as you're suggesting) - and like you suggested we are
> running those using a K8s deployment which allows us to scale up as required.
> The issue is if more than one pod attempts to run the pipeline - they will
> not split the partitions between them but rather each would consume ALL
> partitions and the output would include as many duplications as the number
> of pods. This solution will then not be able to scale up horizontally.
> 
> That is exactly why I'm trying to suggest using consumer groups.
> In this attempt I created - here
> <https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java>
> -  I've already shown it is possible (albeit I admit with limitations such
> as you described) to use consumer groups and effectively allow our use case
> to run on a scaled up K8s deployment of DirectRunners.
> 
> And again finally my question is why should Kafka be treated differently
> from other messaging systems like SQS and PubSub for which it seems Beam
> does not attempt to manage the distribution strategy as well as the mechanism
> for managing processed (committed) messages?
> 
> If Beam is able to perform as well with them managing these couldn't the
> same be applied to Kafka?
> 
> Cheers,
> Shahar.
> 
> ------------------------------
> 
> Shahar Frank
> 
> srf...@gmail.com
> 
> +447799561438
> 
> ------------------------------
> 
> 
> 
> 
> 
> On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dpcoll...@google.com> wrote:
> 
> > Hello,
> >
> > I was specifically responding to quoted points from your last email. I
> > really don't understand why you, as a user, care if the implementation of
> > the framework is using consumer groups or not as long as it has the
> > throughput you need and is correct. If there is something specific this
> > would be useful for, like monitoring or metrics, it seems a reasonable
> > feature request to me to ask to reflect the progress state in a kafka
> > consumer group, but not to use the underlying assignment mechanism for the
> > reasons stated above.
> >
> > Per: "why Beam should recommend using a distributed processing framework"
> >
> > If you want to run in a single machine application mode, you can try
> > setting the `flinkMaster` parameter to `[local]`, which should launch an
> > inline flink runner just for your pipeline. If you want to have a scaling
> > out cluster per-application, you can launch a repeatable flink cluster with
> > kubernetes on a per-application basis pretty easily.
> >
> > -Daniel
> >
> > On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <srf...@gmail.com> wrote:
> >
> >> Hi Daniel,
> >>
> >> I think you missed my last email that deals exactly with what you just
> >> commented.
> >>
> >> I can send it again if you can't find it
> >>
> >> Shahar.
> >>
> >>
> >> On Wed, Apr 19, 2023, 13:07 Daniel Collins <dpcoll...@google.com> wrote:
> >>
> >>> > The real question I feel is why should there not be an option with
> >>> Beam to use a recommended best practice (by another apache project in
> >>> fact) when connecting to an external system?
> >>>
> >>> You ignored my previous answer. This is not a "best practice" for
> >>> streaming frameworks, only for applications, and is not used by other
> >>> frameworks *including kafka streams*.
> >>>
> >>> > The same as when connecting to SQS and PubSub should also be
> >>> implemented with Kafka I think.
> >>>
> >>> This makes it much harder to implement a consistent watermark and
> >>> involves either expensive secondary processing (Pub/Sub has a whole second
> >>> tracking subscription) or incorrectness of the watermark bounds when
> >>> reading from backlog. This makes windowing more likely to be incorrect.
> >>>
> >>> > the user should be allowed the option of using a mechanism that is
> >>> part of the system being connected if willing to accept the implication it
> >>> has.
> >>>
> >>> This requires a complete rewrite of how KafkaIO operates, so it's not as
> >>> easy as "flip a switch".
> >>>
> >>> >  And then the real question behind that would be - is there anything
> >>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
> >>>
> >>> You haven't answered my original question: Why do you care if it uses
> >>> consumer groups or not?
> >>>
> >>> On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <srf...@gmail.com> wrote:
> >>>
> >>>> Thanks Kenn,
> >>>>
> >>>> I agree with your comments fully.
> >>>> I would rather use a Flink cluster even with the simple use case we
> >>>> have now.
> >>>>
> >>>> Sadly sometimes we have other constraints - especially in larger
> >>>> corporations like the one I work for - which make it harder to create and
> >>>> maintain these without real reasons.
> >>>> My intention is to introduce Apache Beam right now to allow for future
> >>>> growth opportunities such as you described but to do that I have to
> >>>> convince others that it is the best thing to do right now.
> >>>>
> >>>> It so happens that for the use case we are facing now perhaps Apache
> >>>> Beam is too much and it looks like Kafka Streams would allow us to avoid
> >>>> having to maintain another infrastructure cluster such as Flink.
> >>>>
> >>>> I would prefer to be able to propose a way we can use Beam right now
> >>>> without the added overhead of a Flink/Spark cluster so that I can
> >>>> convince the teams that it is a viable option right now.
> >>>> The alternative of switching to Beam once more requirements arrive
> >>>> would be much less preferable as this is likely to never gather enough
> >>>> momentum for a full refactoring.
> >>>>
> >>>> Finally I feel the question that really needs to be asked is not why
> >>>> Beam should recommend using a distributed processing framework, which
> >>>> totally makes sense, and not even why it may require that in some use
> >>>> cases.
> >>>>
> >>>> The real question I feel is why should there not be an option with Beam
> >>>> to use a recommended best practice (by another apache project in fact)
> >>>> when connecting to an external system?
> >>>> The same as when connecting to SQS and PubSub should also be
> >>>> implemented with Kafka I think.
> >>>> If we think the existing connector has advantages in some use cases
> >>>> then by all means it should still exist as an option however I feel the
> >>>> user should be allowed the option of using a mechanism that is part of
> >>>> the system being connected if willing to accept the implication it has.
> >>>>
> >>>> And then the real question behind that would be - is there anything
> >>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
> >>>>
> >>>> Cheers,
> >>>> Shahar.
> >>>>
> >>>> ------------------------------
> >>>>
> >>>> Shahar Frank
> >>>>
> >>>> srf...@gmail.com
> >>>>
> >>>> +447799561438
> >>>>
> >>>> ------------------------------
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <k...@apache.org> wrote:
> >>>>
> >>>>> Interesting discussion! I don't have a lot of expertise in some of
> >>>>> these details but I wanted to just add one little comment.
> >>>>>
> >>>>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <srf...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Regarding horizontal scaling I may have not been clear about what I
> >>>>>> mean.
> >>>>>> With Kafka Streams I can just run the same app (e.g. with a K8s
> >>>>>> Deployment) on as many pods as I need and, with Kafka Consumer Groups
> >>>>>> managing the distribution of data, all would work well.
> >>>>>> Moreover I can create more pods than the number of partitions and
> >>>>>> keep them idle so that if/when some crash others pick up the slack
> >>>>>> quicker. And this would all be managed by the Kafka consumer group
> >>>>>> coordinator.
> >>>>>> If I do the same with an Apache Beam application - for example with a
> >>>>>> Direct Runner or a Flink Runner running in "local" mode - each instance
> >>>>>> will consume the entire topic as it is unaware of the other instances.
> >>>>>> To work I will be required to use something like a Flink runner with a
> >>>>>> full-fledged Flink cluster. This is a disadvantage for Beam in simpler
> >>>>>> use cases where maintaining such an additional cluster is not required
> >>>>>> for the actual functionality (e.g. just for filtering) and incurs costs
> >>>>>> not everyone is willing to bear.
> >>>>>>
> >>>>>
> >>>>> Horizontal scaling is built into Beam. It all occurs within a single
> >>>>> pipeline. You should not try to scale up by running multiple pipelines
> >>>>> consuming from the same consumer group. Running your Beam pipeline on a
> >>>>> Flink cluster (or any other distributed runner) is the intended way to
> >>>>> achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
> >>>>> most" sophisticated) sharding and work balancing model, for distributing
> >>>>> shards of work across workers. So it is by design that Beam is aware of
> >>>>> the sharding, but also does its own thing above and beyond. It is true
> >>>>> that if you have a very simple use case then Beam could be more than you
> >>>>> need. Beam is very general purpose. Of course, simple uses grow into more
> >>>>> complex uses :-) and you might end up stuck and/or porting to a
> >>>>> distributed processing engine after all. It all depends on what you are
> >>>>> trying to do and what you might do in the future!
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>>
> >>>>>
> >>>>>>
> >>>>>> Kafka Streams would not run on a Kafka cluster that is true - I was
> >>>>>> not saying it would. It might be run on a K8s cluster which is also how
> >>>>>> I might be running K8s itself and any Flink/Spark cluster I might need.
> >>>>>> While Flink can run in a standalone as well as cluster mode... as I said
> >>>>>> earlier in this use case when Flink is used in standalone mode the use
> >>>>>> case fails - which is my issue, as the same will work with Kafka Streams.
> >>>>>>
> >>>>>> Kafka Streams can only connect to Kafka - exactly why I would prefer
> >>>>>> to use Beam but to do that I need to show that we have no
> >>>>>> disadvantages for the initial use case.
> >>>>>> I'm also very aware of the other benefits of Beam - being runner
> >>>>>> agnostic, language agnostic, source and sink agnostic etc. which is why
> >>>>>> I would very much like to use it right now.
> >>>>>> Sadly right now if we are unable to scale horizontally without
> >>>>>> maintaining another cluster - i.e. Flink or Spark - this is a major
> >>>>>> disadvantage which might drive us to use Kafka Streams instead.
> >>>>>>
> >>>>>> I have opened a bug in Github
> >>>>>> <https://github.com/apache/beam/issues/25978> for the issue. I can
> >>>>>> edit that to be more specific to documentation however I feel this
> >>>>>> would be a huge miss for the project.
> >>>>>> Also happy to open the same in JIRA if you can point me to where in
> >>>>>> JIRA I can do that.
> >>>>>> I couldn't find that on the contact-us
> >>>>>> <https://beam.apache.org/community/contact-us/> page - it actually
> >>>>>> points you to create an issue in Github which is what I did.
> >>>>>>
> >>>>>> Finally I've been looking into other stream consumers in Beam IO.
> >>>>>>
> >>>>>> I might be missing something but I think SqsIO seems to not be
> >>>>>> managing the same - but rather expects SQS to manage data distribution
> >>>>>> between consumers:
> >>>>>>
> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
> >>>>>>
> >>>>>> I think same goes for PubsubIO which I can't see where it tries to
> >>>>>> set any control over which part of the stream would be managed by which
> >>>>>> consumer but lets the Pubsub service do that instead:
> >>>>>>
> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
> >>>>>>
> >>>>>> I would ask why should Kafka not be considered a "streaming service"
> >>>>>> just like SQS and Pubsub and allow it to manage data distribution just 
> >>>>>> like
> >>>>>> they would?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Shahar.
> >>>>>>
> >>>>>> ------------------------------
> >>>>>>
> >>>>>> Shahar Frank
> >>>>>>
> >>>>>> srf...@gmail.com
> >>>>>>
> >>>>>> +447799561438
> >>>>>>
> >>>>>> ------------------------------
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dpcoll...@google.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> > the best practice recommended to consume for Kafka
> >>>>>>>
> >>>>>>> This is when using the consumer API directly. When using a framework
> >>>>>>> like beam or even kafka streams (which actually doesn't use the
> >>>>>>> consumer group assignment mechanism, see here
> >>>>>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
> >>>>>>> you should rely on the framework's partition to task assignment
> >>>>>>> mechanism.
> >>>>>>>
> >>>>>>> > all observability tools around Kafka use consumer group lag which
> >>>>>>> is going to be unavailable if consumer groups are not being used
> >>>>>>>
> >>>>>>> This is a valid feature request: to reflect the underlying progress
> >>>>>>> in offset commits. Filing a feature request on JIRA would be the best
> >>>>>>> way to get this prioritized.
> >>>>>>>
> >>>>>>> > Kafka Streams allows me to horizontally scale by adding as many
> >>>>>>> instances of my application as I need and relies on Kafka to manage
> >>>>>>> distribution by using consumer groups.
> >>>>>>>
> >>>>>>> No it doesn't, see above. Beam allows you to scale out horizontally
> >>>>>>> as well, but neither allows you to scale the source horizontally
> >>>>>>> beyond the number of partitions on the topic.
> >>>>>>>
> >>>>>>> > I'm required to maintain another distributed processing cluster
> >>>>>>> like Spark or Flink (on top of a Kafka cluster I already have)
> >>>>>>>
> >>>>>>> Kafka streams does not run on your kafka cluster. It is a separate
> >>>>>>> runtime that you need to turn up and run jobs for separately, same as
> >>>>>>> flink. The only difference is that, effectively, you can only run
> >>>>>>> kafka streams in application mode while flink can also be run in
> >>>>>>> session/cluster mode.
> >>>>>>>
> >>>>>>> The main downside of Kafka streams is that it can only be used
> >>>>>>> talking with kafka. If you ever want to read batch data or from/to
> >>>>>>> another streaming system you cannot reuse your existing
> >>>>>>> code/architecture and need to rewrite everything. One advantage of
> >>>>>>> decoupled frameworks like beam (or flink or spark) is that the same
> >>>>>>> pipeline and code can be reused for various data sources and sinks,
> >>>>>>> and they come with a library of prebuilt ones.
> >>>>>>>
> >>>>>>> > documentation
> >>>>>>>
> >>>>>>> If the documentation misleadingly suggests that you can set group.id,
> >>>>>>> and doing so does not make upstream offset commits to allow you to
> >>>>>>> access metrics, please file a bug on JIRA so either the documentation
> >>>>>>> or implementation can be corrected.
> >>>>>>>
> >>>>>>> -Daniel
> >>>>>>>
> >>>>>>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <srf...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the information Daniel,
> >>>>>>>>
> >>>>>>>> For one - Kafka Consumer Groups is the best practice recommended to
> >>>>>>>> consume for Kafka AFAIK and I would prefer to be able to use that.
> >>>>>>>> Also all observability tools around Kafka use consumer group lag
> >>>>>>>> which is going to be unavailable if consumer groups are not being
> >>>>>>>> used.
> >>>>>>>>
> >>>>>>>> Finally in my use case I'm asked to evaluate using Apache Beam vs.
> >>>>>>>> Kafka Streams.
> >>>>>>>> Kafka Streams allows me to horizontally scale by adding as many
> >>>>>>>> instances of my application as I need and relies on Kafka to manage
> >>>>>>>> distribution by using consumer groups.
> >>>>>>>> With Apache Beam I'm required to maintain another distributed
> >>>>>>>> processing cluster like Spark or Flink (on top of a Kafka cluster I
> >>>>>>>> already have) to be able to do the same.
> >>>>>>>> To be clear in this use case there is no need for an additional
> >>>>>>>> cluster except for consumer groups not being used.
> >>>>>>>> This constitutes a disadvantage over Kafka Streams and other
> >>>>>>>> solutions that use consumer groups.
> >>>>>>>>
> >>>>>>>> Furthermore if this use case is not supported I would imagine the
> >>>>>>>> documentation would mention that or at least not imply the contrary.
> >>>>>>>> In the latest version of the documentation for KafkaIO
> >>>>>>>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html>
> >>>>>>>> it shows an example for configuring to use a consumer group while in
> >>>>>>>> fact this setting will not be doing anything of the sort:
> >>>>>>>> [image: image.png]
> >>>>>>>> And:
> >>>>>>>> [image: image.png]
> >>>>>>>>
> >>>>>>>> It seems like this has already been raised in the past - e.g. here
> >>>>>>>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id>
> >>>>>>>> - so I'm probably not the first person to be confused about that.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Shahar.
> >>>>>>>>
> >>>>>>>> ------------------------------
> >>>>>>>>
> >>>>>>>> Shahar Frank
> >>>>>>>>
> >>>>>>>> srf...@gmail.com
> >>>>>>>>
> >>>>>>>> +447799561438
> >>>>>>>>
> >>>>>>>> ------------------------------
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
> >>>>>>>> dev@beam.apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Why do you want to use a consumer group? If you have consumers in
> >>>>>>>>> other jobs, your beam job will fail to receive all messages it
> >>>>>>>>> should for the topic.
> >>>>>>>>>
> >>>>>>>>> > It seems the code attempts to circumvent the partition
> >>>>>>>>> assignment mechanism provided by Kafka to use its own.
> >>>>>>>>>
> >>>>>>>>> All beam I/Os for partitioned sources do this. They use access to
> >>>>>>>>> the partitioning structure of the underlying system to track their
> >>>>>>>>> progress through each partition and provide feedback for scaling, as
> >>>>>>>>> well as tracking and enforcing exactly once processing semantics. In
> >>>>>>>>> fact, most interops with streaming data processing systems do this;
> >>>>>>>>> you can see in the documentation of the flink kafka interop (
> >>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
> >>>>>>>>> that it does not use or respect the partition assignments.
> >>>>>>>>>
> >>>>>>>>> > By doing that it prevents the user from using consumer groups.
> >>>>>>>>>
> >>>>>>>>> Again, why do you (as a user) want to use consumer groups? What
> >>>>>>>>> value does it provide you?
> >>>>>>>>>
> >>>>>>>>> -Daniel
> >>>>>>>>>
> >>>>>>>>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <srf...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi All,
> >>>>>>>>>>
> >>>>>>>>>> Posting here as suggested here
> >>>>>>>>>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>
> >>>>>>>>>> .
> >>>>>>>>>>
> >>>>>>>>>> I'm using KafkaIO to consume events from a Kafka topic.
> >>>>>>>>>> I've added "group.id" to the consumer properties.
> >>>>>>>>>> When running the pipeline I can see this value sent to Kafka in
> >>>>>>>>>> the consumer properties.
> >>>>>>>>>> The consumers created by KafkaIO fail to join the consumer group.
> >>>>>>>>>> Looking into the code I can see that nowhere is the consumer
> >>>>>>>>>> "subscribing" to the topic which is how KafkaConsumer should join a
> >>>>>>>>>> consumer group. It seems the code attempts to circumvent the
> >>>>>>>>>> partition assignment mechanism provided by Kafka to use its own.
> >>>>>>>>>> By doing that it prevents the user from using consumer groups.
> >>>>>>>>>> Is that by intention? Is there any reason why the decision to
> >>>>>>>>>> avoid using consumer groups has been taken?
> >>>>>>>>>> I would love to see any documentation about that if possible
> >>>>>>>>>> please.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Shahar.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> 
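
For anyone skimming the thread, here is a minimal sketch of the scaling
problem Shahar describes: several independent pipeline instances that each
read every partition, compared with a group-style split of partitions across
instances. This is a toy model only. The function names are made up for
illustration, and the round-robin split is an assumption standing in for
whatever a group coordinator would actually do; it is not KafkaIO's code or
Kafka's real assignor.

```python
from collections import Counter


def read_without_group(partitions, num_pods):
    """Each pod is unaware of the others, so every pod reads every partition."""
    return {pod: list(partitions) for pod in range(num_pods)}


def read_with_group(partitions, num_pods):
    """Hypothetical group-style split: partitions are dealt out round-robin."""
    assignment = {pod: [] for pod in range(num_pods)}
    for index, partition in enumerate(partitions):
        assignment[index % num_pods].append(partition)
    return assignment


def copies_per_partition(assignment):
    """Count how many pods end up reading each partition."""
    return Counter(p for parts in assignment.values() for p in parts)


partitions = [0, 1, 2, 3, 4, 5]

# Three disconnected pods: every partition is read three times (duplicates).
independent = copies_per_partition(read_without_group(partitions, 3))

# Three pods sharing one assignment: every partition is read exactly once.
grouped = copies_per_partition(read_with_group(partitions, 3))

print("independent:", dict(independent))
print("grouped:", dict(grouped))
```

In this model, adding pods beyond the number of partitions leaves the extra
pods with an empty assignment, which matches the "idle standby" behaviour
mentioned in the thread.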
