Re: [ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

2023-01-31 Thread Leonard Xu
Thanks Yanfei for driving the release!


Best,
Leonard

> On Jan 31, 2023, at 3:43 PM, Yun Tang  wrote:
> 
> Thanks Yanfei for driving the frocksdb release!
> 
> Best
> Yun Tang
> From: Yuan Mei 
> Sent: Tuesday, January 31, 2023 15:09
> To: Jing Ge 
> Cc: Yanfei Lei; d...@flink.apache.org; user; user...@flink.apache.org
> Subject: Re: [ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released
>  
> Thanks Yanfei for driving the release!
> 
> Best
> Yuan
> 
> On Mon, Jan 30, 2023 at 8:46 PM Jing Ge via user wrote:
> Hi Yanfei,
> 
> Thanks for your effort. Looking forward to checking it.
> 
> Best regards,
> Jing
> 
> On Mon, Jan 30, 2023 at 1:42 PM Yanfei Lei wrote:
> We are very happy to announce the release of FRocksDB 6.20.3-ververica-2.0.
> 
> Compiled files for Linux x86, Linux arm, Linux ppc64le, MacOS x86,
> MacOS arm, and Windows are included in FRocksDB 6.20.3-ververica-2.0
> jar, and FRocksDB in Flink 1.17 will be updated to
> 6.20.3-ververica-2.0.
> 
> Release highlights:
> - [FLINK-30457] Add periodic_compaction_seconds option to RocksJava[1].
> - [FLINK-30321] Upgrade ZLIB of FRocksDB to 1.2.13[2].
> - Avoid expensive ToString() call when not in debug[3].
> - [FLINK-24932] Support building FRocksDB Java on Apple silicon[4].
> 
> Maven artifacts for FRocksDB can be found at:
> https://mvnrepository.com/artifact/com.ververica/frocksdbjni 
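> 
> For Maven users, those coordinates (as on the page above, with this
> release's version) translate to:
> 
>   <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>frocksdbjni</artifactId>
>     <version>6.20.3-ververica-2.0</version>
>   </dependency>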
> 
> 
> We would like to thank everyone in the Apache Flink community whose
> efforts made this release possible!
> 
> [1] https://issues.apache.org/jira/browse/FLINK-30457 
> 
> [2] https://issues.apache.org/jira/browse/FLINK-30321 
> 
> [3] https://github.com/ververica/frocksdb/pull/55 
> 
> [4] https://issues.apache.org/jira/browse/FLINK-24932 
> 
> 
> Best regards,
> Yanfei
> Ververica(Alibaba)



Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Gyula Fóra
There is also a pending fix for the standalone + k8s HA case:
https://github.com/apache/flink-kubernetes-operator/pull/518

You could maybe try and review the fix :)

Gyula

On Tue, Jan 31, 2023 at 8:36 AM Yang Wang  wrote:

> I assume you are using the standalone mode. Right?
>
> For the native K8s mode, the leader address should be
> akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
> when HA is enabled.
>
>
> Best,
> Yang
>
> Anton Ippolitov via user wrote on Tue, Jan 31, 2023 at 00:21:
>
>> This is actually what I'm already doing: I'm only setting high-availability:
>> kubernetes myself. The other values are either defaults or set by the
>> Operator:
>> - jobmanager.rpc.port: 6123 is the default value (docs)
>> - high-availability.jobmanager.port: 6123 is set by the Operator here
>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by the
>> Operator here (the actual code which gets executed is here)
>>
>> Looking at what the Lyft Operator is doing here, I thought this would be a
>> common issue, but since you've never seen this error before, not sure what
>> to do 🤔
>>
>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra  wrote:
>>
>>> We never encountered this problem before but also we don't configure
>>> those settings.
>>> Can you simply try:
>>>
>>> high-availability: kubernetes
>>>
>>> And remove the other configs? I think that can only cause problems and
>>> should not achieve anything :)
>>>
>>> Gyula
>>>
>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>>> user@flink.apache.org> wrote:
>>>
 Hi everyone,

 I've been experimenting with Kubernetes HA and the Kubernetes Operator
 and ran into the following issue which is happening regularly on
 TaskManagers with Flink 1.16:

 Error while retrieving the leader gateway. Retrying to connect to 
 akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
 org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
 complete the operation. Number of retries has been exhausted.

 (The whole stacktrace is quite long; I put it in a GitHub Gist here.
 Note that I put placeholder values for the Kubernetes Service name and the
 Namespace name.)

 The job configuration has the following values which should be relevant:
 high-availability: kubernetes
 high-availability.jobmanager.port: 6123
 jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
 jobmanager.rpc.port: 6123

 Looking a bit more into the logs, I can see that the Akka Actor System
 is started with an external address pointing to the Kubernetes Service
 defined by jobmanager.rpc.address:
 Trying to start actor system, external
 address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
 0.0.0.0:6123.
 Actor system started at
 akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123

 (I believe the external address for the Akka Actor System is set to
 jobmanager.rpc.address from this place in the code, but I might be wrong.)

 I can also see these logs for the Dispatcher RPC endpoint:
 Starting RPC endpoint for
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
 akka://flink/user/rpc/dispatcher_1 .
 Successfully wrote leader information
 LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
 leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
 for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.

 I confirmed that the HA ConfigMap contains an address which also uses
 the Kubernetes Service defined by jobmanager.rpc.address:
 $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r

Flink 1.16 arm64/v8 docker images

2023-01-31 Thread Roberts, Ben (Senior Developer) via user
Hi,

Is it possible for the arm64/v8 architecture images to be published for >1.16 
flink-docker (apache/flink)?

I’m aware that the official docker flink image is now published in the arm64 
arch, but that image doesn’t include a JDK, so it’d be super helpful to have 
the apache/flink images published for that arch too.

It looks like it’s listed as a target architecture in the metadata: 
https://github.com/apache/flink-docker/blob/e348fd602cfe038402aeb574d1956762f4175af0/1.16/scala_2.12-java11-ubuntu/release.metadata#L2
But I can’t find it available to pull anywhere in the repo.

Thanks
Ben Roberts



Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Anton Ippolitov via user
I am using the Standalone Mode indeed, should've mentioned it right away.
This fix looks exactly like what I need, thank you!!

On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:

> There is also a pending fix for the standalone + k8s HA case:
> https://github.com/apache/flink-kubernetes-operator/pull/518
>
> You could maybe try and review the fix :)
>
> Gyula
>
> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang  wrote:
>
>> I assume you are using the standalone mode. Right?
>>
>> For the native K8s mode, the leader address should be
>> akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
>> when HA is enabled.
>>
>>
>> Best,
>> Yang
>>
>> Anton Ippolitov via user wrote on Tue, Jan 31, 2023 at 00:21:
>>
>>> This is actually what I'm already doing: I'm only setting high-availability:
>>> kubernetes myself. The other values are either defaults or set by the
>>> Operator:
>>> - jobmanager.rpc.port: 6123 is the default value (docs)
>>> - high-availability.jobmanager.port: 6123 is set by the Operator here
>>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
>>> the Operator here (the actual code which gets executed is here)
>>>
>>> Looking at what the Lyft Operator is doing here, I thought this would be a
>>> common issue, but since you've never seen this error before, not sure what
>>> to do 🤔
>>>
>>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra 
>>> wrote:
>>>
 We never encountered this problem before but also we don't configure
 those settings.
 Can you simply try:

 high-availability: kubernetes

 And remove the other configs? I think that can only cause problems and
 should not achieve anything :)

 Gyula

 On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
 user@flink.apache.org> wrote:

> Hi everyone,
>
> I've been experimenting with Kubernetes HA and the Kubernetes Operator
> and ran into the following issue which is happening regularly on
> TaskManagers with Flink 1.16:
>
> Error while retrieving the leader gateway. Retrying to connect to 
> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>
> (The whole stacktrace is quite long; I put it in a GitHub Gist here.
> Note that I put placeholder values for the Kubernetes Service name and the
> Namespace name.)
>
> The job configuration has the following values which should be
> relevant:
> high-availability: kubernetes
> high-availability.jobmanager.port: 6123
> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
> jobmanager.rpc.port: 6123
>
> Looking a bit more into the logs, I can see that the Akka Actor System
> is started with an external address pointing to the Kubernetes Service
> defined by jobmanager.rpc.address:
> Trying to start actor system, external
> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
> 0.0.0.0:6123.
> Actor system started at
> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>
> (I believe the external address for the Akka Actor System is set to
> jobmanager.rpc.address from this place in the code, but I might be wrong.)
>
> I can also see these logs for the Dispatcher RPC endpoint:
> Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/rpc/dispatcher_1 .
> Successfully wrote leader information
> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
>

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Gyula Fóra
Thanks @Anton Ippolitov 
At this stage I would highly recommend the native mode if you have the
liberty to try that.
I think that has better production characteristics and will work out of the
box with the autoscaler. (the standalone mode won't)

Gyula

On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <
anton.ippoli...@datadoghq.com> wrote:

> I am using the Standalone Mode indeed, should've mentioned it right away.
> This fix looks exactly like what I need, thank you!!
>
> On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:
>
>> There is also a pending fix for the standalone + k8s HA case:
>> https://github.com/apache/flink-kubernetes-operator/pull/518
>>
>> You could maybe try and review the fix :)
>>
>> Gyula
>>
>> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang 
>> wrote:
>>
>>> I assume you are using the standalone mode. Right?
>>>
>>> For the native K8s mode, the leader address should be
>>> akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
>>> when HA is enabled.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Anton Ippolitov via user wrote on Tue, Jan 31, 2023 at 00:21:
>>>
 This is actually what I'm already doing: I'm only setting high-availability:
 kubernetes myself. The other values are either defaults or set by the
 Operator:
 - jobmanager.rpc.port: 6123 is the default value (docs)
 - high-availability.jobmanager.port: 6123 is set by the Operator here
 - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
 the Operator here (the actual code which gets executed is here)

 Looking at what the Lyft Operator is doing here, I thought this would be a
 common issue, but since you've never seen this error before, not sure what
 to do 🤔

 On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra 
 wrote:

> We never encountered this problem before but also we don't configure
> those settings.
> Can you simply try:
>
> high-availability: kubernetes
>
> And remove the other configs? I think that can only cause problems and
> should not achieve anything :)
>
> Gyula
>
> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
> user@flink.apache.org> wrote:
>
>> Hi everyone,
>>
>> I've been experimenting with Kubernetes HA and the Kubernetes
>> Operator and ran into the following issue which is happening regularly on
>> TaskManagers with Flink 1.16:
>>
>> Error while retrieving the leader gateway. Retrying to connect to 
>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>> complete the operation. Number of retries has been exhausted.
>>
>> (The whole stacktrace is quite long; I put it in a GitHub Gist here.
>> Note that I put placeholder values for the Kubernetes Service name and the
>> Namespace name.)
>>
>> The job configuration has the following values which should be
>> relevant:
>> high-availability: kubernetes
>> high-availability.jobmanager.port: 6123
>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>> jobmanager.rpc.port: 6123
>>
>> Looking a bit more into the logs, I can see that the Akka Actor
>> System is started with an external address pointing to the Kubernetes
>> Service defined by jobmanager.rpc.address:
>> Trying to start actor system, external
>> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
>> 0.0.0.0:6123.
>> Actor system started at
>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>>
>> (I believe the external address for the Akka Actor System is set to
>> jobmanager.rpc.address from this place in the code, but I might be wrong.)

Re: beam + flink + k8

2023-01-31 Thread Jan Lukavský

Hi,

can you please also share the script itself? I'd say that the 
problem is that the flink jobmanager is not accessible through 
localhost:8081, because it runs inside the minikube. You need to expose 
it outside of the minikube via [1], or run the script from a pod inside 
the minikube and access the job manager via flink-jobmanager:8081. I'm 
surprised that the log didn't make this more obvious, though. Is it 
possible that you changed the default log level to ERROR? Can you try 
DEBUG or similar?


 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/
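
(A typical way to expose it from outside the minikube, assuming the JobManager
Service is named flink-jobmanager, is e.g.:

  kubectl port-forward service/flink-jobmanager 8081:8081

after which the job server can reach it at http://localhost:8081.)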

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský  wrote:

Hi,

can you please share the command-line and complete output of the
script?
Are you using minikube? Can you share a list of your running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run a beam job on top of flink on my local machine
> (kubernetes).
>
> I have flink 1.14 and beam 2.43 images both running, but when I submit
> the job it does not reach the flink cluster and fails with the error below.
>
> ERROR:apache_beam.utils.subprocess_server:Starting job service with
> ['java', '-jar',
> '/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',
> '--flink-master', 'http://localhost:8081', '--artifacts-dir',
> '/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',
> '--job-port', '57882', '--artifact-port', '0', '--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error bringing up service
> Traceback (most recent call last):
>   File "/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py", line 88, in start
>     raise RuntimeError(
> RuntimeError: Service failed to start up with error 1
>
> Any help would be appreciated.


Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Anton Ippolitov via user
Makes sense, thank you!

On Tue, Jan 31, 2023 at 10:48 AM Gyula Fóra  wrote:

> Thanks @Anton Ippolitov 
> At this stage I would highly recommend the native mode if you have the
> liberty to try that.
> I think that has better production characteristics and will work out of
> the box with the autoscaler. (the standalone mode won't)
>
> Gyula
>
> On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <
> anton.ippoli...@datadoghq.com> wrote:
>
>> I am using the Standalone Mode indeed, should've mentioned it right away.
>> This fix looks exactly like what I need, thank you!!
>>
>> On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:
>>
>>> There is also a pending fix for the standalone + k8s HA case:
>>> https://github.com/apache/flink-kubernetes-operator/pull/518
>>>
>>> You could maybe try and review the fix :)
>>>
>>> Gyula
>>>
>>> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang 
>>> wrote:
>>>
 I assume you are using the standalone mode. Right?

 For the native K8s mode, the leader address should be
 akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
 when HA is enabled.


 Best,
 Yang

 Anton Ippolitov via user wrote on Tue, Jan 31, 2023 at 00:21:

> This is actually what I'm already doing: I'm only setting high-availability:
> kubernetes myself. The other values are either defaults or set by the
> Operator:
> - jobmanager.rpc.port: 6123 is the default value (docs)
> - high-availability.jobmanager.port: 6123 is set by the Operator here
> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
> the Operator here (the actual code which gets executed is here)
>
> Looking at what the Lyft Operator is doing here, I thought this would be a
> common issue, but since you've never seen this error before, not sure what
> to do 🤔
>
> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra 
> wrote:
>
>> We never encountered this problem before but also we don't configure
>> those settings.
>> Can you simply try:
>>
>> high-availability: kubernetes
>>
>> And remove the other configs? I think that can only cause problems
>> and should not achieve anything :)
>>
>> Gyula
>>
>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi everyone,
>>>
>>> I've been experimenting with Kubernetes HA and the Kubernetes
>>> Operator and ran into the following issue which is happening regularly 
>>> on
>>> TaskManagers with Flink 1.16:
>>>
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>>> complete the operation. Number of retries has been exhausted.
>>>
>>> (The whole stacktrace is quite long; I put it in a GitHub Gist here.
>>> Note that I put placeholder values for the Kubernetes Service name and the
>>> Namespace name.)
>>>
>>> The job configuration has the following values which should be
>>> relevant:
>>> high-availability: kubernetes
>>> high-availability.jobmanager.port: 6123
>>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>>> jobmanager.rpc.port: 6123
>>>
>>> Looking a bit more into the logs, I can see that the Akka Actor
>>> System is started with an external address pointing to the Kubernetes
>>> Service defined by jobmanager.rpc.address:
>>> Trying to start actor system, external
>>> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
>>> 0.0.0.0:6123.
>>> Actor system started at
>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>>>
>>> (I believe the external address for the Akka Actor System is set to
>>> jobmanager.r

Re: Using pyflink from flink distribution

2023-01-31 Thread Andrew Otto
Great, thank you so much for your responses.  It all makes sense now. :)

On Mon, Jan 30, 2023 at 10:41 PM Dian Fu  wrote:

> >> What is the reason for including
> opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base
> distribution then?  Oh, a guess: to make it easier for TaskManagers to run
> pyflink without having pyflink installed themselves?  Somehow I'd guess
> this wouldn't work tho; I'd assume TaskManagers would also need some python
> transitive dependencies, e.g. google protobuf.
>
> There are some historical reasons. In the first version (1.9.x), which did not
> provide Python UDF support, it was not necessary to install PyFlink on the
> TaskManager nodes. Since 1.10, which supports Python UDFs, users have to
> install PyFlink on the TaskManager nodes as there are many transitive
> dependencies, e.g. Apache Beam, protobuf, pandas, etc. However, we have not
> removed these packages as they are still useful for the client node, which is
> responsible for compiling jobs (it's not necessary to install PyFlink on the
> client node).
>
> >> Since we're building our own Docker image, I'm going the other way
> around: just install pyflink, and symlink /opt/flink ->
> /usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm
> worried that something will be fishy when trying to run JVM apps via
> pyflink.
>
> Good idea! The PyFlink package contains everything needed to run JVM apps,
> so I think you could just try this way.
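> 
> A rough sketch of that image-build step (paths are illustrative; the
> dist-packages location depends on the Python version, 3.7 shown here, and the
> pinned version should match your base image):
> 
>   pip install apache-flink==1.15.2
>   ln -s /usr/local/lib/python3.7/dist-packages/pyflink /opt/flink
>   export FLINK_HOME=/opt/flink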
>
> Regards,
> Dian
>
> On Mon, Jan 30, 2023 at 9:58 PM Andrew Otto  wrote:
>
>> Thanks Dian!
>>
>> > >> Is using pyflink from the flink distribution tarball (without pip)
>> not a supported way to use pyflink?
>> > You are right.
>>
>> What is the reason for including
>> opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base
>> distribution then?  Oh, a guess: to make it easier for TaskManagers to run
>> pyflink without having pyflink installed themselves?  Somehow I'd guess
>> this wouldn't work tho; I'd assume TaskManagers would also need some python
>> transitive dependencies, e.g. google protobuf.
>>
>> > you could remove the JAR packages located under
>> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
>> install apache-flink`
>>
>> Since we're building our own Docker image, I'm going the other way
>> around: just install pyflink, and symlink /opt/flink ->
>> /usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm worried
>> that something will be fishy when trying to run JVM apps via pyflink.
>>
>> -Ao
>>
>>
>>
>> On Sun, Jan 29, 2023 at 1:43 AM Dian Fu  wrote:
>>
>>> Hi Andrew,
>>>
>>> >> By pip installing apache-flink, this docker image will have the flink
>>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink.
>>> BUT ALSO flink lib jars will be installed at e.g.
>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>>> So, by following those instructions, flink is effectively installed
>>> twice into the docker image.
>>>
>>> Yes, your understanding is correct. The base image `flink:1.15.2`
>>> doesn't include PyFlink and so you need to build a custom image if you want
>>> to use PyFlink. Regarding the JAR packages which are installed twice,
>>> you could remove the JAR packages located under
>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
>>> install apache-flink`. It will use the JAR packages located under
>>> $FLINK_HOME/lib.
>>>
>>> >> Is using pyflink from the flink distribution tarball (without pip)
>>> not a supported way to use pyflink?
>>> You are right.
>>>
>>> Regards,
>>> Dian
>>>
>>>
>>> On Thu, Jan 26, 2023 at 11:12 PM Andrew Otto  wrote:
>>>
 Ah, oops and my original email had a typo:
 > Some python dependencies are not included in the flink distribution
 tarballs: cloudpickle, py4j and pyflink are in opt/python.

 Should read:
 > Some python dependencies ARE included in the flink distribution
 tarballs: cloudpickle, py4j and pyflink are in opt/python.

 On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto 
 wrote:

> Let me ask a related question:
>
> We are building our own base Flink docker image.  We will be deploying
> both JVM and python apps via flink-kubernetes-operator.
>
> Is there any reason not to install Flink in this image via `pip
> install apache-flink` and use it for JVM apps?
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
> On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto 
> wrote:
>
>> Hello,
>>
>> I'm having quite a bit of trouble running pyflink from the default
>> flink distribution tarballs.  I'd expect the python examples to work as
>> long as python is installed, and we've got the distribution.  Some python
>> dependencies are not included in the flink distribution tarballs:
>> cloudpickle, py4j and py

Re: beam + flink + k8

2023-01-31 Thread Jan Lukavský
The script looks good to me. Did you run the SDK harness? The EXTERNAL 
environment needs the SDK harness to be run externally, see [1]. 
Generally, the best option is DOCKER, but that usually does not work in 
k8s. For this, you might try the PROCESS environment and build your own 
docker image for flink, which will contain the Beam harness, e.g. [2]. 
You will need to pass the environment config using 
--environment_config={"command": "/opt/apache/beam/boot"}.
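
As a concrete sketch, the submission then looks something like this (the
script name is a placeholder, and flink-jobmanager:8081 assumes you run it
from a pod inside the cluster):

  python my_pipeline.py \
      --runner=FlinkRunner \
      --flink_master=http://flink-jobmanager:8081 \
      --environment_type=PROCESS \
      --environment_config='{"command": "/opt/apache/beam/boot"}'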


From the screenshot it seems that the Flink UI is accessible, so this 
is the only option that comes to my mind. Did you check the logs of the 
Flink jobmanager pod?


 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

[2] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile


On 1/31/23 13:33, P Singh wrote:

Hi Jan,

Thanks for your reply; please find the attached script. I am a newbie with 
flink and minikube, though I am trying to connect them with a script from my 
local machine, as suggested by the flink kubernetes documentation.

I have changed the log level to ERROR but didn't find much... Can you 
please help me with how to run the script from inside the pod?


On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:

Hi,

can you please also share the script itself? I'd say that the
problem is that the flink jobmanager is not accessible through
localhost:8081, because it runs inside the minikube. You need to
expose it outside of the minikube via [1], or run the script from
a pod inside the minikube and access the job manager via
flink-jobmanager:8081. I'm surprised that the log didn't make this
more obvious, though. Is it possible that you changed the default
log level to ERROR? Can you try DEBUG or similar?

 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský  wrote:

Hi,

can you please share the command-line and complete output of
the script?
Are you using minikube? Can you share a list of your running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run a beam job on top of flink on my local machine
> (kubernetes).
>
> I have flink 1.14 and beam 2.43 images both running, but when I submit
> the job it does not reach the flink cluster and fails with the error below.
>
> ERROR:apache_beam.utils.subprocess_server:Starting job service with
> ['java', '-jar',
> '/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',
> '--flink-master', 'http://localhost:8081', '--artifacts-dir',
> '/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',
> '--job-port', '57882', '--artifact-port', '0', '--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error bringing up service
> Traceback (most recent call last):
>   File "/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py", line 88, in start
>     raise RuntimeError(
> RuntimeError: Service failed to start up with error 1
>
> Any help would be appreciated.


Extremely long startup time for exactly_once kafka sink

2023-01-31 Thread Bobby Richard
When enabling exactly_once on my kafka sink, I am seeing extremely long
initialization times (over 1 hour), especially after restoring from a
savepoint. In the logs I see the job constantly initializing thousands of
kafka producers like this:

2023-01-31 14:39:58,150 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-mutable-enriched-events-5-1,
transactionalId=common-event-enrichment-mutable-enriched-events-5-1]
ProducerId set to 847642 with epoch 14
2023-01-31 14:39:58,150 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-mutable-enriched-events-5-1,
transactionalId=common-event-enrichment-mutable-enriched-events-5-1]
Invoking InitProducerId for the first time in order to acquire a producer ID
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-common-enriched-events-5-1,
transactionalId=common-event-enrichment-common-enriched-events-5-1]
ProducerId set to 2496758 with epoch 25
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-immutable-enriched-events-5-1,
transactionalId=common-event-enrichment-immutable-enriched-events-5-1]
ProducerId set to 886210 with epoch 16
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-mutable-enriched-events-5-1,
transactionalId=common-event-enrichment-mutable-enriched-events-5-1]
Discovered transaction coordinator kafka-broker-2:9092 (id: 2 rack: null)
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-immutable-enriched-events-5-1,
transactionalId=common-event-enrichment-immutable-enriched-events-5-1]
Invoking InitProducerId for the first time in order to acquire a producer ID
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-common-enriched-events-5-1,
transactionalId=common-event-enrichment-common-enriched-events-5-1] Invoking
InitProducerId for the first time in order to acquire a producer ID
2023-01-31 14:39:58,152 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-immutable-enriched-events-5-1,
transactionalId=common-event-enrichment-immutable-enriched-events-5-1]
Discovered transaction coordinator kafka-broker-0:9092 (id: 0 rack: null)
2023-01-31 14:39:58,152 INFO org.apache.kafka.clients.producer.internals.
TransactionManager [] - [Producer
clientId=producer-common-event-enrichment-common-enriched-events-5-1,
transactionalId=common-event-enrichment-common-enriched-events-5-1]
Discovered transaction coordinator kafka-broker-2:9092 (id: 2 rack: null)

Does transaction timeout impact the startup time? How can I optimize the
initialization time? Without exactly_once the job starts up very quickly.
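
For reference, the sink is configured roughly like this (a minimal sketch
with placeholder broker/topic/prefix values, not the actual job code):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("kafka-broker-0:9092")           // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("mutable-enriched-events")  // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // each sink subtask derives its transactional ids from this prefix
                .setTransactionalIdPrefix("common-event-enrichment")  // placeholder
                // must not exceed the broker-side transaction.max.timeout.ms
                .setProperty("transaction.timeout.ms", "900000")
                .build();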



Re: Custom catalog implementation - getting table schema for computed columns

2023-01-31 Thread yuxia
Hi. 
Just FYI, I have seen that some catalogs still use the deprecated TableSchema in 
the Flink Hive, Iceberg, etc. connectors. But it's in Flink's plan to drop the 
deprecated table schema [1]. In the long term, using the new schema API seems 
the better choice. 

If it's for the case of Catalog's createTable method, from the code base [2], 
the passed CatalogBaseTable looks like it should be an instance of 
ResolvedCatalogBaseTable, with which you can get the resolved schema. From the 
commit history [3], since Flink 1.13 the passed CatalogBaseTable is an instance 
of ResolvedCatalogBaseTable. 

I think you can cast it to ResolvedCatalogBaseTable and get the resolved 
schema. But please remember, the cast will fail when the Flink version is lower 
than 1.13, since only from Flink 1.13 on is the passed CatalogBaseTable an 
instance of ResolvedCatalogBaseTable. 
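
A minimal sketch of that cast inside a custom catalog's createTable (the 
method signature is the real Catalog SPI; the loop body is just illustrative): 

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;

@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
    // From Flink 1.13 on, the passed table is a ResolvedCatalogBaseTable,
    // so computed-column types are already resolved.
    ResolvedSchema resolvedSchema = ((ResolvedCatalogBaseTable<?>) table).getResolvedSchema();
    for (Column column : resolvedSchema.getColumns()) {
        // getName()/getDataType() are available for physical, computed
        // and metadata columns alike; persist them as needed.
        String name = column.getName();
        org.apache.flink.table.types.DataType type = column.getDataType();
    }
}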

[1] https://issues.apache.org/jira/browse/FLINK-29072 
[2] 
https://github.com/apache/flink/blob/75a92efd7b35501698e5de253e5231d680830c16/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java#L654
 
[3] https://issues.apache.org/jira/browse/FLINK-21396 

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski" 
To: "User" 
Sent: Saturday, January 21, 2023, 2:27:25 AM 
Subject: Custom catalog implementation - getting table schema for computed columns 

Hi, 
I'm implementing a custom Catalog where, for "create table", I need to get the 
table's schema, both column names and types, from the DDL. 

Now the Catalog's createTable method has a "CatalogBaseTable table" argument. 
The CatalogBaseTable has a deprecated "getSchema" and suggests using 
getUnresolvedSchema instead. 

I was able to resolve schema types for physical columns, but I'm struggling 
with computed columns [1]. To be more precise, I'm struggling to get/resolve 
the type of this field. 

I see that all implementations that would be needed to resolve the underlying 
expression of UnresolvedComputedColumn are marked as @Internal. 

On the other hand, the deprecated "getSchema" has a proper type for this 
ComputedColumn. 

I'm wondering now what I should do. Should I use the deprecated API that 
already has what I need, or should I use the suggested API and somehow try to 
resolve the type using @Internal classes, which also does not seem safe? 

I would appreciate any hint here. 

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#:~:text=BIGINT%2C%20%60name%60%20STRING)-,Computed%20Columns,-Computed%20columns%20are 



Re: Custom catalog implementation - getting table schema for computed columns

2023-01-31 Thread yuxia
Hi, 
> About the question: can I assume that ResolvedCatalogTable will always be the 
> runtime type? 

Sorry, I don't really understand your question. Why do you have that 
assumption? 

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski" 
To: "User" 
Sent: Saturday, January 21, 2023, 3:13:12 AM 
Subject: Re: Custom catalog implementation - getting table schema for computed 
columns 

Ok, 
so now I see that the runtime type of the "table" parameter is 
ResolvedCatalogTable, which has the method getResolvedSchema. 

So I guess my question is: can I assume that ResolvedCatalogTable will always 
be the runtime type? 

On Fri, Jan 20, 2023 at 19:27 Krzysztof Chmielewski 
<krzysiek.chmielew...@gmail.com> wrote: 



Hi, 
I'm implementing a custom Catalog where, for "create table", I need to get the 
table's schema, both column names and types, from the DDL. 

Now the Catalog's createTable method has a "CatalogBaseTable table" argument. 
The CatalogBaseTable has a deprecated "getSchema" and suggests using 
getUnresolvedSchema instead. 

I was able to resolve schema types for physical columns, but I'm struggling 
with computed columns [1]. To be more precise, I'm struggling to get/resolve 
the type of this field. 

I see that all implementations that would be needed to resolve the underlying 
expression of UnresolvedComputedColumn are marked as @Internal. 

On the other hand, the deprecated "getSchema" has a proper type for this 
ComputedColumn. 

I'm wondering now what I should do. Should I use the deprecated API that 
already has what I need, or should I use the suggested API and somehow try to 
resolve the type using @Internal classes, which also does not seem safe? 

I would appreciate any hint here. 

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#:~:text=BIGINT%2C%20%60name%60%20STRING)-,Computed%20Columns,-Computed%20columns%20are 







How to add permission validation? flink reads and writes hive table。

2023-01-31 Thread melin li
Flink supports both SQL and JAR job types. How can we implement a unified
access check in Flink? Spark supports extensions; Flink lacks extensions.


RE: Processing watermarks in a broadcast connected stream

2023-01-31 Thread Schwalbe Matthias
Good Morning Sajjad,

I’ve once had a similar problem. As you’ve found out, directly using 
KeyedBroadcastProcessFunction is a little tricky.
What I ended up with instead is to use the rather new @PublicEvolving 
MultipleInputStreamOperator.
It allows you to connect and process any (reasonable) number of DataStreams 
(keyed/broadcast/plain) and also to tap into
the meta-stream of watermark events. Each Input is set up separately and can 
implement separate handlers for the events/watermarks/etc.
However, it is an operator implementation, so you need to manually set up e.g. 
the timer manager and a number of other auxiliary components.
This is not too difficult, as you can always model after other operator 
implementations within Flink.
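
In skeleton form it looks roughly like this (Java; purely illustrative: the
UserAction/Pattern/Match types are made up, and the operator factory / timer
service wiring is omitted):

import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class PatternEvalOperator extends AbstractStreamOperatorV2<Match>
        implements MultipleInputStreamOperator<Match> {

    public PatternEvalOperator(StreamOperatorParameters<Match> parameters) {
        super(parameters, 2); // two logical inputs
    }

    @Override
    public List<Input> getInputs() {
        return Arrays.asList(
                // Input 1: the keyed user-action stream.
                new AbstractInput<UserAction, Match>(this, 1) {
                    @Override
                    public void processElement(StreamRecord<UserAction> element) throws Exception {
                        // evaluate the action against the current patterns
                    }

                    @Override
                    public void processWatermark(Watermark mark) throws Exception {
                        // react to this input's watermarks here, then let the
                        // default implementation combine and forward them
                        super.processWatermark(mark);
                    }
                },
                // Input 2: the (broadcast) pattern stream.
                new AbstractInput<Pattern, Match>(this, 2) {
                    @Override
                    public void processElement(StreamRecord<Pattern> element) throws Exception {
                        // update the pattern state
                    }
                });
    }
}

The operator is then wired up with a (Keyed)MultipleInputTransformation and
StreamExecutionEnvironment#addOperator, via an operator factory that passes
the StreamOperatorParameters into the constructor.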

If you don’t mind that it will be in Scala, I could take the time to collect 
the basic setup …?


Hope this helps

Thias







From: Sajjad Rizvi 
Sent: Monday, January 30, 2023 7:42 PM
To: user@flink.apache.org
Subject: Processing watermarks in a broadcast connected stream


Hi,

I am trying to process watermarks in a BroadcastConnectedStream. However, I am 
not able to find any direct way to handle watermark events, similar to what we 
have in processWatermark1 in a KeyedCoProcessOperator. Following are further 
details.

In the context of the example given in “A Practical Guide to Broadcast State in 
Apache Flink”, I have 
a user actions stream and a pattern stream. The pattern stream is broadcast and 
connected with the user actions stream. The result is a 
BroadcastConnectedStream. I want to handle user action events and pattern events 
in this stream. In addition, I want to use a processWatermark function to 
perform an action in response to watermark events.

The problem is that a BroadcastConnectedStream has only a process() function, no 
transform(), and process() takes a (Keyed)BroadcastProcessFunction. A 
BroadcastProcessFunction only allows processing elements; it doesn’t provide an 
interface to process watermarks. In contrast, a ConnectedStream (without 
broadcast) provides a transform function, which takes an operator that 
provides a way to process watermarks.

Is there a way to process watermarks in a BroadcastConnectedStream?

Thanks,
Sajjad




Re: How to add permission validation? flink reads and writes hive table。

2023-01-31 Thread yuxia
Hi, melin li. 
Could you please explain a bit more about the unified access check in Flink? 

Best regards, 
Yuxia 


发件人: "melin li"  
收件人: "User"  
发送时间: 星期三, 2023年 2 月 01日 下午 2:39:15 
主题: How to add permission validation? flink reads and writes hive table。 



Flink supports both SQL and JAR job types. How can we implement a unified access 
check in Flink? Spark supports extensions; Flink lacks extensions. 






Reducing Checkpoint Count for Chain Operator

2023-01-31 Thread Talat Uyarer via user
Hi,

We have a job that reads from kafka and writes to some endpoints. The
job does not have any shuffling steps. I implemented it as multiple
steps, and Flink chained those operators into one operator at submission time.
However, I see that all operators are doing checkpointing.

Is there any way to create one checkpoint object per operator chain rather
than one per operator?

Thanks