Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-28 Thread Mason Chen
Hi all,

Thanks for the feedback! I'm adding the users who responded in the user
mailing list to this thread.

@Qingsheng - Yes, I would prefer to reuse the existing Kafka connector
module. It makes a lot of sense since the dependencies are the same and the
implementation can also extend and improve some of the test utilities you
have been working on for the FLIP-27 Kafka Source. I will enumerate the
migration steps in the FLIP template.

@Ryan - I don't have a public branch available yet, but I would appreciate
your review on the FLIP design! When the FLIP design is approved by devs
and the community, I can start to commit our implementation to a fork.

@Andrew - Yup, one of the requirements of the connector is to read multiple
clusters within a single source, so it should be able to work well with
your use case.
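
To make the discussion concrete: since there is no public branch yet, below is
a purely hypothetical sketch of what the user-facing API of the composite
source might look like. The builder, all of its methods, and the
metadataService variable are illustrative assumptions, not an actual API.

// Hypothetical sketch only: none of these builder methods exist publicly yet.
MultiClusterKafkaSource<String> source =
    MultiClusterKafkaSource.<String>builder()
        // a metadata service would resolve the current set of clusters and
        // topics, enabling cluster/topic migration without a job restart
        .setKafkaMetadataService(metadataService)
        .setStreamIds(Collections.singleton("my-stream"))
        .setDeserializer(
            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "multi-cluster-kafka");

The wrapped KafkaSource components would keep doing the actual consumption, so
the source continues to benefit from KafkaSource improvements, as described
above.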

@Devs - what do I need to get started on the FLIP design? I see the FLIP
template and I have an account (mason6345), but I don't have access to
create a page.

Best,
Mason




On Sun, Jun 26, 2022 at 8:08 PM Qingsheng Ren  wrote:

> Hi Mason,
>
> It sounds like an exciting enhancement to the Kafka source and will
> benefit a lot of users I believe.
>
> Would you prefer to reuse the existing flink-connector-kafka module or
> create a new one for the new multi-cluster feature? Personally I prefer the
> former one because users won’t need to introduce another dependency module
> to their projects in order to use the feature.
>
> Thanks for the effort on this and looking forward to your FLIP!
>
> Best,
> Qingsheng
>
> > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> >
> > Hi community,
> >
> > We have been working on a Multi Cluster Kafka Source and are looking to
> > contribute it upstream. I've given a talk about the features and design
> at
> > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> >
> > The main features it provides are:
> > 1. Reading multiple Kafka clusters within a single source.
> > 2. Adjusting the clusters and topics the source consumes from
> dynamically,
> > without Flink job restart.
> >
> > Some of the challenging use cases that these features solve are:
> > 1. Transparent Kafka cluster migration without Flink job restart.
> > 2. Transparent Kafka topic migration without Flink job restart.
> > 3. Direct integration with Hybrid Source.
> >
> > In addition, this is designed with wrapping and managing the existing
> > KafkaSource components to enable these features, so it can continue to
> > benefit from KafkaSource improvements and bug fixes. It can be considered
> > as a form of a composite source.
> >
> > I think the contribution of this source could benefit a lot of users who
> > have asked in the mailing list about Flink handling Kafka migrations and
> > removing topics in the past. I would love to hear and address your
> thoughts
> > and feedback, and if possible drive a FLIP!
> >
> > Best,
> > Mason
>
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released

2022-06-28 Thread Yang Wang
Thanks Gyula for working on the first patch release for the Flink
Kubernetes Operator project.


Best,
Yang



Gyula Fóra  wrote on Tuesday, Jun 28, 2022 at 00:22:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.0.1.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
> <
> https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html
> >
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Official Docker image for Flink Kubernetes Operator applications can be
> found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula Fora
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released

2022-06-28 Thread Gyula Fóra
Sorry for the incorrect blog post link, please ignore it. There is no
blog post for 1.0.1 :)

Gyula

On Tue, Jun 28, 2022 at 9:43 AM Yang Wang  wrote:

> Thanks Gyula for working on the first patch release for the Flink
> Kubernetes Operator project.
>
>
> Best,
> Yang
>
>
>
> Gyula Fóra  wrote on Tuesday, Jun 28, 2022 at 00:22:
>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink Kubernetes Operator 1.0.1.
>>
>> The Flink Kubernetes Operator allows users to manage their Apache Flink
>> applications and their lifecycle through native k8s tooling like kubectl.
>> <
>> https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html
>> >
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Official Docker image for Flink Kubernetes Operator applications can be
>> found at:
>> https://hub.docker.com/r/apache/flink-kubernetes-operator
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Gyula Fora
>>
>


how to connect to the flink-state store and use it as cache to serve APIs.

2022-06-28 Thread laxmi narayan
Hi Team,
I am not sure if this is the right use case for the state store, but I
want to serve APIs using queryable state. What are the different ways to
achieve this?

I have come across an approach where the Job_Id is used to connect to the
state, but is there any other way, e.g. exposing a specific REST endpoint?

Any sample example/github link would be nice.



Thank you.


Re: Synchronizing streams in coprocessfunction

2022-06-28 Thread Gopi Krishna M
On Mon, Jun 27, 2022 at 7:47 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Gopi,
>
> Your use case is a little under-specified to give a specific answer,
> especially as to the nature of the two input streams and the way events of
> both streams are correlated (joined):
>
> - Is your fast-stream keyed?
>   - If yes: keyed state and timers can be used; otherwise only
>     operator state can be used to buffer events, and no timers

Fast stream is keyed. The metadata is a broadcast.

> - Is your metadata-stream keyed? I.e.
>   - Metadata-stream events are combined only with fast-stream events
>     having the same respective key:
>     - Implement a KeyedCoProcessFunction …
>   - Metadata-stream events apply to all fast-stream events
>     irrespective of the key:
>     - Implement a KeyedBroadcastProcessFunction (after converting
>       the metadata-stream to a broadcast stream)
>     - Then in the processBroadcastElement function you can iterate
>       over all keys of all state primitives
> - None of your streams are keyed?
>   - That leaves you only the option of using operator state:
>     - The current implementation of operator state is not incremental,
>       and thus it is completely generated/stored with each state checkpoint
>     - This allows only a moderate number of datapoints in operator
>       state

Correct, the main concern is that there's no flow control that could be
applied here. If the fast stream has very high throughput, a delay in the
metadata stream will cause a lot of state to be persisted.

> - Which version of Flink are you using? Recommendations above
>   refer to Flink 1.15.0

1.14, but I can use the latest version if needed.

> Looking forward to your answers (please also go into a little more detail
> about your use case) and follow-up questions …

Use case: We have a stream of events with a primary key (consider CDC). We
need to store these events sorted by the primary key in a set of files
(consider a DeltaLake table). The Flink pipeline is used to split the stream
and map each event to the corresponding file where it should go. The events
come in arbitrary order of primary keys, and the mapping of keys to files can
change over time.

The metadata stream is a list of key ranges regularly updated from the file
system. The event stream is partitioned into multiple tasks (potentially by
primary key), and each task gets the metadata via a broadcast connected to
the keyed stream. In the CoProcessFunction of the connected streams, the file
path is determined using the metadata, and then the stream is keyed again by
the file path to group all events that belong to the same file.

Now the problem is, when the job starts, it might take some time to query
the metadata. During this time, a large number of events will be buffered.
Hope this clarifies the use case; if this implementation can be simplified,
please let me know.

cdcStream
    .keyBy(r -> "primaryKey") // placeholder key selector for the record's primary key
    .connect(tableStream.broadcast())
    .process(new KeyedCoProcessFunction<String, RowData, DeltaTable, Enriched>() {
        @Override
        public void processElement1(RowData value, Context ctx,
                                    Collector<Enriched> out) throws Exception {
            // look up the target file path for the event in the latest metadata
        }

        @Override
        public void processElement2(DeltaTable value, Context ctx,
                                    Collector<Enriched> out) throws Exception {
            // remember the updated key-range-to-file mapping for processElement1
        }
    })
    .keyBy(Enriched::getFilePath)
    .map(Enriched::getData)
    .sinkTo(fileSink); // placeholder sink, e.g. a DeltaLake sink
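
Following the KeyedBroadcastProcessFunction suggestion, would something like
the sketch below be the way to go? It buffers early events in keyed state and
flushes them via applyToKeyedState once the first metadata broadcast arrives.
It reuses the placeholder types above, and the enrich/filePathFor helpers are
assumed:

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: buffer keyed events until the first metadata broadcast arrives.
public class EnrichFunction
        extends KeyedBroadcastProcessFunction<String, RowData, DeltaTable, Enriched> {

    // Broadcast state holding the latest metadata under a single well-known key.
    static final MapStateDescriptor<String, DeltaTable> METADATA =
            new MapStateDescriptor<>("metadata", String.class, DeltaTable.class);

    // Keyed state buffering events that arrived before any metadata.
    static final ListStateDescriptor<RowData> BUFFER =
            new ListStateDescriptor<>("buffer", RowData.class);

    @Override
    public void processElement(RowData value, ReadOnlyContext ctx,
                               Collector<Enriched> out) throws Exception {
        DeltaTable table = ctx.getBroadcastState(METADATA).get("current");
        if (table == null) {
            // No metadata yet: park the event instead of emitting it.
            getRuntimeContext().getListState(BUFFER).add(value);
        } else {
            out.collect(enrich(value, table));
        }
    }

    @Override
    public void processBroadcastElement(DeltaTable value, Context ctx,
                                        Collector<Enriched> out) throws Exception {
        ctx.getBroadcastState(METADATA).put("current", value);
        // Flush, for every key, the events buffered before metadata arrived.
        ctx.applyToKeyedState(BUFFER, (key, state) -> {
            Iterable<RowData> buffered = state.get();
            if (buffered != null) {
                for (RowData event : buffered) {
                    out.collect(enrich(event, value));
                }
            }
            state.clear();
        });
    }

    private Enriched enrich(RowData value, DeltaTable table) {
        // Assumed helper: derive the target file path from the metadata.
        return new Enriched(table.filePathFor(value), value);
    }
}

The wiring would then be
cdcStream.keyBy(...).connect(tableStream.broadcast(EnrichFunction.METADATA))
.process(new EnrichFunction()); passing the descriptor to broadcast(...) is
what makes the broadcast state available in both process methods.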



>
>
> Greetings
>
>
>
> Thias
>
>
>
>
>
> *From:* Gopi Krishna M 
> *Sent:* Monday, June 27, 2022 3:01 PM
> *To:* Qingsheng Ren 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Synchronizing streams in coprocessfunction
>
>
>
> Thanks Qingsheng, that would definitely work. But I'm unable to figure
> out how I can apply this with CoProcessFunction. One stream is windowed, and
> the trigger implementation uses the second stream.
>
>
>
> On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren  wrote:
>
> Hi Gopi,
>
> What about using a window with a custom trigger? The window is doing
> nothing but aggregating your input to a collection. The trigger accepts
> metadata from the low input stream so it can fire and purge the window
> (emit all elements in the window to downstream) on arrival of metadata.
>
> Best,
> Qingsheng
>
> > On Jun 27, 2022, at 12:46, Gopi Krishna M 
> wrote:
> >
> > Hi,
> > I've a scenario where I use connected streams where one is a low
> throughput metadata stream and another one is a high throughput data
> stream. I use CoProcessFunction that operates on a data stream with
> behavior controlled by a metadata stream.
> >
> > Is there a way to slow down/pause the high throughput data stream until
> I've received one entry from the metadata stream? It's possible that by the
> time I get the first element from the metadata stream, I might get 1000s of
> items from the data stream. One option is to create a stat

Re: Reply:DelegationTokenManager

2022-06-28 Thread vtygoss
Hi, Gabor Somogyi!


Because my fork of the Apache Flink repository was outdated, I didn't notice
in time that some subtasks for the KerberosDelegationTokenManager had already
been done.


I implemented this feature against the flink-1.13.5 release used in my
scenario, and created a Jira ticket, FLINK-28291, with a patch attached.


If this is acceptable, please assign the ticket to the relevant people or
review the patch at your convenience.
If not, please drop the Jira ticket. And I am sorry to have caused this
trouble.




Best Regards!




On Jun 21, 2022 at 16:20, Gabor Somogyi wrote:


Thanks for pinging me!


Yes, finishing this feature is my main target; however, there are major code
parts which are still missing.
Please have a look at the umbrella Jira to get a better understanding:
https://issues.apache.org/jira/browse/FLINK-21232
In general it's not advised to use it for production use cases, but only for
testing and/or reporting bugs in test environments.
When this is finished, the only thing a framework user needs to do is
implement a new provider; all the rest (TGT renewal, token obtaining, token
propagation, etc.) will be handled automatically at the right time.
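
To make the provider idea concrete, here is a sketch of roughly what such a
provider could look like. Since the framework is unfinished, the interface and
method names below are assumptions for illustration, not the final SPI:

import java.util.Optional;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.security.Credentials;

// Assumed provider shape; the real SPI under FLINK-21232 is still in flux,
// so every name here is an assumption.
public class MyServiceDelegationTokenProvider implements DelegationTokenProvider {

    @Override
    public String serviceName() {
        return "my-service"; // unique name, used for discovery via META-INF/services
    }

    @Override
    public void init(Configuration configuration) {
        // Read provider-specific settings, e.g. the external service endpoint.
    }

    @Override
    public boolean delegationTokensRequired() {
        // Only obtain tokens when security is actually enabled for the service.
        return true;
    }

    @Override
    public Optional<Long> obtainDelegationTokens(Credentials credentials) {
        // Obtain a token from the external service, add it to the credentials,
        // and return the next suggested renewal time in milliseconds (if any).
        return Optional.empty();
    }
}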


I've answered the related questions inline (in green); hopefully that covers
all of your questions.
In general we're open to making the design/implementation better, so feel
free to contribute in any way.
If you have Spark knowledge, that would answer a huge number of questions,
because I'm trying to apply that knowledge here (and not re-invent the
wheel). Of course there are some minor differences, because these are two
completely different products, but the main concept is the same.


BR,
G




On Tue, Jun 21, 2022 at 8:43 AM Márton Balassi  wrote:

Hi,


For your information, G (cc'd) is actively working on this topic. [1] He is in
the best position to answer your questions, as far as I know. :-)


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework



On Tue, Jun 21, 2022 at 8:38 AM vtygoss  wrote:

Hi, Flink community!


> I don't know many details about the KDC. Can different TaskManagers hold
> different tokens? If so, the driver and each worker could renew their tokens
> in their respective DelegationTokenManager individually.
I'm not sure of the intention of this question, but as a general rule one
cluster means one active DelegationTokenManager, which rules all the
TaskManagers. So TaskManagers are not intended to be shared between two
DelegationTokenManagers.

> Can different TaskManagers hold different tokens?
Physically yes, but it's not intended in the design we've put together.
There is an exotic Hadoop feature which allows different DTs per TM, but it's
more a hack than a real feature
(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION).

> If so, driver and each worker can renew their tokens in their respective
> DelegationTokenManager individually.
If you ask this, I'm not sure whether you understand the whole DT concept.
The main point of DTs is that only one entity (in this case the JobManager)
has Kerberos credentials and obtains tokens, which it then propagates to all
TaskManagers. A TaskManager simply doesn't have the right to obtain any
token, since that would be a security hole.


Thanks in advance for any replies.


Best Regards!




On Jun 21, 2022 at 13:30, vtygoss wrote:


Hi, Flink community!


I am trying to do some work on periodically renewing delegation tokens in the
DelegationTokenManager, and I have run into some problems.


1. Implementations of DelegationTokenProvider
There seem to be only two implementations, both for testing, defined via the
SPI service: TestDelegationTokenProvider and
ExceptionThrowingDelegationTokenProvider; none for HDFS or HBase.
That said, the implementation is ongoing and far from finished. Currently I'm
working on the HadoopFSDelegationTokenProvider, which is half-way implemented.
The implementation depends on one of my currently open PRs, and until that is
merged I'm fully blocked. If you would like to help, I suggest reviewing
my PRs.


2. RPCGateway
When the renewer thread of the DelegationTokenManager in the ResourceManager
periodically renews the tokens of all providers, the DelegationTokenManager
should update the tokens of all TaskManagers via the RPCGateway, which seems
to be the appropriate way for now.
But the registered TaskManagers are managed by the TaskExecutorManager in the
RM, which the DelegationTokenManager doesn't have a reference to.


So, how about using a global context to hold all the necessary services, e.g.
RPCService, TaskExecutorManager, or HA?


In my mind, the RPCGateway seems a little clunky (just my personal opinion)
for extensions; why not consider an async typed message model between driver
and worker in the design?


At the moment there is no DT propagation to TaskManagers, since it's not yet
implemented. There will be an RPC call which will then transmit all the
obtained tokens to the TaskManagers, and all of them will have the same set
of DTs. According to the plans it wi

Re: how to connect to the flink-state store and use it as cache to serve APIs.

2022-06-28 Thread Hangxiang Yu
Hi, laxmi.
There are currently two ways users can access the state store:
1. Queryable State [1], with which you can access state at runtime.
2. The State Processor API [2], with which you can access state (snapshots)
offline.
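
For example, a minimal sketch of option 1, assuming a job that exposes a
ValueState<Long> named "my-state" keyed by String (the host, port, and job id
are placeholders):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class StateQuery {
    public static void main(String[] args) throws Exception {
        // Connect to the queryable state proxy on a TaskManager (default port 9069).
        QueryableStateClient client = new QueryableStateClient("tm-host", 9069);

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("my-state", Types.LONG);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString(args[0]), // the running job's id
                "my-state",    // the name the job registered for the state
                "some-key",    // the key to look up
                Types.STRING,  // key type information
                descriptor);

        System.out.println(future.get().value());
        client.shutdownAndWait();
    }
}

On the job side, the state has to be made queryable first (e.g. via
stateDescriptor.setQueryable("my-state") or stream.asQueryableState("my-state")),
and the queryable state proxy has to be enabled on the cluster.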

Note, though, that we have marked Queryable State as "Reaching End-of-Life"
[3].
We are also trying to find a graceful and effective way for users to debug
and troubleshoot.
So could you share your case and what you want to use it for?
Your feedback is important for us to design this in the long term. Thanks!

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
[3] https://flink.apache.org/roadmap.html

Best,
Hangxiang.

On Tue, Jun 28, 2022 at 8:26 PM laxmi narayan  wrote:

> Hi Team,
> I am not sure if this is the right use case for the state store, but I
> want to serve APIs using queryable state. What are the different ways to
> achieve this?
>
> I have come across an approach where the Job_Id is used to connect to the
> state, but is there any other way, e.g. exposing a specific REST endpoint?
>
> Any sample example/github link would be nice.
>
>
>
> Thank you.
>