Re: Re: pojo warning when using auto generated protobuf class

2021-04-26 Thread Yun Gao
Hi Prashant,

Flink will always give this warning as long as the deduced type
is a GenericType, no matter whether it uses the default Kryo serializer or
a registered one. Thus, if you have registered the type, you may
simply ignore the warning. To make sure it works, you may
find the TaskManager where the source tasks reside and use jmap to
see whether the ProtobufSerializer is created or not.

Best,
Yun




 --Original Mail --
Sender:Prashant Deva 
Send Date:Sun Apr 25 01:18:41 2021
Recipients:Yun Gao 
CC:User 
Subject:Re: pojo warning when using auto generated protobuf class

So I did register the type with Kryo and the ProtobufSerializer. However, I am
still continuing to see the warnings. Is this a bug in Flink?

env.config.registerTypeWithKryoSerializer(Trace.APITrace::class.java,
    ProtobufSerializer::class.java)

val stream: DataStreamSource<Trace.APITrace> =
    env.addSource(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props))

Sent via Superhuman

On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao  wrote:

Hi Prashant,

I think the warning is given when calling

return TypeInformation.of(Trace.APITrace::class.java)

Currently Flink does not have native support
for protobuf types yet [1], thus it would use a
generic serializer created by Kryo.

This should not affect the correctness of the program,
only its performance. One possible solution is to register
a custom serializer for protobuf classes with the Kryo
serialization framework, like the example in [2].
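
For reference, a minimal sketch of such a registration (it assumes the
com.twitter:chill-protobuf dependency is on the classpath and uses your generated
Trace.APITrace class, mirroring the docs example in [2]):

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Route all Trace.APITrace instances through chill-protobuf's ProtobufSerializer
// instead of Kryo's generic fallback serializer.
env.getConfig().registerTypeWithKryoSerializer(Trace.APITrace.class, ProtobufSerializer.class);

Note that the registration only changes which serializer Kryo uses; the extracted
type is still a GenericType, so the TypeExtractor warning may still appear.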

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-11333
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html


 --Original Mail --
Sender:Prashant Deva 
Send Date:Sat Apr 24 11:00:17 2021
Recipients:User 
Subject:pojo warning when using auto generated protobuf class

I am seeing this warning message when trying to use a custom protobuf de/serializer
with a Kafka source and an auto-generated Java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - 
Class class com.xx.APITrace cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance.

Here is my serializer. What am I doing wrong?

class ApiTraceSchema: DeserializationSchema<Trace.APITrace>,
    SerializationSchema<Trace.APITrace> {

    override fun getProducedType(): TypeInformation<Trace.APITrace> {
        return TypeInformation.of(Trace.APITrace::class.java)
    }

    override fun deserialize(message: ByteArray): Trace.APITrace {
        return Trace.APITrace.parseFrom(message)
    }

    override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
        return false
    }

    override fun serialize(element: Trace.APITrace): ByteArray {
        return element.toByteArray()
    }
}


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Yun Tang
Hi Dan,

I think you might be using an older version of Flink; this problem has been
resolved by FLINK-16753 [1] since Flink 1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger 
Sent: Monday, April 26, 2021 14:46
To: Dan Hill 
Cc: user 
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanagers to deal with
errors like this?


Re: MemoryStateBackend Issue

2021-04-26 Thread Matthias Pohl
I'm not sure what you're trying to achieve. Are you trying to simulate a
task failure? Or are you trying to pick up the state from a stopped job?
You could achieve the former one by killing the TaskManager instance or by
throwing a custom failure as part of your job pipeline. The latter one can
be achieved by using stop-with-savepoint instead of canceling the job.
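
A rough sketch of the former (the toy sequence source and the trigger value are
made up for illustration; any uncaught exception inside an operator triggers a
failover and, with a restart strategy in place, a restart from the latest
checkpoint):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailureSimulationJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10s

        env.generateSequence(0, 1_000_000) // small bounded demo source
            .map(new RichMapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    // Fail only on the first execution attempt so that the job
                    // recovers once and then keeps running.
                    if (value == 1_000L && getRuntimeContext().getAttemptNumber() == 0) {
                        throw new RuntimeException("Simulated task failure");
                    }
                    return value;
                }
            })
            .print();

        env.execute("failure-simulation");
    }
}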

Matthias

On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya  wrote:

> Hi Matthias,
>
> Yeah, you are right. I am canceling the job, so it creates a new
> job with a new job id and hence does not respect the previous checkpoint. I
> observed the same behaviour even for the local FS backend.
>
> Is there any way to simulate a job failure locally?
>
> As far as config is concerned, I have not configured any backend in the
> conf file, so it is defaulting to the memory state backend.
>
> Thanks,
> Milind
>
>
>
> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl 
> wrote:
>
>> One additional question: How did you stop and restart the job? The
>> behavior you're expecting should work with stop-with-savepoint. Cancelling
>> the job and then just restarting it wouldn't work. The latter approach
>> would lead to a new job being created.
>>
>> Best,
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Milind,
>>> I bet someone else might have a faster answer. But could you provide the
>>> logs and config to get a better understanding of what your issue is?
>>> In general, the state is maintained even in cases where a TaskManager
>>> fails.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:
>>>
 Hi

 I see MemoryStateBackend being used in TM Log

 org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
 has been configured, using default (Memory / JobManager)
 MemoryStateBackend (data in heap memory / checkpoints to JobManager)
 (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
 maxStateSize: 5242880)



 I am logging the checkpointed value, which is just a message count

 Snapshot the state 500
 Snapshot the state 1000


 When I restart the job, i.e. a new TM but the same job manager, I see

 Snapshot the state 500

 In the JM logs I see following entries

 Triggering checkpoint 1
 Triggering checkpoint 2

 After restarting the job (hence a new TM)

 Triggering checkpoint 1

 As per my understanding the JM should hold the checkpointed
 state across TMs? Am I correct?

 I have not configured anything special and am using the defaults. Do I need to
 add any setting to make it work?
 I want to maintain the message count across the TMs.

>>>


Deployment/Memory Configuration/Scalability

2021-04-26 Thread Radoslav Smilyanov
Hi all,

I am having multiple questions regarding Flink :) Let me give you some
background of what I have done so far.

*Description*
I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed
from 6 different Kafka topics and is joined via multiple
CoProcessFunctions. On a daily basis the job handles ~20 million
events from the source Kafka topics.

*Configuration*
These are the settings I am using:

jobmanager.memory.process.size: 4096m
jobmanager.memory.off-heap.size: 512m
taskmanager.memory.process.size: 12000m
taskmanager.memory.task.off-heap.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 5
taskmanager.rpc.port: 6122
jobmanager.execution.failover-strategy: region
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 64mb
state.checkpoints.dir: s3://bucket/checkpoints
state.savepoints.dir: s3://bucket/savepoints
s3.access-key: AWS_ACCESS_KEY_ID
s3.secret-key: AWS_SECRET_ACCESS_KEY
s3.endpoint: http://
s3.path.style.access: true
s3.entropy.key: _entropy_
s3.entropy.length: 8
presto.s3.socket-timeout: 10m
client.timeout: 60min

*Deployment setup*
Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task
managers. I have a daily cron job which triggers a savepoint in order to have
a fresh copy of the whole state.
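
(For reference, the cron job essentially runs the savepoint CLI command below;
the job id is a placeholder and the target directory matches
state.savepoints.dir from the configuration above.)

./bin/flink savepoint <job-id> s3://bucket/savepoints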

*Problems with the existing setup*
1. I observe that savepoints are causing Flink to consume more than the
allowed memory. I observe the behavior described in this Stack Overflow post
(which seems to be solved in 1.12.x if I am getting it right).
2. I cannot achieve high availability with Per-Job mode and thus I ended up
having a regular savepoint on a daily basis.

*Questions*
1. Is it a good idea to have regular savepoints (say on a daily basis)?
2. Is it possible to have high availability with Per-Job mode? Or maybe I
should go with session mode and make sure that my flink cluster is running
a single job?
3. Let's assume that savepoints should be triggered only before job
update/deployment. How can I trigger a savepoint if my job is already
consuming more than 80% of the allowed memory per pod in k8s? My
observations show that k8s kills task managers (which are running as pods)
and I need to retry it a couple of times.
4. Should I consider upgrading to version 1.12.3?
5. Should I consider switching off the state.backend.rocksdb.memory.managed
property even in version 1.12.3?
6. How do I decide when the job parallelism should be increased? Are there
metrics which can give me a clue that the parallelism should be
increased?

Best Regards,
Rado


Re: Flink Event specific window

2021-04-26 Thread Swagat Mishra
Hi Arvid,

On 2 - I was referring to stateful functions as an alternative to windows,
but in this particular use case it's not an exact fit, I think, though
a solution can be built around it.

On the overall approach here, what's the right way to use Flink SQL:

Every event has the transaction time, which I am using as the event time to
assign the WatermarkStrategy
KeyBy - customerId
SlidingEventTimeWindows of 1 hr
then process all elements using a ProcessWindowFunction
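
In code, roughly (a sketch only; the Transaction POJO and the aggregation inside
the window function are simplified placeholders):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HourlySpend {

    /** Minimal stand-in for the real transaction event. */
    public static class Transaction {
        public String customerId;
        public long transactionTime; // epoch millis
        public double amount;
    }

    public static DataStream<String> perCustomerHourlyTotal(DataStream<Transaction> transactions) {
        return transactions
            // use the transaction time carried by each event as the event time
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((tx, ts) -> tx.transactionTime))
            .keyBy(tx -> tx.customerId)
            // 1 hour windows, sliding e.g. every 10 minutes
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(10)))
            .process(new ProcessWindowFunction<Transaction, String, String, TimeWindow>() {
                @Override
                public void process(String customerId, Context ctx,
                                    Iterable<Transaction> txs, Collector<String> out) {
                    double total = 0;
                    for (Transaction tx : txs) {
                        total += tx.amount;
                    }
                    out.collect(customerId + " -> " + total
                        + " (window end " + ctx.window().getEnd() + ")");
                }
            });
    }
}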

Extending above..

For the session window, taking the above example, reiterated below:

Say Customer1 has done 2 transactions, one at 11:00 am and the other at 11:20 am.
Customer2 has done 1 transaction, at 10:00 am.
Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.

1 hour window:
9:30AM - 10:30 AM : Customer 2
10:30 AM - 11:30 AM : Customer 1, Customer 3
11:30 AM - 12:30 PM : Customer 3

Questions - how do we access the state?

   1. Will the process window function write to an in-memory SQL table
that does not get flushed to a proper backing database, so all the
data stays in-memory? If yes, can that be queried?
   2. If the process window function writes to a proper backing
database, at what point should this happen? Because the API can query
the state at any point in time, the data that was flushed might be
stale and need recomputation.
   3. How do you recommend RocksDB be used as a state backend?
Is that the embedded RocksDB, or do you recommend an external
implementation? The embedded RocksDB state is lost when the container is
restarted, I guess, so we will have to have an external mechanism for
restart/crash recovery?

Regards,
Swagat



On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise  wrote:

> 1. It always depends on the data volume per user. A million users is not
> much if you compare it to the biggest Flink installations (Netflix,
> Alibaba, Pinterest, ...). However, for larger scale and scalability, I'd
> recommend using the RocksDB state backend. [1]
>
> 2. Are you referring to statefun? I'd say that for your use case, Flink is
> a better fit. Statefun is more suitable when each actor (=user in your
> case) acts differently depending on the data like in a state machine. In
> your case, your users should be processed in the same way: Even if the
> windows are independently opened and closed, every user has only at most
> one window open at a given event time. You probably also aggregate all user
> states more or less in the same way.
>
> Or did you refer to process functions with state? That's certainly
> possible to implement, but it won't be much faster unless you can exploit
> some specific properties of your application. An example is given in [2].
> I'd recommend first using regular, built-in windows and only switching to
> custom code if the performance is insufficient. Custom implementations may
> be faster now, but they come with a higher maintenance cost, and the built-in
> windows may be better optimized in the future.
>
> Lastly, if your query is of a relational nature, I'd recommend having a look
> at the Table API/SQL [3]. Unless you really invest a lot of time, you won't be
> able to write more efficient code than what the Table API generates.
>
> [1] https://flink.apache.org/2021/01/18/rocksdb.html
> [2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>
> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra  wrote:
>
>>  1. What if there is a very high number of users, like a million
>> customers? Won't the service crash? Is it advisable to hold the data in
>> memory?
>>
>> 2. What if stateful functions are used to calculate the value? How will
>> this approach differ from the one proposed below?
>>
>> Regards,
>> Swagat
>>
>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise  wrote:
>>
>>> Hi Sunitha,
>>>
>>> The approach you are describing sounds like you want to use a session
>>> window [1]. If you only want to count them when they happen within the same
>>> hour, then you want to use a tumbling window.
>>>
>>> Your datastream approach looks solid.
>>>
>>> For SQL, there is also a session (and tumbling) window [2]. You can see
>>> examples at the bottom of the section.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>>
>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
>>> s_penakalap...@yahoo.com> wrote:
>>>
 Hi All,

 I have one requirement where I need to calculate the total amount of
 transactions done by each user in the last 1 hour.
 Say Customer1 has done 2 transactions one at 11:00am and other one at
 11:20 am.
 Customer2 has done 1 transaction one at 10:00 am
 Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45
 am.

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Till Rohrmann
Hi Tony,

I think you are right that Flink's cli does not behave super consistent at
the moment. Case 2. should definitely work because `-t yarn-application`
should overwrite what is defined in the Flink configuration. The problem
seems to be that we don't resolve the configuration wrt the specified
command line options before calling into `CustomCommandLine.isActive`. If
we parsed first the command line configuration options which can overwrite
flink-conf.yaml options and then replaced them, then the custom command
lines (assuming that they use the Configuration as the ground truth) should
behave consistently.

For your questions:

1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
purpose when introducing the yarn application mode.
2. See answer 1.

I think it is a good idea to extend the description of the config option
`execution.target`. Do you want to create a ticket and a PR for it?

Cheers,
Till

On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:

> Hi, Tony.
>
> What is the version of your flink-dist. AFAIK, this issue should be
> addressed in FLINK-15852[1]. Could you give the client log of case
> 2(set the log level to DEBUG would be better).
>
> [1] https://issues.apache.org/jira/browse/FLINK-15852
>
> Best,
> Yangze Guo
>
> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
> >
> > Hi Experts,
> >
> > I recently tried to run yarn-application mode on my yarn cluster, and I
> had a problem related to configuring `execution.target`.
> > After reading the source code and doing some experiments, I found that
> there should be some room of improvement for `FlinkYarnSessionCli` or
> `AbstractYarnCli`.
> >
> > My experiments are:
> >
> > setting `execution.target: yarn-application` in flink-conf.yaml and run
> `flink run-application -t yarn-application`: run job successfully.
> >
> > `FlinkYarnSessionCli` is not active
> > `GenericCLI` is active
> >
> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run-application -t yarn-application`: run job failed
> >
> > failed due to `ClusterDeploymentException` [1]
> > `FlinkYarnSessionCli` is active
> >
> > setting `execution.target: yarn-application` in flink-conf.yaml and run
> `flink run -t yarn-per-job`: run job successfully.
> >
> > `FlinkYarnSessionCli` is not active
> > `GenericCLI` is active
> >
> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run -t yarn-per-job`: run job successfully.
> >
> > `FlinkYarnSessionCli` is active
> >
> > From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive`
> [3], `FlinkYarnSessionCli` will be active when `execution.target` is
> specified with `yarn-per-job` or `yarn-session`.
> >
> > According to the flink official document [4], I thought the 2nd
> experiment should also work well, but it didn't.
> >>
> >> The --target will overwrite the execution.target specified in the
> config/flink-config.yaml.
> >
> >
> > The root cause is that `FlinkYarnSessionCli` only overwrite the
> `execution.target` with `yarn-session` or `yarn-per-job` [5], but no
> `yarn-application`.
> > So, my question is
> >
> > should we use `FlinkYarnSessionCli` in case 2?
> > if we should, how we can improve `FlinkYarnSessionCli` so that we can
> overwrite `execution.target` via `--target`?
> >
> > and one more improvement, the config description for `execution.target`
> [6] should include `yarn-application` as well.
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
> > [2]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
> > [3]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
> > [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
> > [5]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
> > [6]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46
> >
> > best regards,
> >
>


Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Yangze Guo
Hi, Till,

I agree that we need to resolve the issue by overriding the
configuration before selecting the CustomCommandLines. However, IIUC,
after FLINK-15852 the GenericCLI should always be the first choice.
Could you help me to understand why the FlinkYarnSessionCli can be
activated?


Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann  wrote:
>
> Hi Tony,
>
> I think you are right that Flink's cli does not behave super consistent at 
> the moment. Case 2. should definitely work because `-t yarn-application` 
> should overwrite what is defined in the Flink configuration. The problem 
> seems to be that we don't resolve the configuration wrt the specified command 
> line options before calling into `CustomCommandLine.isActive`. If we parsed 
> first the command line configuration options which can overwrite 
> flink-conf.yaml options and then replaced them, then the custom command lines 
> (assuming that they use the Configuration as the ground truth) should behave 
> consistently.
>
> For your questions:
>
> 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on purpose 
> when introducing the yarn application mode.
> 2. See answer 1.
>
> I think it is a good idea to extend the description of the config option 
> `execution.target`. Do you want to create a ticket and a PR for it?
>
> Cheers,
> Till
>
> On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>>
>> Hi, Tony.
>>
>> What is the version of your flink-dist. AFAIK, this issue should be
>> addressed in FLINK-15852[1]. Could you give the client log of case
>> 2(set the log level to DEBUG would be better).
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15852
>>
>> Best,
>> Yangze Guo
>>
>> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
>> >
>> > Hi Experts,
>> >
>> > I recently tried to run yarn-application mode on my yarn cluster, and I 
>> > had a problem related to configuring `execution.target`.
>> > After reading the source code and doing some experiments, I found that 
>> > there should be some room of improvement for `FlinkYarnSessionCli` or 
>> > `AbstractYarnCli`.
>> >
>> > My experiments are:
>> >
>> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> > `flink run-application -t yarn-application`: run job successfully.
>> >
>> > `FlinkYarnSessionCli` is not active
>> > `GenericCLI` is active
>> >
>> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run `flink 
>> > run-application -t yarn-application`: run job failed
>> >
>> > failed due to `ClusterDeploymentException` [1]
>> > `FlinkYarnSessionCli` is active
>> >
>> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> > `flink run -t yarn-per-job`: run job successfully.
>> >
>> > `FlinkYarnSessionCli` is not active
>> > `GenericCLI` is active
>> >
>> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run `flink 
>> > run -t yarn-per-job`: run job successfully.
>> >
>> > `FlinkYarnSessionCli` is active
>> >
>> > From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive` 
>> > [3], `FlinkYarnSessionCli` will be active when `execution.target` is 
>> > specified with `yarn-per-job` or `yarn-session`.
>> >
>> > According to the flink official document [4], I thought the 2nd experiment 
>> > should also work well, but it didn't.
>> >>
>> >> The --target will overwrite the execution.target specified in the 
>> >> config/flink-config.yaml.
>> >
>> >
>> > The root cause is that `FlinkYarnSessionCli` only overwrite the 
>> > `execution.target` with `yarn-session` or `yarn-per-job` [5], but no 
>> > `yarn-application`.
>> > So, my question is
>> >
>> > should we use `FlinkYarnSessionCli` in case 2?
>> > if we should, how we can improve `FlinkYarnSessionCli` so that we can 
>> > overwrite `execution.target` via `--target`?
>> >
>> > and one more improvement, the config description for `execution.target` 
>> > [6] should include `yarn-application` as well.
>> >
>> > [1] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
>> > [2] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
>> > [3] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
>> > [4] 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
>> > [5] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
>> > [6] 
>> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46
>> >
>> > best regards,
>> >


Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Till Rohrmann
I think you are right that the `GenericCLI` should be the first choice.
From the top of my head I do not remember why FlinkYarnSessionCli is still
used. Maybe it is in order to support some Yarn specific cli option
parsing. I assume it is either an oversight or some parsing has not been
completely migrated to the GenericCLI.

Cheers,
Till

On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:

> Hi, Till,
>
> I agree that we need to resolve the issue by overriding the
> configuration before selecting the CustomCommandLines. However, IIUC,
> after FLINK-15852 the GenericCLI should always be the first choice.
> Could you help me to understand why the FlinkYarnSessionCli can be
> activated?
>
>
> Best,
> Yangze Guo
>
> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann 
> wrote:
> >
> > Hi Tony,
> >
> > I think you are right that Flink's cli does not behave super consistent
> at the moment. Case 2. should definitely work because `-t yarn-application`
> should overwrite what is defined in the Flink configuration. The problem
> seems to be that we don't resolve the configuration wrt the specified
> command line options before calling into `CustomCommandLine.isActive`. If
> we parsed first the command line configuration options which can overwrite
> flink-conf.yaml options and then replaced them, then the custom command
> lines (assuming that they use the Configuration as the ground truth) should
> behave consistently.
> >
> > For your questions:
> >
> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
> purpose when introducing the yarn application mode.
> > 2. See answer 1.
> >
> > I think it is a good idea to extend the description of the config option
> `execution.target`. Do you want to create a ticket and a PR for it?
> >
> > Cheers,
> > Till
> >
> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
> >>
> >> Hi, Tony.
> >>
> >> What is the version of your flink-dist. AFAIK, this issue should be
> >> addressed in FLINK-15852[1]. Could you give the client log of case
> >> 2(set the log level to DEBUG would be better).
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei 
> wrote:
> >> >
> >> > Hi Experts,
> >> >
> >> > I recently tried to run yarn-application mode on my yarn cluster, and
> I had a problem related to configuring `execution.target`.
> >> > After reading the source code and doing some experiments, I found
> that there should be some room of improvement for `FlinkYarnSessionCli` or
> `AbstractYarnCli`.
> >> >
> >> > My experiments are:
> >> >
> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
> run `flink run-application -t yarn-application`: run job successfully.
> >> >
> >> > `FlinkYarnSessionCli` is not active
> >> > `GenericCLI` is active
> >> >
> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run-application -t yarn-application`: run job failed
> >> >
> >> > failed due to `ClusterDeploymentException` [1]
> >> > `FlinkYarnSessionCli` is active
> >> >
> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
> run `flink run -t yarn-per-job`: run job successfully.
> >> >
> >> > `FlinkYarnSessionCli` is not active
> >> > `GenericCLI` is active
> >> >
> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run -t yarn-per-job`: run job successfully.
> >> >
> >> > `FlinkYarnSessionCli` is active
> >> >
> >> > From `AbstractYarnCli#isActive` [2] and
> `FlinkYarnSessionCli#isActive` [3], `FlinkYarnSessionCli` will be active
> when `execution.target` is specified with `yarn-per-job` or `yarn-session`.
> >> >
> >> > According to the flink official document [4], I thought the 2nd
> experiment should also work well, but it didn't.
> >> >>
> >> >> The --target will overwrite the execution.target specified in the
> config/flink-config.yaml.
> >> >
> >> >
> >> > The root cause is that `FlinkYarnSessionCli` only overwrite the
> `execution.target` with `yarn-session` or `yarn-per-job` [5], but no
> `yarn-application`.
> >> > So, my question is
> >> >
> >> > should we use `FlinkYarnSessionCli` in case 2?
> >> > if we should, how we can improve `FlinkYarnSessionCli` so that we can
> overwrite `execution.target` via `--target`?
> >> >
> >> > and one more improvement, the config description for
> `execution.target` [6] should include `yarn-application` as well.
> >> >
> >> > [1]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
> >> > [2]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
> >> > [3]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
> >> > [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
> 

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Yangze Guo
If the GenericCLI is selected, then the execution.target should have
been overwritten to "yarn-application" in GenericCLI#toConfiguration.
It is odd why GenericCLI#isActive returns false, as
execution.target is defined in both flink-conf and the command line.

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 5:14 PM Till Rohrmann  wrote:
>
> I think you are right that the `GenericCLI` should be the first choice. From 
> the top of my head I do not remember why FlinkYarnSessionCli is still used. 
> Maybe it is in order to support some Yarn specific cli option parsing. I 
> assume it is either an oversight or some parsing has not been completely 
> migrated to the GenericCLI.
>
> Cheers,
> Till
>
> On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:
>>
>> Hi, Till,
>>
>> I agree that we need to resolve the issue by overriding the
>> configuration before selecting the CustomCommandLines. However, IIUC,
>> after FLINK-15852 the GenericCLI should always be the first choice.
>> Could you help me to understand why the FlinkYarnSessionCli can be
>> activated?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann  wrote:
>> >
>> > Hi Tony,
>> >
>> > I think you are right that Flink's cli does not behave super consistent at 
>> > the moment. Case 2. should definitely work because `-t yarn-application` 
>> > should overwrite what is defined in the Flink configuration. The problem 
>> > seems to be that we don't resolve the configuration wrt the specified 
>> > command line options before calling into `CustomCommandLine.isActive`. If 
>> > we parsed first the command line configuration options which can overwrite 
>> > flink-conf.yaml options and then replaced them, then the custom command 
>> > lines (assuming that they use the Configuration as the ground truth) 
>> > should behave consistently.
>> >
>> > For your questions:
>> >
>> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on 
>> > purpose when introducing the yarn application mode.
>> > 2. See answer 1.
>> >
>> > I think it is a good idea to extend the description of the config option 
>> > `execution.target`. Do you want to create a ticket and a PR for it?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>> >>
>> >> Hi, Tony.
>> >>
>> >> What is the version of your flink-dist. AFAIK, this issue should be
>> >> addressed in FLINK-15852[1]. Could you give the client log of case
>> >> 2(set the log level to DEBUG would be better).
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
>> >> >
>> >> > Hi Experts,
>> >> >
>> >> > I recently tried to run yarn-application mode on my yarn cluster, and I 
>> >> > had a problem related to configuring `execution.target`.
>> >> > After reading the source code and doing some experiments, I found that 
>> >> > there should be some room of improvement for `FlinkYarnSessionCli` or 
>> >> > `AbstractYarnCli`.
>> >> >
>> >> > My experiments are:
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> >> > `flink run-application -t yarn-application`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run 
>> >> > `flink run-application -t yarn-application`: run job failed
>> >> >
>> >> > failed due to `ClusterDeploymentException` [1]
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> >> > `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run 
>> >> > `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive` 
>> >> > [3], `FlinkYarnSessionCli` will be active when `execution.target` is 
>> >> > specified with `yarn-per-job` or `yarn-session`.
>> >> >
>> >> > According to the flink official document [4], I thought the 2nd 
>> >> > experiment should also work well, but it didn't.
>> >> >>
>> >> >> The --target will overwrite the execution.target specified in the 
>> >> >> config/flink-config.yaml.
>> >> >
>> >> >
>> >> > The root cause is that `FlinkYarnSessionCli` only overwrite the 
>> >> > `execution.target` with `yarn-session` or `yarn-per-job` [5], but no 
>> >> > `yarn-application`.
>> >> > So, my question is
>> >> >
>> >> > should we use `FlinkYarnSessionCli` in case 2?
>> >> > if we should, how we can improve `FlinkYarnSessionCli` so that we can 
>> >> > overwrite `execution.target` via `--target`?
>> >> >
>> >> > and one more improvement, the config description for `execution.target` 
>> >> > [6] 

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Tony Wei
Hi Till, Yangze,

I think FLINK-15852 should solve my problem.
It is my fault that my flink version is not 100% consistent with the
community version, and FLINK-15852 is the one I missed.
Thanks for your information.

best regards,

Till Rohrmann wrote on Mon, Apr 26, 2021 at 5:14 PM:

> I think you are right that the `GenericCLI` should be the first choice.
> From the top of my head I do not remember why FlinkYarnSessionCli is still
> used. Maybe it is in order to support some Yarn specific cli option
> parsing. I assume it is either an oversight or some parsing has not been
> completely migrated to the GenericCLI.
>
> Cheers,
> Till
>
> On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:
>
>> Hi, Till,
>>
>> I agree that we need to resolve the issue by overriding the
>> configuration before selecting the CustomCommandLines. However, IIUC,
>> after FLINK-15852 the GenericCLI should always be the first choice.
>> Could you help me to understand why the FlinkYarnSessionCli can be
>> activated?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann 
>> wrote:
>> >
>> > Hi Tony,
>> >
>> > I think you are right that Flink's cli does not behave super consistent
>> at the moment. Case 2. should definitely work because `-t yarn-application`
>> should overwrite what is defined in the Flink configuration. The problem
>> seems to be that we don't resolve the configuration wrt the specified
>> command line options before calling into `CustomCommandLine.isActive`. If
>> we parsed first the command line configuration options which can overwrite
>> flink-conf.yaml options and then replaced them, then the custom command
>> lines (assuming that they use the Configuration as the ground truth) should
>> behave consistently.
>> >
>> > For your questions:
>> >
>> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
>> purpose when introducing the yarn application mode.
>> > 2. See answer 1.
>> >
>> > I think it is a good idea to extend the description of the config
>> option `execution.target`. Do you want to create a ticket and a PR for it?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>> >>
>> >> Hi, Tony.
>> >>
>> >> What is the version of your flink-dist. AFAIK, this issue should be
>> >> addressed in FLINK-15852[1]. Could you give the client log of case
>> >> 2(set the log level to DEBUG would be better).
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei 
>> wrote:
>> >> >
>> >> > Hi Experts,
>> >> >
>> >> > I recently tried to run yarn-application mode on my yarn cluster,
>> and I had a problem related to configuring `execution.target`.
>> >> > After reading the source code and doing some experiments, I found
>> that there should be some room of improvement for `FlinkYarnSessionCli` or
>> `AbstractYarnCli`.
>> >> >
>> >> > My experiments are:
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>> run `flink run-application -t yarn-application`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>> `flink run-application -t yarn-application`: run job failed
>> >> >
>> >> > failed due to `ClusterDeploymentException` [1]
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>> run `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>> `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > From `AbstractYarnCli#isActive` [2] and
>> `FlinkYarnSessionCli#isActive` [3], `FlinkYarnSessionCli` will be active
>> when `execution.target` is specified with `yarn-per-job` or `yarn-session`.
>> >> >
>> >> > According to the flink official document [4], I thought the 2nd
>> experiment should also work well, but it didn't.
>> >> >>
>> >> >> The --target will overwrite the execution.target specified in the
>> config/flink-config.yaml.
>> >> >
>> >> >
>> >> > The root cause is that `FlinkYarnSessionCli` only overwrite the
>> `execution.target` with `yarn-session` or `yarn-per-job` [5], but no
>> `yarn-application`.
>> >> > So, my question is
>> >> >
>> >> > should we use `FlinkYarnSessionCli` in case 2?
>> >> > if we should, how we can improve `FlinkYarnSessionCli` so that we
>> can overwrite `execution.target` via `--target`?
>> >> >
>> >> > and one more improvement, the config description for
>> `execution.target` [6] should include `yarn-application` as well.
>> >> >
>> >> > [1]
>> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L43

Flink Hive connector: hive-conf-dir supports hdfs URI, while hadoop-conf-dir supports local path only?

2021-04-26 Thread Yik San Chan
Hi community,

This question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/67264156/flink-hive-connector-hive-conf-dir-supports-hdfs-uri-while-hadoop-conf-dir-sup

In my current setup, the local dev env can access the testing env. I would like
to run a Flink job on the local dev env, while reading/writing data from/to the
testing env Hive.

This is what I do:

```
CREATE CATALOG hive WITH (
'type' = 'hive',
'hive-conf-dir' = 'hdfs://testhdp273/hive/conf'
)
```

However, I realize I also need to specify a matching Hadoop configuration,
therefore I want to also define `hadoop-conf-dir` so that it points to
the Hadoop conf of the testing env. However, as said in the [docs](
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/#hadoop-conf-dir
):

> Path to Hadoop conf dir. Only local file system paths are supported. The
recommended way to set Hadoop conf is via the HADOOP_CONF_DIR environment
variable. Use the option only if environment variable doesn't work for you,
e.g. if you want to configure each HiveCatalog separately.

I wonder why hadoop-conf-dir only supports local paths, while hive-conf-dir
supports any legit HDFS path? Any workaround to this problem?
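
One workaround I can think of (just a sketch): copy the testing cluster's Hadoop
*-site.xml files to a local directory and point the catalog at that copy via the
hadoop-conf-dir option; the local path below is a placeholder.

CREATE CATALOG hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = 'hdfs://testhdp273/hive/conf',
    'hadoop-conf-dir' = '/path/to/local/copy/of/hadoop/conf'
)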

Any help? Thanks!

Best,
Yik San


Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Tony Wei
Hi Till,

I have created a ticket to extend the description of `execution.target`.
https://issues.apache.org/jira/browse/FLINK-22476

best regards,

Tony Wei wrote on Mon, Apr 26, 2021 at 5:24 PM:

> Hi Till, Yangze,
>
> I think FLINK-15852 should solve my problem.
> It is my fault that my flink version is not 100% consistent with the
> community version, and FLINK-15852 is the one I missed.
> Thanks for your information.
>
> best regards,
>
Till Rohrmann wrote on Mon, Apr 26, 2021 at 5:14 PM:
>
>> I think you are right that the `GenericCLI` should be the first choice.
>> From the top of my head I do not remember why FlinkYarnSessionCli is still
>> used. Maybe it is in order to support some Yarn specific cli option
>> parsing. I assume it is either an oversight or some parsing has not been
>> completely migrated to the GenericCLI.
>>
>> Cheers,
>> Till
>>
>> On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:
>>
>>> Hi, Till,
>>>
>>> I agree that we need to resolve the issue by overriding the
>>> configuration before selecting the CustomCommandLines. However, IIUC,
>>> after FLINK-15852 the GenericCLI should always be the first choice.
>>> Could you help me to understand why the FlinkYarnSessionCli can be
>>> activated?
>>>
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann 
>>> wrote:
>>> >
>>> > Hi Tony,
>>> >
>>> > I think you are right that Flink's cli does not behave super
>>> consistent at the moment. Case 2. should definitely work because `-t
>>> yarn-application` should overwrite what is defined in the Flink
>>> configuration. The problem seems to be that we don't resolve the
>>> configuration wrt the specified command line options before calling into
>>> `CustomCommandLine.isActive`. If we parsed first the command line
>>> configuration options which can overwrite flink-conf.yaml options and then
>>> replaced them, then the custom command lines (assuming that they use the
>>> Configuration as the ground truth) should behave consistently.
>>> >
>>> > For your questions:
>>> >
>>> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
>>> purpose when introducing the yarn application mode.
>>> > 2. See answer 1.
>>> >
>>> > I think it is a good idea to extend the description of the config
>>> option `execution.target`. Do you want to create a ticket and a PR for it?
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>>> >>
>>> >> Hi, Tony.
>>> >>
>>> >> What is the version of your flink-dist. AFAIK, this issue should be
>>> >> addressed in FLINK-15852[1]. Could you give the client log of case
>>> >> 2(set the log level to DEBUG would be better).
>>> >>
>>> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
>>> >>
>>> >> Best,
>>> >> Yangze Guo
>>> >>
>>> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei 
>>> wrote:
>>> >> >
>>> >> > Hi Experts,
>>> >> >
>>> >> > I recently tried to run yarn-application mode on my yarn cluster,
>>> and I had a problem related to configuring `execution.target`.
>>> >> > After reading the source code and doing some experiments, I found
>>> that there should be some room of improvement for `FlinkYarnSessionCli` or
>>> `AbstractYarnCli`.
>>> >> >
>>> >> > My experiments are:
>>> >> >
>>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>>> run `flink run-application -t yarn-application`: run job successfully.
>>> >> >
>>> >> > `FlinkYarnSessionCli` is not active
>>> >> > `GenericCLI` is active
>>> >> >
>>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>>> `flink run-application -t yarn-application`: run job failed
>>> >> >
>>> >> > failed due to `ClusterDeploymentException` [1]
>>> >> > `FlinkYarnSessionCli` is active
>>> >> >
>>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
>>> run `flink run -t yarn-per-job`: run job successfully.
>>> >> >
>>> >> > `FlinkYarnSessionCli` is not active
>>> >> > `GenericCLI` is active
>>> >> >
>>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
>>> `flink run -t yarn-per-job`: run job successfully.
>>> >> >
>>> >> > `FlinkYarnSessionCli` is active
>>> >> >
>>> >> > From `AbstractYarnCli#isActive` [2] and
>>> `FlinkYarnSessionCli#isActive` [3], `FlinkYarnSessionCli` will be active
>>> when `execution.target` is specified with `yarn-per-job` or `yarn-session`.
>>> >> >
>>> >> > According to the flink official document [4], I thought the 2nd
>>> experiment should also work well, but it didn't.
>>> >> >>
>>> >> >> The --target will overwrite the execution.target specified in the
>>> config/flink-config.yaml.
>>> >> >
>>> >> >
>>> >> > The root cause is that `FlinkYarnSessionCli` only overwrite the
>>> `execution.target` with `yarn-session` or `yarn-per-job` [5], but no
>>> `yarn-application`.
>>> >> > So, my question is
>>> >> >
>>> >> > should we use `FlinkYarnSessionCli` in case 2?
>>> >> > if we should, how we can improve `FlinkYarnSessionCli` so that we
>>> can over

PyFlink: Shall we disallow relative URL for filesystem path?

2021-04-26 Thread Yik San Chan
Hi community,

When using the Filesystem SQL Connector, users need to provide a path. When
running a PyFlink job in mini-cluster mode by simply running `python
WordCount.py`, the path can be a relative path, such as `words.txt`.
However, trying to submit the job via `flink run` will fail without
question, because `flink run` expects an absolute file path prefixed with
`file://`, for example `file:///tmp/input/words.txt`.

Since we will need to submit our job implementation to run on prod anyway,
I recommend disallowing relative paths such as `words.txt`. This will save
users from spending hours figuring out "Why can I run this job with `python`,
while `flink run` fails?"

Look forward to your feedback.

Best,
Yik San


Contradictory docs: python.files config can include not only python files

2021-04-26 Thread Yik San Chan
Hi community,

In
https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html,
regarding python.files:

> Attach custom python files for job.

This makes readers think only Python files are allowed here. However, in
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs:

./bin/flink run \
  --python examples/python/table/batch/word_count.py \
  --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

It obviously includes a .txt file, which is not a Python file.

I believe it is contradictory here. Can anyone confirm?

Best,
Yik San


Confusing docs on python.archives

2021-04-26 Thread Yik San Chan
Hi community,

In
https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options,

> For each archive file, a target directory is specified. If the target
directory name is specified, the archive file will be extracted to a name
can directory with the specified name. Otherwise, the archive file will be
extracted to a directory with the same name of the archive file.

I don't get what "the archive file will be extracted to a name can
directory with the specified name" means. Maybe there are typos?

best,
Yik San


Re: Contradictory docs: python.files config can include not only python files

2021-04-26 Thread Dian Fu
Hi Yik San,

1) What `--pyFiles` is used for:
All the files specified via `--pyFiles` will be put on the PYTHONPATH of the
Python worker and will thus be available to the Python
user-defined functions during execution.

2) Validation of the files passed to `--pyFiles`:
Currently Flink does not validate the files passed to this argument. I also think
that such a check is neither necessary nor really feasible. Do you have
any advice for this?

Regards,
Dian

> On Apr 26, 2021, at 8:45 PM, Yik San Chan wrote:
> 
> Hi community,
> 
> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html,
> regarding python.files:
> 
> > Attach custom python files for job.
> 
> This makes readers think only Python files are allowed here. However, in
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs:
> 
> ./bin/flink run \
>   --python examples/python/table/batch/word_count.py \
>   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
>
> It obviously includes a .txt file, which is not a Python file.
> 
> I believe it is contradictory here. Can anyone confirm?
> 
> Best,
> Yik San



Re: Confusing docs on python.archives

2021-04-26 Thread Dian Fu
Hi Yik San,

It should be a typo issue. I guess it should be `If the target directory name 
is specified, the archive file will be extracted to a directory with the 
specified name.`

Regards,
Dian

> On Apr 26, 2021, at 8:57 PM, Yik San Chan wrote:
> 
> Hi community,
> 
> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options,
> 
> > For each archive file, a target directory is specified. If the target 
> > directory name is specified, the archive file will be extracted to a name 
> > can directory with the specified name. Otherwise, the archive file will be 
> > extracted to a directory with the same name of the archive file.
> 
> I don't get what "the archive file will be extracted to a name can
> directory with the specified name" means. Maybe there are typos?
> 
> best,
> Yik San



Re: Task Local Recovery with mountable disks in the cloud

2021-04-26 Thread Till Rohrmann
Hi Sonam,

sorry for the late reply. We were a bit caught in the midst of the feature
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state
storage to make it reusable across TaskManager failures. However, it is
also not trivial to do.

Maybe let me first describe how the current task local recovery works and
then see how we could improve it:

Flink creates for every slot allocation an AllocationID. The AllocationID
associates a slot on a TaskExecutor with a job and is also used for scoping
the lifetime of a slot wrt a job (theoretically, one and the same slot
could be used to fulfill multiple slot requests of the same job if the slot
allocation is freed in between). Note that the AllocationID is a random ID
and, thus, changes whenever the ResourceManager allocates a new slot on a
TaskExecutor for a job.

Task local recovery is effectively a state cache which is associated with
an AllocationID. So for every checkpoint and every task, a TaskExecutor
copies the state data and stores them in the task local recovery cache. The
cache is maintained as long as the slot allocation is valid (e.g. the slot
has not been freed by the JobMaster and the slot has not timed out). This
makes the lifecycle management of the state data quite easy and makes sure
that a process does not clutter local disks. On the JobMaster side, Flink
remembers for every Execution, where it is deployed (it remembers the
AllocationID). If a failover happens, then Flink tries to re-deploy the
Executions into the slots they were running in before by matching the
AllocationIDs.

The reason why we scoped the state cache to an AllocationID was for
simplicity and because we couldn't guarantee that a failed TaskExecutor X
will be restarted on the same machine again and thereby having access to
the same local disk as before. That's also why Flink deletes the cache
directory when a slot is freed or when the TaskExecutor is shut down
gracefully.

With persistent volumes this changes and we can make the TaskExecutors
"stateful" in the sense that we can reuse an already occupied cache. One
rather simple idea could be to also persist the slot allocations of a
TaskExecutor (which slot is allocated and what is its assigned
AllocationID). This information could be used to re-initialize the
TaskExecutor upon restart. That way, it does not have to register at the
ResourceManager and wait for new slot allocations but could directly start
offering its slots to the jobs it remembered. If the TaskExecutor cannot
find the JobMasters for the respective jobs, it would then free the slots
and clear the cache accordingly.

This could work as long as the ResourceManager does not start new
TaskExecutors whose slots could be used to recover the job. If this is a
problem, then one needs to answer the question how long to wait for the old
TaskExecutors to come back and reusing their local state vs. starting
quickly a fresh instance but having to restore state remotely.

An alternative solution proposal which is probably more powerful albeit
also more complex would be to make the cache information explicit when
registering the TaskExecutor at the ResourceManager and later offering
slots to the JobMaster. For example, the TaskExecutor could tell the
ResourceManager which states it has locally cached (it probably needs to
contain key group ranges for every stored state) and this information could
be used to decide from which TaskExecutor to allocate slots for a job.
Similarly on the JobMaster side we could use this information to calculate
the best mapping between Executions and slots. I think that mechanism could
better deal with rescaling events where there is no perfect match between
Executions and slots because of the changed key group ranges.

So to answer your question: There is currently no way to preserve
AllocationIDs across restarts. However, we could use the persistent volume
to store this information so that we can restore it on restart of a
TaskExecutor. This could enable task local state recovery for cases where
we lose a TaskExecutor and restart it with the same persistent volume.
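
For completeness, the relevant settings in current releases look roughly like the
snippet below (pointing the local state directory at the mounted persistent volume
is an assumption about your setup). As described above, this alone does not make
local recovery survive a TaskExecutor restart, because the AllocationIDs change:

state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /mnt/persistent-volume/local-state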

Cheers,
Till

On Wed, Apr 21, 2021 at 7:26 PM Stephan Ewen  wrote:

> /cc dev@flink
>
>
> On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal 
> wrote:
>
> > Hello,
> >
> > We've been experimenting with Task-local recovery using Kubernetes. We
> > have a way to specify mounting the same disk across Task Manager
> > restarts/deletions for when the pods get recreated. In this scenario, we
> > noticed that task local recovery does not kick in (as expected based on
> the
> > documentation).
> >
> > We did try to comment out the code on the shutdown path which cleaned up
> > the task local directories before the pod went down / was restarted. We
> > noticed that remote recovery kicked in even though the task local state
> was
> > present. I noticed that the slot IDs changed, and was wondering if this
> is
> > the main reason that the t

Re: The wrong Options of Kafka Connector, will make the cluster can not run any job

2021-04-26 Thread Robert Metzger
Thanks a lot for your message. This could be a bug in Flink. It seems that
the archival of the execution graph is failing because some classes are
unloaded.

What I observe from your stack traces is that some classes are loaded from
flink-dist_2.11-1.11.2.jar, while other classes are loaded from
template-common-jar-0.0.1. Maybe Flink is closing the usercode classloader,
and this is causing the exception during the archival of the execution
graph. Can you make sure that the core Flink classes are only in your
classpath once (in flink-dist), and the template-common-jar-0.0.1 doesn't
contain the runtime Flink classes? (for example by setting the Flink
dependencies to provided when using the maven-shade-plugin).
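
For example, a rough sketch of what this looks like in the job's pom.xml (artifact
and version chosen to match the flink-dist 2.11 / 1.11.2 from your stack trace;
adjust to the Flink dependencies you actually use):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.11.2</version>
    <scope>provided</scope>
</dependency>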

For the issue while submitting the job, I cannot provide any further
help, because you haven't posted the exception that occurred in the REST
handler. Could you post this exception here as well?

Best wishes,
Robert



On Sun, Apr 25, 2021 at 2:44 PM chenxuying  wrote:

> environment:
>
> flinksql 1.12.2
>
> k8s session mode
>
> description:
>
> I got the following error log when my Kafka connector port was wrong
>
> >
>
> 2021-04-25 16:49:50
>
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> expired before the position for partition filebeat_json_install_log-3 could
> be determined
>
> >
>
>
> I got the following error log when my Kafka connector IP was wrong
>
> >
>
> 2021-04-25 20:12:53
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout
> expired while fetching topic metadata
>
> >
>
>
> When the job was cancelled, there was the following error log:
>
> >
>
> 2021-04-25 08:53:41,115 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
> v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from
> state CANCELLING to CANCELED.
>
> 2021-04-25 08:53:41,115 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping
> checkpoint coordinator for job fcc451b8a521398b10e5b

Re: Flink missing Kafka records

2021-04-26 Thread Robert Metzger
Hi Dan,

Can you describe under which conditions you are missing records (after a
machine failure, after a Kafka failure, after taking and restoring from a
savepoint, ...).
Are many records missing? Are "the first records" or the "latest records"
missing? Any individual records missing, or larger blocks of data?

I don't think that there's a bug in Flink or the Kafka connector. Maybe it's
just a configuration or systems design issue.


On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:

> Hi!
>
> Have any other devs noticed issues with Flink missing Kafka records with
> long-running Flink jobs?  When I re-run my Flink job and start from the
> earliest Kafka offset, Flink processes the events correctly.  I'm using
> Flink v1.11.1.
>
> I have a simple job that takes records (Requests) from Kafka and
> serializes them to S3.  Pretty basic.  No related issues in the text logs.
> I'm hoping I just have a configuration issue.  I'm guessing idleness is
> working in a way that I'm not expecting.
>
> Any ideas?
> - Dan
>
>
> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>
>   Properties kafkaSourceProperties =
> getKafkaSourceProperties("logrequest");
>
>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>
> new FlinkKafkaConsumer<>(getInputRequestTopic(),
> getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>
>   .uid("source-request")
>
>   .name("Request")
>
>   .assignTimestampsAndWatermarks(
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withIdleness(Duration.ofMinutes(1)));
>
>
>   executeLogRequest(rawRequestInput);
>
>   env.execute("log-request");
>
> }
>
>
> void executeLogRequest(SingleOutputStreamOperator<Request>
> rawRequestInput) {
>
>   AvroWriterFactory<Request> factory =
> getAvroWriterFactory(Request.class);
>
>   rawRequestInput.addSink(StreamingFileSink
>
>   .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"),
> factory)
>
>   .withBucketAssigner(new DateHourBucketAssigner(request ->
> request.getTiming().getEventApiTimestamp()))
>
>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>
>   .withOutputFileConfig(createOutputFileConfig())
>
>   .build())
>
> .uid("sink-s3-raw-request")
>
> .name("S3 Raw Request");
>
> }
>
>
>
>


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-26 Thread Robert Metzger
Quick comment on the Kryo type registration and the messages you are
seeing: the messages are expected. What they are saying is that we are
not serializing the type using Flink's POJO serializer, but are falling
back to Kryo.
Since you are registering all the subclasses of Number that you are using
(Integer, Double), you'll get better performance (or at least less CPU
load) with Kryo. So if you want to keep using Kryo, you are doing
everything right (and you generally won't be able to use Flink's POJO
serializer for Number types).
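
Just for illustration, a minimal Java sketch of that registration (equivalent
to what you already do in Scala) would be:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Registering the concrete subclasses lets Kryo dictionary-encode them
// instead of writing the full class name with every record.
env.getConfig().registerKryoType(Integer.class);
env.getConfig().registerKryoType(Double.class);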

On Fri, Apr 23, 2021 at 7:07 PM Miguel Araújo 
wrote:

> Thanks for your replies. I agree this is a somewhat general problem.
> I posted it here as I was trying to register the valid subclasses in Kryo
> but I couldn't get the message to go away, i.e., everything worked
> correctly but there was the complaint that GenericType serialization was
> being used.
>
> This is how I was registering these types:
>
> env.getConfig.registerKryoType(classOf[java.lang.Integer])
> env.getConfig.registerKryoType(classOf[java.lang.Double])
>
> and this is the message I got on every event:
>
> flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
> (1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
> fields were detected for class java.lang.Number so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
>
> In the meanwhile, I've changed my approach to reuse a protobuf type I
> already had as part of my input event.
>
> Once again, thanks for your replies because they gave me the right
> perspective.
>
>
>
> Arvid Heise  escreveu no dia quarta, 21/04/2021 à(s)
> 18:26:
>
>> Hi Miguel,
>>
>> as Klemens said this is a rather general problem independent of Flink:
>> How do you map Polymorphism in serialization?
>>
>> Flink doesn't have an answer on its own, as it's discouraged (A Number
>> can have arbitrary many subclasses: how do you distinguish them except by
>> classname? That adds a ton of overhead.). The easiest solution in your case
>> is to convert ints into double.
>> Or you use Kryo which dictionary encodes the classes and also limits the
>> possible subclasses.
>>
>> On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
>> klemens.muthm...@cyface.de> wrote:
>>
>>> Hi,
>>>
>>> I guess this is more of a Java Problem than a Flink Problem. If you want
>>> it quick and dirty you could implement a class such as:
>>>
>>> public class Value {
>>> private boolean isLongSet = false;
>>> private long longValue = 0L;
>>> private boolean isIntegerSet = false;
>>> private int intValue = 0;
>>>
>>>public Value(final long value) {
>>>setLong(value);
>>>}
>>>
>>> public void setLong(final long value) {
>>> longValue = value;
>>> isLongSet = true;
>>>}
>>>
>>>public long getLong() {
>>>if (isLongSet) {
>>>return longValue;
>>>}
>>>throw new IllegalStateException("No long value set");
>>>}
>>>
>>>// Add same methods for int
>>>// to satisfy POJO requirements you will also need to add a
>>> no-argument constructor as well as getters and setters for the boolean flags
>>> }
>>>
>>> I guess a cleaner solution would be possible using a custom Kryo
>>> serializer as explained here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>>
>>> Regards
>>>   Klemens
>>>
>>>
>>>
>>> > Am 20.04.2021 um 10:34 schrieb Miguel Araújo <
>>> miguelaraujo...@gmail.com>:
>>> >
>>> > Hi everyone,
>>> >
>>> > I have a ProcessFunction which needs to store different number types
>>> for different keys, e.g., some keys need to store an integer while others
>>> need to store a double.
>>> >
>>> > I tried to use java.lang.Number as the type for the ValueState, but I
>>> got the expected "No fields were detected for class java.lang.Number so it
>>> cannot be used as a POJO type and must be processed as GenericType."
>>> >
>>> > I have the feeling that this is not the right approach, but the exact
>>> type to be stored is only known at runtime which makes things a bit
>>> trickier. Is there a way to register these classes correctly, or Is it
>>> preferable to use different ValueState's for different types?
>>> >
>>> > Thanks,
>>> > Miguel
>>>
>>>


RE: [1.9.2] Flink SSL on YARN - NoSuchFileException

2021-04-26 Thread Hailu, Andreas [Engineering]
Hi Arvid, thanks for the reply.

Our stores are world-readable, so I don’t think that it’s an access issue. All 
of our clients have the stores present through a shared mount as well. I’m able 
to see the shipped stores in the directory.info output when pulling the YARN 
logs, and can confirm the account submitting the application has correct 
privileges.

The exception I shared occurs during the cluster deployment phase. Here’s the 
full stacktrace:

2021-04-26 13:37:17,468 [main] ERROR ClusterEntrypoint - Could not start 
cluster entrypoint YarnSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnSessionClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:182)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
at 
org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:93)
Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentF
actory.java:257)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:210)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
... 2 more
Caused by: org.apache.flink.util.ConfigurationException: Failed to initialize 
SSLEngineFactory for REST server endpoint.
at 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration.fromConfiguration(RestServerEndpointConfiguration.java:162)
at 
org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:54)
at 
org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentF
actory.java:150)
... 9 more
Caused by: java.nio.file.NoSuchFileException: 
/home/user/ssl/deploy-keys/rest.keystore
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at 
java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at 
org.apache.flink.runtime.net.SSLUtils.getKeyManagerFactory(SSLUtils.java:266)
at 
org.apache.flink.runtime.net.SSLUtils.createRestNettySSLContext(SSLUtils.java:392)
at 
org.apache.flink.runtime.net.SSLUtils.createRestNettySSLContext(SSLUtils.java:365)
   at 
org.apache.flink.runtime.net.SSLUtils.createRestServerSSLEngineFactory(SSLUtils.java:163)
at 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration.fromConfiguration(RestServerEndpointConfiguration.java:160)
... 11 more

Given the number of machines in our YARN compute cluster, we’d really like to 
avoid having to copy the stores to each machine, as that would add another 
configuration step each time a machine is replaced, added, etc. The 
YARN shipping feature is really what we need.

The documentation [1] says that we should be able to ship the stores directly 
from our client:

flink run -m yarn-cluster -yt deploy-keys/ flinkapp.jar

But it doesn’t provide an example of the requisite change made in the 
flink-conf.yaml that supports shipped stores.

If we consider that we have the stores available in a local directory called 
/home/user/ssl/deploy-keys/, and we’re shipping the directory through the –yt 
option, what do the values of:

1. security.ssl.rest.keystore

2. security.ssl.rest.truststore
need to be in order for this to work? Happy to share our failed application’s 
YARN logs with you if you require them.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-ssl.html#tips-for-yarn--mesos-deployment

// ah

From: Arvid Heise 
Sent: Wednesday, April 21, 2021 1:05 PM
To: Hailu, Andreas [Engineering] 

Re: Flink Metric isBackPressured not available

2021-04-26 Thread David Anderson
The isBackPressured metric is a Boolean -- it reports true or false, rather
than 1 or 0. The Flink web UI can not display it (it shows NaN); perhaps
the same is true for Datadog.

https://issues.apache.org/jira/browse/FLINK-15753 relates to this.

Regards,
David

On Tue, Apr 13, 2021 at 12:13 PM Claude M  wrote:

> Thanks for your reply.  I'm using Flink 1.12.  I'm checking in Datadog and
> the metric is not available there.
> It has other task/operator metrics such as numRecordsIn/numRecordsOut
> there but not the isBackPressured.
>
>
> On Mon, Apr 12, 2021 at 8:40 AM Roman Khachatryan 
> wrote:
>
>> Hi,
>>
>> The metric is registered upon task deployment and reported periodically.
>>
>> Which Flink version are you using? The metric was added in 1.10.
>> Are you checking it in the UI?
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 9, 2021 at 8:50 PM Claude M  wrote:
>> >
>> > Hello,
>> >
>> > The documentation here
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
>> states there is a isBackPressured metric available yet I don't see it.  Any
>> ideas why?
>> >
>> >
>> > Thanks
>>
>


Re: Writing to Avro from pyflink

2021-04-26 Thread Edward Yang
Hi Dian,

Thanks for trying it out, it ruled out a problem with the Python code. I
double-checked the jar path and only included the jar you referenced, without
any luck. However, I then created a Python 3.7 environment for PyFlink (I had
been on 3.8) and the code worked without any errors!
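
For reference, a Python 3.7 environment for PyFlink 1.12 can be created roughly
like this (commands are only a sketch; conda is just one option):

conda create -n pyflink37 python=3.7
conda activate pyflink37
pip install apache-flink==1.12.2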


On Sun, Apr 25, 2021, 10:09 PM Dian Fu  wrote:

> Hi Eddie,
>
> I have tried your program with the following changes and it could execute
> successfully:
> - Replace
> `rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"`
> with
> `rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"`
> - Use flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar
> as I encountered issue FLINK-21012 [2] which has been addressed in 1.12.3
>
> For your problem, I suspect if `
> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` really exists. Could
> you double check that?
>
> [1]
> https://repository.apache.org/content/repositories/orgapacheflink-1419/org/apache/flink/flink-sql-avro/1.12.3/flink-sql-avro-1.12.3.jar
> [2] https://issues.apache.org/jira/browse/FLINK-21012
>
> Regards,
> Dian
>
> 2021年4月25日 下午11:56,Edward Yang  写道:
>
> Hi Dian,
>
> I tried your suggestion but had the same error message unfortunately. I
> also tried file:/ and file:// with the same error, not sure what's going
> on, I assume writing to avro works fine in java and scala?
>
> Eddie
>
> On Sat, Apr 24, 2021 at 10:03 PM Dian Fu  wrote:
>
>> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar
>> . Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try
>> again?
>>
>> Regards,
>> Dian
>>
>> 2021年4月24日 上午8:29,Edward Yang  写道:
>>
>> I've been trying to write to the avro format with pyflink 1.12.2 on
>> ubuntu, I've tested my code with an iterator writing to csv and everything
>> works as expected. Reading through the flink documentation I see that I
>> should add jar dependencies to work with avro. I downloaded three jar files
>> that I believe are required for avro like so:
>>
>> table_env\
>> .get_config()\
>> .get_configuration()\
>> .set_string(
>> "pipeline.jars",
>> rf"
>> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar
>> "
>> )
>>
>> I suspect I'm not loading the jar files correctly, but it's unclear what
>> I'm supposed to do as I'm not familiar with java and when I switch the sink
>> format to avro I get some unexpected errors:
>>
>> Py4JJavaError: An error occurred while calling o746.executeInsert.
>> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>>  at 
>> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>>  at 
>> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at scala.collection.TraversableLike$class.map(

RE: [1.9.2] Flink SSL on YARN - NoSuchFileException

2021-04-26 Thread Hailu, Andreas [Engineering]
Hey Nico, thanks for your reply. I gave this a try and unfortunately had no 
luck.

// ah

-Original Message-
From: Nico Kruber 
Sent: Wednesday, April 21, 2021 1:01 PM
To: user@flink.apache.org
Subject: Re: [1.9.2] Flink SSL on YARN - NoSuchFileException

Hi Andreas,
judging from [1], it should work if you refer to it via

security.ssl.rest.keystore: ./deploy-keys/rest.keystore
security.ssl.rest.truststore: ./deploy-keys/rest.truststore


Nico

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-KAFKA-KEYTAB-Kafkaconsumer-error-Kerberos-td37277.html

On Monday, 19 April 2021 16:45:25 CEST Hailu, Andreas [Engineering] wrote:
> Hi Flink team,
>
> I'm trying to configure a Flink on YARN with SSL enabled. I've
> followed the documentation's instruction  [1] to generate a Keystore
> and Truststore locally, and added a the properties to my flink-conf.yaml.
> security.ssl.rest.keystore: /home/user/ssl/deploy-keys/rest.keystore
> security.ssl.rest.truststore:
> /home/user/ssl/deploy-keys/rest.truststore
>
> I've also added the yarnship option so that the keystore and
> truststore are deployed as suggested in [1].
>
> -m yarn-cluster --class  [...] -yt /home/user/ssl/deploy-keys/
>
> However, starting the Flink cluster results in a NoSuchFileException,
> Caused by: java.nio.file.NoSuchFileException:
> /home/user/ssl/deploy-keys/rest.keystore at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvide
> r.jav
> a:214) at java.nio.file.Files.newByteChannel(Files.java:361)
> at java.nio.file.Files.newByteChannel(Files.java:407)
> at
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider
> .java
> :384) at java.nio.file.Files.newInputStream(Files.java:152)
> at
> org.apache.flink.runtime.net.SSLUtils.getKeyManagerFactory(SSLUtils.ja
> va:26
> 6) at
> org.apache.flink.runtime.net.SSLUtils.createRestNettySSLContext(SSLUti
> ls.ja
> va:392) at
> org.apache.flink.runtime.net.SSLUtils.createRestNettySSLContext(SSLUti
> ls.ja
> va:365) at
> org.apache.flink.runtime.net.SSLUtils.createRestServerSSLEngineFactory
> (SSLU
> tils.java:163) at
> org.apache.flink.runtime.rest.RestServerEndpointConfiguration.fromConf
> igura
> tion(RestServerEndpointConfiguration.java:160)
>
> I'm able to see in launch_container.sh that the shipped directory was
> able to be created successfully:
>
> mkdir -p deploy-keys
> ln -sf
> "/fs/htmp/yarn/local/usercache/delp/appcache/application_1618711298408
> _2664 /filecache/16/rest.truststore" "deploy-keys/rest.truststore"
> mkdir -p deploy-keys ln -sf
> "/fs/htmp/yarn/local/usercache/delp/appcache/application_1618711298408
> _2664 /filecache/13/rest.keystore" "deploy-keys/rest.keystore"
>
> So given the above logs, I tried editing flink-conf.yaml to reflect
> what I
> saw: security.ssl.rest.keystore: deploy-keys/rest.keystore
> security.ssl.rest.truststore: deploy-keys/rest.truststore
>
> But that didn't seem to work, either:
> Caused by: java.nio.file.NoSuchFileException: deploy-keys/rest.truststore
> at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvide
> r.jav
> a:214) at java.nio.file.Files.newByteChannel(Files.java:361)
> at java.nio.file.Files.newByteChannel(Files.java:407)
> at
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider
> .java
> :384) at java.nio.file.Files.newInputStream(Files.java:152)
> at
> org.apache.flink.runtime.net.SSLUtils.getTrustManagerFactory(SSLUtils.java:
> 233) at
> org.apache.flink.runtime.net.SSLUtils.createRestNettySSLContext(SSLUti
> ls.ja
> va:397) at
> org.apache.flink.runtime.net.SSLUtils.createRestNettySSLContext(SSLUti
> ls.ja
> va:365) at
> org.apache.flink.runtime.net.SSLUtils.createRestClientSSLEngineFactory
> (SSLU
> tils.java:181) at
> org.apache.flink.runtime.rest.RestClientConfiguration.fromConfiguratio
> n(Res
> tClientConfiguration.java:106)
>
> What needs to be done to get the YARN application to point to the
> right keystore and truststore?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/securi
> ty-ss l.html#tips-for-yarn--mesos-deployment
>
> 
>
> Andreas Hailu
> Data Lake Engineering | Goldman Sachs & Co.
>
>
> 
>

Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Dan Hill
Hey Yun and Robert,

I'm using Flink v1.11.1.

Robert, I'll send you a separate email with the logs.

On Mon, Apr 26, 2021 at 12:46 AM Yun Tang  wrote:

> Hi Dan,
>
> I think you might use older version of Flink and this problem has been
> resolved by FLINK-16753 [1] after Flink-1.10.3.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> --
> *From:* Robert Metzger 
> *Sent:* Monday, April 26, 2021 14:46
> *To:* Dan Hill 
> *Cc:* user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hi Dan,
>
> can you provide me with the JobManager logs to take a look as well? (This
> will also tell me which Flink version you are using)
>
>
>
> On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:
>
> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>
>


Re: Contradictory docs: python.files config can include not only python files

2021-04-26 Thread Yik San Chan
Hi Dian,

It is still not clear to me - does it only allow Python files (.py), or not?

Best,
Yik San

On Mon, Apr 26, 2021 at 9:15 PM Dian Fu  wrote:

> Hi Yik San,
>
> 1) what `--pyFiles` is used for:
> All the files specified via `--pyFiles` will be put in the PYTHONPATH of
> the Python worker during execution and then they will be available for the
> Python user-defined functions during execution.
>
> 2) validate for the files passed to `--pyFiles`
> Currently it will not validate the files passed to this argument. I also
> think that it’s not necessary and not able to perform such kind of check.
> Do you have any advice for this?
>
> Regards,
> Dian
>
> 2021年4月26日 下午8:45,Yik San Chan  写道:
>
> Hi community,
>
> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html,
> regarding python.files:
>
> > Attach custom python files for job.
>
> This makes readers think only Python files are allowed here. However, in
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs
> :
>
> ./bin/flink run \
>   --python examples/python/table/batch/word_count.py \
>   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
>
> It is obviously including .txt file that is not Python files.
>
> I believe it is contradictory here. Can anyone confirm?
>
> Best,
> Yik San
>
>
>


Re: Confusing docs on python.archives

2021-04-26 Thread Yik San Chan
Hi Dian,

I wonder where can we specify the target directory?

Best,
Yik San

On Mon, Apr 26, 2021 at 9:19 PM Dian Fu  wrote:

> Hi Yik San,
>
> It should be a typo issue. I guess it should be `If the target directory
> name is specified, the archive file will be extracted to a directory with
> the specified name.`
>
> Regards,
> Dian
>
> 2021年4月26日 下午8:57,Yik San Chan  写道:
>
> Hi community,
>
> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options
> ,
>
> > For each archive file, a target directory is specified. If the target
> directory name is specified, the archive file will be extracted to a name
> can directory with the specified name. Otherwise, the archive file will be
> extracted to a directory with the same name of the archive file.
>
> I don't get what does "the archive file will be extracted to a name can
> directory with the specified name" mean. Maybe there are typos?
>
> best,
> Yik San
>
>
>


Re: Flink missing Kafka records

2021-04-26 Thread Dan Hill
Hey Robert.

Nothing weird.  I was trying to find recent records (not the latest).  No
savepoints (just was running about ~1 day).  No checkpoint issues (all
successes).  I don't know how many are missing.

I removed the withIdleness. The other parts are very basic.  The text logs
look pretty useless.

On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger  wrote:

> Hi Dan,
>
> Can you describe under which conditions you are missing records (after a
> machine failure, after a Kafka failure, after taking and restoring from a
> savepoint, ...).
> Are many records missing? Are "the first records" or the "latest records"
> missing? Any individual records missing, or larger blocks of data?
>
> I don't think that there's a bug in Flink or the Kafka connector. Maybe
> its just a configuration or systems design issue.
>
>
> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:
>
>> Hi!
>>
>> Have any other devs noticed issues with Flink missing Kafka records with
>> long-running Flink jobs?  When I re-run my Flink job and start from the
>> earliest Kafka offset, Flink processes the events correctly.  I'm using
>> Flink v1.11.1.
>>
>> I have a simple job that takes records (Requests) from Kafka and
>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>> working in a way that I'm not expecting.
>>
>> Any ideas?
>> - Dan
>>
>>
>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception
>> {
>>
>>   Properties kafkaSourceProperties =
>> getKafkaSourceProperties("logrequest");
>>
>>   SingleOutputStreamOperator rawRequestInput = env.addSource(
>>
>> new FlinkKafkaConsumer(getInputRequestTopic(),
>> getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>
>>   .uid("source-request")
>>
>>   .name("Request")
>>
>>   .assignTimestampsAndWatermarks(
>>
>>
>> WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withIdleness(Duration.ofMinutes(1)));
>>
>>
>>   executeLogRequest(rawRequestInput);
>>
>>   env.execute("log-request");
>>
>> }
>>
>>
>> void executeLogRequest(SingleOutputStreamOperator
>> rawRequestInput) {
>>
>>   AvroWriterFactory factory =
>> getAvroWriterFactory(Request.class);
>>
>>   rawRequestInput.addSink(StreamingFileSink
>>
>>   .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"),
>> factory)
>>
>>   .withBucketAssigner(new DateHourBucketAssigner(request ->
>> request.getTiming().getEventApiTimestamp()))
>>
>>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>
>>   .withOutputFileConfig(createOutputFileConfig())
>>
>>   .build())
>>
>> .uid("sink-s3-raw-request")
>>
>> .name("S3 Raw Request");
>>
>> }
>>
>>
>>
>>


Re: Contradictory docs: python.files config can include not only python files

2021-04-26 Thread Dian Fu
Hi Yik San,

All the files which could be put in the PYTHONPATH are allowed here, e.g. .zip, 
.whl, etc.
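
For example (paths are only illustrative), a zip archive and a wheel can be
passed together:

./bin/flink run \
  --python my_job.py \
  --pyFiles file:///path/to/my_package.zip,file:///path/to/some_dep-1.0-py3-none-any.whl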

Regards,
Dian

> 2021年4月27日 上午8:16,Yik San Chan  写道:
> 
> Hi Dian,
> 
> It is still not clear to me - does it only allow Python files (.py), or not?
> 
> Best,
> Yik San
> 
> On Mon, Apr 26, 2021 at 9:15 PM Dian Fu  > wrote:
> Hi Yik San,
> 
> 1) what `--pyFiles` is used for:
> All the files specified via `--pyFiles` will be put in the PYTHONPATH of the 
> Python worker during execution and then they will be available for the Python 
> user-defined functions during execution. 
> 
> 2) validate for the files passed to `--pyFiles`
> Currently it will not validate the files passed to this argument. I also 
> think that it’s not necessary and not able to perform such kind of check. Do 
> you have any advice for this?
> 
> Regards,
> Dian
> 
>> 2021年4月26日 下午8:45,Yik San Chan > > 写道:
>> 
>> Hi community,
>> 
>> In 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html
>>  
>> ,
>>  regarding python.files:
>> 
>> > Attach custom python files for job.
>> 
>> This makes readers think only Python files are allowed here. However, in 
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs
>>  
>> :
>> 
>> ./bin/flink run \
>>   --python examples/python/table/batch/word_count.py \
>>   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
>> It is obviously including .txt file that is not Python files.
>> 
>> I believe it is contradictory here. Can anyone confirm?
>> 
>> Best,
>> Yik San
> 



Re: Confusing docs on python.archives

2021-04-26 Thread Dian Fu
There are multiple ways to specify the target directory depending on how to 
specify the python archives.
1) API: add_python_archive("file:///path/to/py_env.zip", "myenv"), see [1] for more details,
2) configuration: python.archives, e.g. file:///path/to/py_env.zip#myenv
3) command line arguments: -pyarch file:///path/to/py_env.zip#myenv

You can specify python archives via either of the above options and it will 
extract py_env.zip into directory myenv during execution.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html

> 2021年4月27日 上午8:17,Yik San Chan  写道:
> 
> Hi Dian,
> 
> I wonder where can we specify the target directory?
> 
> Best,
> Yik San
> 
> On Mon, Apr 26, 2021 at 9:19 PM Dian Fu  > wrote:
> Hi Yik San,
> 
> It should be a typo issue. I guess it should be `If the target directory name 
> is specified, the archive file will be extracted to a directory with the 
> specified name.`
> 
> Regards,
> Dian
> 
>> 2021年4月26日 下午8:57,Yik San Chan > > 写道:
>> 
>> Hi community,
>> 
>> In 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options
>>  
>> ,
>> 
>> > For each archive file, a target directory is specified. If the target 
>> > directory name is specified, the archive file will be extracted to a name 
>> > can directory with the specified name. Otherwise, the archive file will be 
>> > extracted to a directory with the same name of the archive file.
>> 
>> I don't get what does "the archive file will be extracted to a name can 
>> directory with the specified name" mean. Maybe there are typos?
>> 
>> best,
>> Yik San
> 



Re: Confusing docs on python.archives

2021-04-26 Thread Dian Fu
For the command line arguments, it’s documented in 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

> 2021年4月27日 上午10:19,Dian Fu  写道:
> 
> There are multiple ways to specify the target directory depending on how to 
> specify the python archives.
> 1) API: add_python_archive("file:///path/to/py_env.zip", "myenv"), see [1] for more details,
> 2) configuration: python.archives, e.g. file:///path/to/py_env.zip#myenv
> 3) command line arguments: -pyarch file:///path/to/py_env.zip#myenv
> 
> You can specify python archives via either of the above options and it will 
> extract py_env.zip into directory myenv during execution.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html
>  
> 
> 
>> 2021年4月27日 上午8:17,Yik San Chan > > 写道:
>> 
>> Hi Dian,
>> 
>> I wonder where can we specify the target directory?
>> 
>> Best,
>> Yik San
>> 
>> On Mon, Apr 26, 2021 at 9:19 PM Dian Fu > > wrote:
>> Hi Yik San,
>> 
>> It should be a typo issue. I guess it should be `If the target directory 
>> name is specified, the archive file will be extracted to a directory with 
>> the specified name.`
>> 
>> Regards,
>> Dian
>> 
>>> 2021年4月26日 下午8:57,Yik San Chan >> > 写道:
>>> 
>>> Hi community,
>>> 
>>> In 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options
>>>  
>>> ,
>>> 
>>> > For each archive file, a target directory is specified. If the target 
>>> > directory name is specified, the archive file will be extracted to a name 
>>> > can directory with the specified name. Otherwise, the archive file will 
>>> > be extracted to a directory with the same name of the archive file.
>>> 
>>> I don't get what does "the archive file will be extracted to a name can 
>>> directory with the specified name" mean. Maybe there are typos?
>>> 
>>> best,
>>> Yik San
>> 
> 



Re: Contradictory docs: python.files config can include not only python files

2021-04-26 Thread Yik San Chan
Hi Dian,

If that's the case, shall we reword "Attach custom python files for job."
into "attach custom files that could be put in PYTHONPATH, e.g., .zip,
.whl, etc."

Best,
Yik San

On Tue, Apr 27, 2021 at 10:08 AM Dian Fu  wrote:

> Hi Yik San,
>
> All the files which could be put in the PYTHONPATH are allowed here, e.g.
> .zip, .whl, etc.
>
> Regards,
> Dian
>
> 2021年4月27日 上午8:16,Yik San Chan  写道:
>
> Hi Dian,
>
> It is still not clear to me - does it only allow Python files (.py), or
> not?
>
> Best,
> Yik San
>
> On Mon, Apr 26, 2021 at 9:15 PM Dian Fu  wrote:
>
>> Hi Yik San,
>>
>> 1) what `--pyFiles` is used for:
>> All the files specified via `--pyFiles` will be put in the PYTHONPATH of
>> the Python worker during execution and then they will be available for the
>> Python user-defined functions during execution.
>>
>> 2) validate for the files passed to `--pyFiles`
>> Currently it will not validate the files passed to this argument. I also
>> think that it’s not necessary and not able to perform such kind of check.
>> Do you have any advice for this?
>>
>> Regards,
>> Dian
>>
>> 2021年4月26日 下午8:45,Yik San Chan  写道:
>>
>> Hi community,
>>
>> In
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html,
>> regarding python.files:
>>
>> > Attach custom python files for job.
>>
>> This makes readers think only Python files are allowed here. However, in
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs
>> :
>>
>> ./bin/flink run \
>>   --python examples/python/table/batch/word_count.py \
>>   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
>>
>> It is obviously including .txt file that is not Python files.
>>
>> I believe it is contradictory here. Can anyone confirm?
>>
>> Best,
>> Yik San
>>
>>
>>
>


Re: Deployment/Memory Configuration/Scalability

2021-04-26 Thread Yangze Guo
Hi, Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?

Yes, we can achieve HA with per-job mode with ZooKeeper[1]. Looking at
your configuration, you also need to enable checkpointing[2], which is
triggered automatically and helps you resume the program on failure, by
setting execution.checkpointing.interval.
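
A rough sketch of the corresponding flink-conf.yaml entries (the quorum,
paths and interval below are only placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.storageDir: s3://bucket/flink-ha/
high-availability.cluster-id: /data-enrichment-job
execution.checkpointing.interval: 10min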

> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of times.

I think that with checkpointing enabled, you no longer need to trigger a
savepoint manually under a specific condition, as checkpoints will be
triggered periodically.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed 
> property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix for that
issue is only applied to the Docker image. So I guess you can package
a custom image yourself if you do not want to upgrade. However, if
you are using the native Kubernetes mode[3] and there is no compatibility
issue, I think it would be good to upgrade because there are also
lots of improvements[4] in 1.12.

> 6. How do I decide when the job parallelism should be increased? Are there 
> some metrics which can lead me to a clue that the parallelism should be 
> increased?

As there are 6 Kafka sources in your job, I think the parallelism
should first be aligned with the number of topic partitions. For metrics,
you could refer to the backpressure of tasks and
numRecordsOutPerSecond[5].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
 wrote:
>
> Hi all,
>
> I am having multiple questions regarding Flink :) Let me give you some 
> background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed 
> from 6 different kafka topics and it is joined via multiple 
> CoProcessFunctions. On a daily basis the job is handling ~20 millions events 
> from the source kafka topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task 
> managers. I have a daily cron job which triggers savepoint in order to have a 
> fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the 
> allowed memory. I observe the behavior described in this stackoverflow post 
> (which seems to be solved in 1.12.X if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up 
> having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?
> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of times.

Re: Contradictory docs: python.files config can include not only python files

2021-04-26 Thread Dian Fu
Thanks for the suggestion. It makes sense to me~. 

> 2021年4月27日 上午10:28,Yik San Chan  写道:
> 
> Hi Dian,
> 
> If that's the case, shall we reword "Attach custom python files for job." 
> into "attach custom files that could be put in PYTHONPATH, e.g., .zip, .whl, 
> etc."
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 10:08 AM Dian Fu  > wrote:
> Hi Yik San,
> 
> All the files which could be put in the PYTHONPATH are allowed here, e.g. 
> .zip, .whl, etc.
> 
> Regards,
> Dian
> 
>> 2021年4月27日 上午8:16,Yik San Chan > > 写道:
>> 
>> Hi Dian,
>> 
>> It is still not clear to me - does it only allow Python files (.py), or not?
>> 
>> Best,
>> Yik San
>> 
>> On Mon, Apr 26, 2021 at 9:15 PM Dian Fu > > wrote:
>> Hi Yik San,
>> 
>> 1) what `--pyFiles` is used for:
>> All the files specified via `--pyFiles` will be put in the PYTHONPATH of the 
>> Python worker during execution and then they will be available for the 
>> Python user-defined functions during execution. 
>> 
>> 2) validate for the files passed to `--pyFiles`
>> Currently it will not validate the files passed to this argument. I also 
>> think that it’s not necessary and not able to perform such kind of check. Do 
>> you have any advice for this?
>> 
>> Regards,
>> Dian
>> 
>>> 2021年4月26日 下午8:45,Yik San Chan >> > 写道:
>>> 
>>> Hi community,
>>> 
>>> In 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html
>>>  
>>> ,
>>>  regarding python.files:
>>> 
>>> > Attach custom python files for job.
>>> 
>>> This makes readers think only Python files are allowed here. However, in 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs
>>>  
>>> :
>>> 
>>> ./bin/flink run \
>>>   --python examples/python/table/batch/word_count.py \
>>>   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
>>> It is obviously including .txt file that is not Python files.
>>> 
>>> I believe it is contradictory here. Can anyone confirm?
>>> 
>>> Best,
>>> Yik San
>> 
> 



Re: Deployment/Memory Configuration/Scalability

2021-04-26 Thread Radoslav Smilyanov
Hi Yangze Guo,

Thanks for your reply.

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> > 2. Is it possible to have high availability with Per-Job mode? Or maybe
> I should go with session mode and make sure that my flink cluster is
> running a single job?
> Yes, we can achieve HA with per-job mode with ZooKeeper[2]. Look at
> your configuration, you need to also enable the checkpoint[2], which
> is automatically triggered and helps you to resume the program when
> failure, by setting the execution.checkpointing.interval.


I forgot to add the checkpoint configuration since it's part of a custom
job configuration which is mounted in each pod. So checkpoints are enabled.
:)
That's why a savepoint is triggered on a daily basis, since the existing
deployment setup has a single Job Manager.
I will take a look at k8s or Zookeeper HA options.

> 3. Let's assume that savepoints should be triggered only before job
> update/deployment. How can I trigger a savepoint if my job is already
> consuming more than 80% of the allowed memory per pod in k8s? My
> observations show that k8s kills task managers (which are running as pods)
> and I need to retry it a couple of times.
> I think with the checkpoint, you no longer need to trigger the
> savepoint manually with a specific condition as the checkpoint will be
> periodically triggered.


Checkpoints are already enabled (once every 10 minutes). Once HA is
set up correctly, I think that savepoints can be used only when the job
needs to be updated.

> 6. How do I decide when the job parallelism should be increased? Are
> there some metrics which can lead me to a clue that the parallelism should
> be increased?
> As there are 6 Kafka sources in your job, I think the parallelism
> should first be fixed with the topic partition number. For metrics,
> you could refer to the backpressure of tasks and
> numRecordsOutPerSecond[5].


Currently I am using a parallelism equal to the highest number of
Kafka topic partitions. Unfortunately some of the topics have higher load
than others, and thus some of them have 1 partition while others have
4 partitions (for example).

Thanks,
Rado

On Tue, Apr 27, 2021 at 7:50 AM Yangze Guo  wrote:

> Hi, Radoslav,
>
> > 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> > 2. Is it possible to have high availability with Per-Job mode? Or maybe
> I should go with session mode and make sure that my flink cluster is
> running a single job?
>
> Yes, we can achieve HA with per-job mode with ZooKeeper[2]. Look at
> your configuration, you need to also enable the checkpoint[2], which
> is automatically triggered and helps you to resume the program when
> failure, by setting the execution.checkpointing.interval.
>
> > 3. Let's assume that savepoints should be triggered only before job
> update/deployment. How can I trigger a savepoint if my job is already
> consuming more than 80% of the allowed memory per pod in k8s? My
> observations show that k8s kills task managers (which are running as pods)
> and I need to retry it a couple of times.
>
> I think with the checkpoint, you no longer need to trigger the
> savepoint manually with a specific condition as the checkpoint will be
> periodically triggered.
>
> > 4. Should I consider upgrading to version 1.12.3?
> > 5. Should I consider switching off state.backend.rocksdb.memory.managed
> property even in version 1.12.3?
>
> I'm not an expert on the state backend, but it seems the fix of that
> issue is only applied to the docker image. So I guess you can package
> a custom image yourselves if you do not want to upgrade. However, if
> you are using the Native K8S mode[3] and there is no compatibility
> issue, I think it might be good to upgrading because there are also
> lots of improvements[4] in 1.12.
>
> > 6. How do I decide when the job parallelism should be increased? Are
> there some metrics which can lead me to a clue that the parallelism should
> be increased?
>
> As there are 6 Kafka sources in your job, I think the parallelism
> should first be fixed with the topic partition number. For metrics,
> you could refer to the backpressure of tasks and
> numRecordsOutPerSecond[5].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
> [4] https://issues.apache.org/jira/browse/FLINK-17709
> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io
>
> Best,
> Yangze Guo
>
> On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
>  wrote:
> >
> > Hi all,
> >
> > I am having multiple questions regarding Flink :) Let me give you some
> background of what I have done so far.
> >
> > Description
> > I am using Flink 1.11.2. My job is doing data enrichment. Data

Re: Watermarks in Event Time Temporal Join

2021-04-26 Thread Leonard Xu
Hello, Maciej
> I agree the watermark should pass on versioned table side, because
> this is the only way to know which version of record should be used.
> But if we mimics behaviour of interval join then main stream watermark
> could be skipped.

IIRC, a rowtime interval join requires watermarks on both sides, and the
watermark is used to clean up outdated data and advance progress in both
the rowtime interval join and the rowtime temporal join.
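
For reference, a rough sketch of such a rowtime temporal join with a watermark
declared on both sides (table and column names are made up, connector options
are omitted; the versioned side would typically be backed by a changelog source
such as upsert-kafka):

CREATE TABLE Orders (
  order_id STRING,
  currency STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( ... );

CREATE TABLE RatesHistory (
  currency STRING,
  rate DECIMAL(10, 2),
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (currency) NOT ENFORCED
) WITH ( ... );

SELECT o.order_id, o.order_time, r.rate
FROM Orders AS o
LEFT JOIN RatesHistory FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;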

Best,
Leonard