I couldn't agree more with Shahar Frank.

On 2023/04/19 18:17:15 Shahar Frank wrote:
> Hi Daniel,
> 
> I think I've already answered these in a previous email but let me answer
> them again.
> 
>> I was specifically responding to quoted points from your last email. I
>> really don't understand why you, as a user, care if the implementation of
>> the framework is using consumer groups or not as long as it has the
>> throughput you need and is correct. If there is something specific this
>> would be useful for, like monitoring or metrics, it seems a reasonable
>> feature request to me to ask to reflect the progress state in a kafka
>> consumer group, but not to use the underlying assignment mechanism for the
>> reasons stated above.
>> 
> 
> I do care for a couple of reasons:
> 1) Introducing risk with a technology that no one knows in the company vs.
> a technology people know and trust (i.e. Kafka Consumer Groups)
> 2) A multitude of alerting, monitoring and other observability tools that
> are using consumer groups will not be usable and new solutions would be
> required
> 3) Expert knowhow on managing (and sometimes fixing) issues with Kafka in
> the company will become useless - and this in turn introduces risk to
> projects
> 
>> If you want to run in a single machine application mode, you can try
>> setting the `flinkMaster` parameter to `[local]`, which should launch an
>> inline flink runner just for your pipeline. If you want to have a scaling
>> out cluster per-application, you can launch a repeatable flink cluster with
>> kubernetes on a per-application basis pretty easily.
> 
> 
> I do agree that a Flink cluster is a great solution and have maintained a
> few in my time.
> Sadly in our use case I have to consider constraints set by security and
> platform teams and that will take time.
> By the time these follow through it is very likely that the decision to use
> Beam would have fallen in favor of other solutions (i.e. Kafka Streams,
> Camel and others) and this would be a shame in my view. It is very unlikely
> that once taken this decision would be reversed for a long time.
> 
> Given that a Flink cluster is not an option for me at this point I have
> been trying to push a solution where instances of a Beam pipeline are run
> "disconnected" using a DirectRunner or (better even) a FlinkRunner in local
> standalone mode (as you're suggesting) - and like you suggested we are
> running those using a K8s deployment which allows us to scale up as required.
> The issue is if more than one pod attempts to run the pipeline - they will
> not split the partitions between them but rather each would consume ALL
> partitions and the output would include as many duplications as the number
> of pods. This solution will then not be able to scale up horizontally.
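
To make the duplication point concrete, here is a minimal sketch in plain Python. It is only a toy model, not Beam or Kafka code: the function names are hypothetical and the round-robin split merely stands in for whatever assignment strategy a real consumer group coordinator would apply.

```python
from collections import Counter


def independent_readers(partitions, num_pods):
    """Each pod reads every partition because it is unaware of the others."""
    return {pod: list(partitions) for pod in range(num_pods)}


def group_assignment(partitions, num_pods):
    """Round-robin split, loosely imitating a consumer group rebalance."""
    assignment = {pod: [] for pod in range(num_pods)}
    for index, partition in enumerate(partitions):
        assignment[index % num_pods].append(partition)
    return assignment


def reads_per_partition(assignment):
    """Count how many pods read each partition."""
    return Counter(p for parts in assignment.values() for p in parts)


partitions = [0, 1, 2, 3]

# Three uncoordinated pods: every partition is read three times.
uncoordinated = reads_per_partition(independent_readers(partitions, 3))

# Three coordinated pods: every partition is read exactly once.
coordinated = reads_per_partition(group_assignment(partitions, 3))

# Six pods for four partitions: the two extra pods stay idle as standbys.
standby = group_assignment(partitions, 6)
```

In this model the uncoordinated case reads each partition once per pod, which matches the duplicated output described above, while the coordinated case reads each partition once and leaves surplus pods idle.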
> 
> That is exactly why I'm trying to suggest using consumer groups.
> In this attempt I created - here
> <https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java>
> -  I've already shown it is possible (albeit I admit with limitations such
> as you described) to use consumer groups and effectively allow our use case
> to run on a scaled up K8s deployment of DirectRunners.
> 
> And again finally my question is why should Kafka be treated differently
> from other messaging systems like SQS and PubSub for which it seems Beam
> does not attempt to manage the distribution strategy as well as the mechanism
> for managing processed (committed) messages?
> 
> If Beam is able to perform as well with them managing these, couldn't the
> same be applied to Kafka?
> 
> Cheers,
> Shahar.
> 
> ------------------------------
> 
> Shahar Frank
> 
> srf...@gmail.com
> 
> +447799561438
> 
> ------------------------------
> 
> 
> 
> 
> 
> On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dpcoll...@google.com> wrote:
> 
>> Hello,
>> 
>> I was specifically responding to quoted points from your last email. I
>> really don't understand why you, as a user, care if the implementation of
>> the framework is using consumer groups or not as long as it has the
>> throughput you need and is correct. If there is something specific this
>> would be useful for, like monitoring or metrics, it seems a reasonable
>> feature request to me to ask to reflect the progress state in a kafka
>> consumer group, but not to use the underlying assignment mechanism for the
>> reasons stated above.
>> 
>> Per: "why Beam should recommend using a distributed processing framework"
>> 
>> If you want to run in a single machine application mode, you can try
>> setting the `flinkMaster` parameter to `[local]`, which should launch an
>> inline flink runner just for your pipeline. If you want to have a scaling
>> out cluster per-application, you can launch a repeatable flink cluster with
>> kubernetes on a per-application basis pretty easily.
>> 
>> -Daniel
>> 
>> On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <srf...@gmail.com> wrote:
>> 
>>> Hi Daniel,
>>> 
>>> I think you missed my last email that deals exactly with what you just
>>> commented.
>>> 
>>> I can send it again if you can't find it
>>> 
>>> Shahar.
>>> 
>>> 
>>> On Wed, Apr 19, 2023, 13:07 Daniel Collins <dpcoll...@google.com> wrote:
>>> 
>>>>> The real question I feel is why should there not be an option with
>>>>> Beam to use a recommended best practice (by another apache project in fact)
>>>>> when connecting to an external system?
>>>> 
>>>> You ignored my previous answer. This is not a "best practice" for
>>>> streaming frameworks, only for applications, and is not used by other
>>>> frameworks *including kafka streams*.
>>>> 
>>>>> The same as when connecting to SQS and PubSub should also be
>>>>> implemented with Kafka I think.
>>>> 
>>>> This makes it much harder to implement a consistent watermark and
>>>> involves either expensive secondary processing (Pub/Sub has a whole second
>>>> tracking subscription) or incorrectness of the watermark bounds when
>>>> reading from backlog. This makes windowing more likely to be incorrect.
>>>> 
>>>>> the user should be allowed the option of using a mechanism that is
>>>>> part of the system being connected if willing to accept the implication it
>>>>> has.
>>>> 
>>>> This requires a complete rewrite of how KafkaIO operates, so it's not as
>>>> easy as "flip a switch".
>>>> 
>>>>> And then the real question behind that would be - is there anything
>>>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>>>> 
>>>> You haven't answered my original question: Why do you care if it uses
>>>> consumer groups or not?
>>>> 
>>>> On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <srf...@gmail.com> wrote:
>>>> 
>>>>> Thanks Kenn,
>>>>> 
>>>>> I agree with your comments fully.
>>>>> I would rather use a Flink cluster even with the simple use case we
>>>>> have now.
>>>>> 
>>>>> Sadly sometimes we have other constraints - especially in larger
>>>>> corporations like the one I work for - which make it harder to create and
>>>>> maintain these without real reasons.
>>>>> My intention is to introduce Apache Beam right now to allow for future
>>>>> growth opportunities such as you described but to do that I have to
>>>>> convince others that it is the best thing to do right now.
>>>>> 
>>>>> It so happens that for the use case we are facing now perhaps Apache
>>>>> Beam is too much and it looks like Kafka Streams would allow us to avoid
>>>>> having to maintain another infrastructure cluster such as Flink.
>>>>> 
>>>>> I would prefer to be able to propose a way we can use Beam right now
>>>>> without the added overhead of a Flink/Spark cluster so that I can convince
>>>>> the teams that it is a viable option right now.
>>>>> The alternative of switching to Beam once more requirements arrive
>>>>> would be much less preferable as this is likely to never gather enough
>>>>> momentum for a full refactoring.
>>>>> 
>>>>> Finally I feel the question that really needs to be asked is not why
>>>>> Beam should recommend using a distributed processing framework which
>>>>> totally makes sense and not even why it may require that in some use
>>>>> cases.
>>>>> 
>>>>> The real question I feel is why should there not be an option with Beam
>>>>> to use a recommended best practice (by another apache project in fact)
>>>>> when connecting to an external system?
>>>>> The same as when connecting to SQS and PubSub should also be
>>>>> implemented with Kafka I think.
>>>>> If we think the existing connector has advantages in some use cases
>>>>> then by all means it should still exist as an option however I feel the
>>>>> user should be allowed the option of using a mechanism that is part of the
>>>>> system being connected if willing to accept the implication it has.
>>>>> 
>>>>> And then the real question behind that would be - is there anything
>>>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>>>>> 
>>>>> Cheers,
>>>>> Shahar.
>>>>> 
>>>>> ------------------------------
>>>>> 
>>>>> Shahar Frank
>>>>> 
>>>>> srf...@gmail.com
>>>>> 
>>>>> +447799561438
>>>>> 
>>>>> ------------------------------
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <k...@apache.org> wrote:
>>>>> 
>>>>>> Interesting discussion! I don't have a lot of expertise in some of
>>>>>> these details but I wanted to just add one little comment.
>>>>>> 
>>>>>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <srf...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Regarding horizontal scaling I may have not been clear about what I
>>>>>>> mean.
>>>>>>> With Kafka Streams I can just run the same app (e.g. with a K8s
>>>>>>> Deployment) on as many pods as I need and with Kafka Consumer Groups
>>>>>>> managing the distribution of data all would work well.
>>>>>>> Moreover I can create more pods than the number of partitions and
>>>>>>> keep them idle so that if/when some crash others pick up the slack
>>>>>>> quicker. And this would all be managed by the Kafka consumer group
>>>>>>> coordinator.
>>>>>>> If I do the same with an Apache Beam application - for example with a
>>>>>>> Direct Runner or a Flink Runner running in "local" mode - each instance
>>>>>>> will consume the entire topic as it is unaware of the other instances.
>>>>>>> To work I will be required to use something like a Flink runner with a
>>>>>>> fully fledged Flink cluster. This is a disadvantage for Beam in simpler
>>>>>>> use cases where maintaining such an additional cluster is not required
>>>>>>> for the actual functionality (e.g. just for filtering) and incurs costs
>>>>>>> not everyone is willing to bear.
>>>>>>> 
>>>>>> 
>>>>>> Horizontal scaling is built into Beam. It all occurs within a single
>>>>>> pipeline. You should not try to scale up by running multiple pipelines
>>>>>> consuming from the same consumer group. Running your Beam pipeline on a
>>>>>> Flink cluster (or any other distributed runner) is the intended way to
>>>>>> achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
>>>>>> most" sophisticated) sharding and work balancing model, for distributing
>>>>>> shards of work across workers. So it is by design that Beam is aware of
>>>>>> the sharding, but also does its own thing above and beyond. It is true
>>>>>> that if you have a very simple use case then Beam could be more than you
>>>>>> need. Beam is very general purpose. Of course, simple uses grow into more
>>>>>> complex uses :-) and you might end up stuck and/or porting to a
>>>>>> distributed processing engine after all. It all depends on what you are
>>>>>> trying to do and what you might do in the future!
>>>>>> 
>>>>>> Kenn
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> 
>>>>>>> Kafka Streams would not run on a Kafka cluster that is true - I was
>>>>>>> not saying it would. It might be run on a K8s cluster which is also how
>>>>>>> I might be running K8s itself and any Flink/Spark cluster I might need.
>>>>>>> While Flink can run in a standalone as well as cluster mode... as I said
>>>>>>> earlier in this use case when Flink is used in standalone mode the use
>>>>>>> case fails - which is my issue, as the same will work with Kafka Streams.
>>>>>>> 
>>>>>>> Kafka Streams can only connect to Kafka - exactly why I would prefer
>>>>>>> to use Beam but to do that I need to show that we have no disadvantages
>>>>>>> for the initial use case.
>>>>>>> I'm also very aware of the other benefits of Beam - being runner
>>>>>>> agnostic, language agnostic, source and sink agnostic etc. which is why I
>>>>>>> would very much like to use it right now.
>>>>>>> Sadly right now if we are unable to scale horizontally without
>>>>>>> maintaining another cluster - i.e. Flink or Spark - this is a major
>>>>>>> disadvantage which might drive us to use Kafka Streams instead.
>>>>>>> 
>>>>>>> I have opened a bug in Github
>>>>>>> <https://github.com/apache/beam/issues/25978> for the issue. I can
>>>>>>> edit that to be more specific to documentation however I feel this
>>>>>>> would be a huge miss for the project.
>>>>>>> Also happy to open the same in JIRA if you can point me to where in
>>>>>>> JIRA I can do that.
>>>>>>> I couldn't find that on the contact-us
>>>>>>> <https://beam.apache.org/community/contact-us/> page - it actually
>>>>>>> points you to create an issue in Github which is what I did.
>>>>>>> 
>>>>>>> Finally I've been looking into other stream consumers in Beam IO.
>>>>>>> 
>>>>>>> I might be missing something but I think SqsIO seems to not be
>>>>>>> managing the same - but rather expects SQS to manage data distribution
>>>>>>> between consumers:
>>>>>>> 
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>>>>>>> 
>>>>>>> I think same goes for PubsubIO which I can't see where it tries to
>>>>>>> set any control over which part of the stream would be managed by which
>>>>>>> consumer but lets the Pubsub service do that instead:
>>>>>>> 
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>>>>>>> 
>>>>>>> I would ask why should Kafka not be considered a "streaming service"
>>>>>>> just like SQS and Pubsub and allow it to manage data distribution just
>>>>>>> like they would?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Shahar.
>>>>>>> 
>>>>>>> ------------------------------
>>>>>>> 
>>>>>>> Shahar Frank
>>>>>>> 
>>>>>>> srf...@gmail.com
>>>>>>> 
>>>>>>> +447799561438
>>>>>>> 
>>>>>>> ------------------------------
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dpcoll...@google.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>>> the best practice recommended to consume for Kafka
>>>>>>>> 
>>>>>>>> This is when using the consumer API directly - when using a framework
>>>>>>>> like beam or even kafka streams (which actually doesn't use the
>>>>>>>> consumer group assignment mechanism, see here
>>>>>>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
>>>>>>>> you should rely on the framework's partition to task assignment
>>>>>>>> mechanism.
>>>>>>>> 
>>>>>>>>> all observability tools around Kafka use consumer group lag which
>>>>>>>>> is going to be unavailable if consumer groups are not being used
>>>>>>>> 
>>>>>>>> This is a valid feature request, to reflect the underlying progress
>>>>>>>> in offset commits, filing a feature request on JIRA would be the best
>>>>>>>> way to get this prioritized.
>>>>>>>> 
>>>>>>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>>>>>>> instances of my application as I need and relies on Kafka to manage
>>>>>>>>> distribution by using consumer groups.
>>>>>>>> 
>>>>>>>> No it doesn't, see above. Beam al
[message truncated...]
