Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun,

Thank you for starting the discussion. This will solve one of the
long-standing issues [1] that confuse users. I'm also a big fan of option
3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected "leaf nodes", which
would make the sinks the root nodes. That is very similar to how graphs in
relational algebra are labeled. However, I got the feeling that in Flink we
rather iterate from sources to sinks, making the sources the root nodes and
the sinks the leaf nodes. That said, I have no clue how it's done in similar
cases, so please take that hint cautiously.
2) I'd make the algorithm that finds the subtasks iterative and have it react
in the CheckpointCoordinator (see the toy sketch further below). Let's assume
that we inject the barrier at all root subtasks (initially all sources). So in
the iterative algorithm, whenever a root A finishes, it checks for all connected
subtasks B whether they have any unfinished upstream task left. If not, B
becomes a new root. That would only touch a part of the job graph, but would
require some callback from the JobManager to the CheckpointCoordinator.
2b) We also need to be careful about out-of-sync updates: if the root is
about to finish, we could send the barrier to it from the
CheckpointCoordinator, but by the time it arrives, the subtask is already
finished.
3) An implied change is that checkpoints are no longer aborted at
EndOfPartition, which is good, but this might be stated explicitly.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit
ambiguous: What happens when an unaligned checkpoint is started and then
one input channel contains the EndOfPartition event? From the written
description, it sounds to me like, we move back to an aligned checkpoint
for the whole receiving task. However, that is neither easily possible nor
necessary. Imho it would be enough to also store the EndOfPartition in the
channel state.
5) I'd expand the recovery section a bit. It would be the first time that
we recover an incomplete DAG. Afaik the subtasks are deployed before the
state is recovered, so at some point, the subtasks either need to be
removed again or maybe we could even avoid them being created in the first
place.

[1] https://issues.apache.org/jira/browse/FLINK-2491
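
For illustration only, here is a toy sketch of the iterative root detection from point 2.
This is not Flink code; the class and method names are made up.

import java.util.*;

/** Toy sketch of the iterative "new root" detection idea from point 2. Not Flink code. */
public class RootTracker {
    // adjacency: task -> downstream tasks
    private final Map<String, List<String>> downstream = new HashMap<>();
    // per task: number of upstream tasks that have not finished yet
    private final Map<String, Integer> unfinishedUpstream = new HashMap<>();
    // tasks that currently get checkpoint barriers injected ("root nodes")
    private final Set<String> roots = new HashSet<>();

    public RootTracker(Map<String, List<String>> edges) {
        edges.forEach((from, tos) -> {
            downstream.put(from, tos);
            unfinishedUpstream.putIfAbsent(from, 0);
            for (String to : tos) {
                unfinishedUpstream.merge(to, 1, Integer::sum);
            }
        });
        // initially, every task without upstream tasks (i.e., a source) is a root
        unfinishedUpstream.forEach((task, n) -> { if (n == 0) roots.add(task); });
    }

    /** Called (e.g., via a JobManager callback) when a task reaches FINISHED. */
    public Set<String> onTaskFinished(String task) {
        roots.remove(task);
        Set<String> newRoots = new HashSet<>();
        for (String next : downstream.getOrDefault(task, List.of())) {
            // if 'next' has no unfinished upstream tasks left, it becomes a new root
            if (unfinishedUpstream.merge(next, -1, Integer::sum) == 0) {
                roots.add(next);
                newRoots.add(next);
            }
        }
        return newRoots; // these would now get barriers injected directly
    }

    public Set<String> currentRoots() {
        return Collections.unmodifiableSet(roots);
    }

    public static void main(String[] args) {
        // source -> map -> sink
        RootTracker tracker = new RootTracker(Map.of(
                "source", List.of("map"),
                "map", List.of("sink"),
                "sink", List.of()));
        System.out.println(tracker.currentRoots());           // [source]
        System.out.println(tracker.onTaskFinished("source"));  // [map]
    }
}

In this picture, the CheckpointCoordinator would ask such a structure for the current roots
whenever it triggers a checkpoint, and the JobManager callback mentioned above would drive
onTaskFinished.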

On Fri, Oct 9, 2020 at 8:22 AM Yun Gao  wrote:

> Hi, devs & users
>
> Very sorry for the spoiled formats, I resent the discussion as follows.
>
>
> As discussed in FLIP-131[1], Flink will make DataStream the unified API for 
> processing bounded and unbounded data in both streaming and blocking modes. 
> However, one long-standing problem for the streaming mode is that currently 
> Flink does not support checkpoints after some tasks finished, which causes 
> some problems for bounded or mixed jobs:
> 1.
> Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
> before committed to external systems in streaming mode. If sources are 
> bounded and checkpoints are disabled after some tasks are finished, the data 
> sent after the last checkpoint could never be committed. This 
> issue has already been reported several times in the user ML [2][3][4] and was 
> further brought up when working on FLIP-143: Unified Sink API [5].
> 2.
> The jobs with both bounded and unbounded sources might have to replay a large 
> number of records after failover because no periodic checkpoints are taken 
> after the bounded sources finished.
>
>
> Therefore, we propose to also support checkpoints after some tasks finished. 
> You could find more details in FLIP-147 [6].
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
> [4]
> https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [6]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
> --Original Mail --
> *Sender:*Yun Gao 
> *Send Date:*Fri Oct 9 14:16:52 2020
> *Recipients:*Flink Dev , User-Flink <
> user@flink.apache.org>
> *Subject:*[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>> Hi, devs & users
>>
>>
>> As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
>> processing bounded and unbounded data in both streaming and blocking modes. 
>> However, one long-standing problem for the streaming mode is that currently 
>> Flink does not support checkpoints after some tasks finished, which causes 
>> some problems for bounded or mixed jobs:
>>
>> Flink exactly-once sinks rely on checkpoints to ensure data won’t be 

TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-12 Thread DONG, Weike
Hi community,

Recently we have noticed a strange behavior for Flink jobs on Kubernetes in
per-job mode: when the parallelism increases, the time it takes for the
TaskManagers to register with the *JobManager* becomes abnormally long (for a
task with a parallelism of 50, it could take 60 ~ 120 seconds or even longer
for the registration attempt), and usually more than 10 attempts are needed
to finish this registration.

Because of this, we could not submit a job requiring more than 20 slots
with the default configuration, as the TaskManager would say:


> Registration at JobManager 
> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
> attempt 9 timed out after 25600 ms

Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because: The
> slot 60d5277e138a94fb73fc6691557001e0 has timed out.

Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
> 493cd86e389ccc8f2887e1222903b5ce).
> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed
> out.


In order to cope with this issue, we have to change the below configuration
parameters:

>
> # Prevent "Could not allocate the required slot within slot request
> timeout. Please make sure that the cluster has enough resources. Stopping
> the JobMaster for job"
> slot.request.timeout: 50

# Increase max timeout in a single attempt
> cluster.registration.max-timeout: 30
> # Prevent "free slot (TaskSlot)"
> akka.ask.timeout: 10 min
> # Prevent "Heartbeat of TaskManager timed out."
> heartbeat.timeout: 50


However, we acknowledge that this is only a temporary dirty fix, which is
not what we want. It can be seen that during TaskManager registration with the
JobManager, lots of warning messages show up in the logs:

No hostname could be resolved for the IP address 9.166.0.118, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted.


Initially we thought this was probably the cause (a reverse DNS lookup might
take a long time); however, we later found that the reverse lookup took less
than 1 ms, so this is probably not the reason.
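
(For reference, one crude way to sanity-check that reverse-lookup latency from inside a pod
is a few lines of Java; the IP below is just the one from the warning above, and this is only
an approximation of what Flink does internally.)

import java.net.InetAddress;

public class ReverseLookupCheck {
    public static void main(String[] args) throws Exception {
        // example IP taken from the warning message; pass a different one as the first argument
        String ip = args.length > 0 ? args[0] : "9.166.0.118";
        InetAddress addr = InetAddress.getByName(ip); // no lookup yet for an IP literal
        long start = System.nanoTime();
        String host = addr.getCanonicalHostName();    // this triggers the reverse DNS lookup
        long micros = (System.nanoTime() - start) / 1_000;
        System.out.println(ip + " -> " + host + " (" + micros + " microseconds)");
    }
}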

Also, we have checked the GC log of both TaskManagers and JobManager, and
they seem to be perfectly normal, without any signs of pauses. And the
heartbeats are processed as normal according to the logs.

Moreover, TaskManagers register quickly with the ResourceManager but extremely
slowly with the JobManager, so this is not because of a slow network
connection.

Here we wonder: what could be the cause of the slow registration between the
JobManager and the TaskManager(s)? There are no other warning or error messages
in the logs (DEBUG level) other than the "No hostname could be resolved" messages,
which is quite weird.

Thanks for reading, and we hope to get some insights into this issue : )

Sincerely,
Weike


Re: Flink Kubernetes Libraries

2020-10-12 Thread superainbower
Hi Till,
Could you tell me how to configure HDFS as the state backend when I deploy Flink on 
k8s?
I try to add the following to flink-conf.yaml


state.backend: rocksdb
state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
state.backend.incremental: true


And add flink-shaded-hadoop2-2.8.3-1.8.3.jar to /opt/flink/lib


But it doesn’t work and I got these error logs:


Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded. For a full list of supported file systems, please 
see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in the 
classpath, or some classes are missing from the classpath


Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.runtime.util.HadoopUtils
On 10/09/2020 22:13, Till Rohrmann wrote:
Hi Saksham,


if you want to extend the Flink Docker image you can find here more details 
[1]. 


If you want to include the library in your user jar, then you have to add the 
library as a dependency to your pom.xml file and enable the shade plugin for 
building an uber jar [2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
[2] 
https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html


Cheers,
Till


On Fri, Oct 9, 2020 at 3:22 PM saksham sapra  wrote:

Thanks Till for helping out,


Regarding the way you suggested: is it possible to copy libs from the D: drive to 
FLINK_HOME/libs? I tried to run a copy command (copy D:/data/libs to 
FLINK_HOME/libs) and it gets copied, but I don't know how I can check where it got 
copied and whether these libs are picked up by Flink.




Thanks,
Saksham Sapra


On Wed, Oct 7, 2020 at 9:40 PM Till Rohrmann  wrote:

Hi Saksham,


the easiest approach would probably be to include the required libraries in 
your user code jar which you submit to the cluster. Using maven's shade plugin 
should help with this task. Alternatively, you could also create a custom Flink 
Docker image where you add the required libraries to the FLINK_HOME/libs 
directory. This would however mean that every job you submit to the Flink 
cluster would see these libraries in the system class path.


Cheers,
Till


On Wed, Oct 7, 2020 at 2:08 PM saksham sapra  wrote:

Hi,


I have made some configuration using this page:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
and I am able to access the Flink UI, but I need to submit a job using
http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
through Postman. Locally I have some libraries which I can add to the libs
folder, but in this setup how can I add my libraries so that it works properly?





Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Arvid Heise
Hi Rinat,

Which API are you using? If you use datastream API, the common way to
simulate side inputs (which is what you need) is to use a broadcast. There
is an example on SO [1].

[1]
https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
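
For reference, a bare-bones sketch of the broadcast pattern in the Java DataStream API.
It is not taken from the SO answer; the hosts, ports and the "model" payloads are
placeholder assumptions.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ModelBroadcastJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9999);       // main input (example)
        DataStream<String> modelUpdates = env.socketTextStream("localhost", 9998); // control stream (example)

        // descriptor of the broadcast state holding the latest model/rule version
        MapStateDescriptor<String, String> modelDesc = new MapStateDescriptor<>(
                "model", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        BroadcastStream<String> broadcast = modelUpdates.broadcast(modelDesc);

        events.connect(broadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // score the event with whatever model version has been broadcast so far
                        String model = ctx.getBroadcastState(modelDesc).get("current");
                        out.collect(value + " scored with " + (model == null ? "default" : model));
                    }

                    @Override
                    public void processBroadcastElement(String update, Context ctx, Collector<String> out) throws Exception {
                        // a new model version arrived on the control stream; remember it
                        ctx.getBroadcastState(modelDesc).put("current", update);
                    }
                })
                .print();

        env.execute("broadcast side-input sketch");
    }
}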

On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
wrote:

> Hi mates !
>
> I'm in the beginning of the road of building a recommendation pipeline on
> top of Flink.
> I'm going to register a list of UDF python functions on job
> startups where each UDF is an ML model.
>
> Over time new model versions appear in the ML registry and I would like to
> update my UDF functions on the fly without need to restart the whole job.
> Could you tell me, whether it's possible or not ? Maybe the community can
> give advice on how such tasks can be solved using Flink and what other
> approaches exist.
>
> Thanks a lot for your help and advice !
>
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Arvid Heise
Hi Padarn,

sounds like a good addition to me. We could wait for more feedback, or you
could start immediately.

The next step would be to create a JIRA and get it assigned to you.

Looking forward to your contribution

Arvid

On Sun, Oct 11, 2020 at 7:45 AM Padarn Wilson  wrote:

> Hi Flink Users,
>
> We need to expose some additional options for the s3 hadoop filesystem:
> Specifically, we want to set object tagging and lifecycle. This would be a
> fairly easy change and we initially thought to create a new Filsystem with
> very minor changes to allow this.
>
> However then I wondered, would others use this? If it something that is
> worth raising as a Flink issue and then contributing back upstream.
>
> Any others who would like to be able to set object tags for the s3
> filesystem?
>
> Cheers,
> Padarn
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-12 Thread DONG, Weike
Hi community,

I have uploaded the log files of JobManager and TaskManager-1-1 (one of the
50 TaskManagers) with DEBUG log level and default Flink configuration, and
it clearly shows that TaskManager failed to register with JobManager after
10 attempts.

Here is the link:

JobManager:
https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce

TaskManager-1-1:
https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe

Thanks : )

Best regards,
Weike


On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike  wrote:

> Hi community,
>
> Recently we have noticed a strange behavior for Flink jobs on Kubernetes
> per-job mode: when the parallelism increases, the time it takes for the
> TaskManagers to register with *JobManager *becomes abnormally long (for a
> task with parallelism of 50, it could take 60 ~ 120 seconds or even longer
> for the registration attempt), and usually more than 10 attempts are needed
> to finish this registration.
>
> Because of this, we could not submit a job requiring more than 20 slots
> with the default configuration, as the TaskManager would say:
>
>
>> Registration at JobManager 
>> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
>> attempt 9 timed out after 25600 ms
>
> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because: The
>> slot 60d5277e138a94fb73fc6691557001e0 has timed out.
>
> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
>> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
>> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
>> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
>> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
>> 493cd86e389ccc8f2887e1222903b5ce).
>> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed
>> out.
>
>
> In order to cope with this issue, we have to change the below
> configuration parameters:
>
>>
>> # Prevent "Could not allocate the required slot within slot request
>> timeout. Please make sure that the cluster has enough resources. Stopping
>> the JobMaster for job"
>> slot.request.timeout: 50
>
> # Increase max timeout in a single attempt
>> cluster.registration.max-timeout: 30
>> # Prevent "free slot (TaskSlot)"
>> akka.ask.timeout: 10 min
>> # Prevent "Heartbeat of TaskManager timed out."
>> heartbeat.timeout: 50
>
>
> However, we acknowledge that this is only a temporary dirty fix, which is
> not what we want. It could be seen that during TaskManager registration to
> JobManager, lots of warning messages come out in logs:
>
> No hostname could be resolved for the IP address 9.166.0.118, using IP
>> address as host name. Local input split assignment (such as for HDFS files)
>> may be impacted.
>
>
> Initially we thought this was probably the cause (reverse lookup of DNS
> might take up a long time), however we later found that the reverse lookup
> only took less than 1ms, so maybe not because of this.
>
> Also, we have checked the GC log of both TaskManagers and JobManager, and
> they seem to be perfectly normal, without any signs of pauses. And the
> heartbeats are processed as normal according to the logs.
>
> Moreover, TaskManagers register quickly with ResourceManager, but then
> extra slow with TaskManager, so this is not because of a slow network
> connection.
>
> Here we wonder what could be the cause for the slow registration between
> JobManager and TaskManager(s)? No other warning or error messages in the
> log (DEBUG level) other than the "No hostname could be resolved" messages,
> which is quite weird.
>
> Thanks for the reading, and hope to get some insights into this issues : )
>
> Sincerely,
> Weike
>
>
>


Re: Flink Kubernetes Libraries

2020-10-12 Thread Till Rohrmann
Hi Superainbower,

could you share the complete logs with us? They contain which Flink version
you are using and also the classpath you are starting the JVM with. Have
you tried whether the same problem occurs with the latest Flink version?

Cheers,
Till

On Mon, Oct 12, 2020 at 10:32 AM superainbower 
wrote:

> Hi Till,
> Could u tell me how to configure HDFS as statebackend when I deploy flink
> on k8s?
> I try to add the following to flink-conf.yaml
>
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
> state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
> state.backend.incremental: true
>
> And add flink-shaded-hadoop2-2.8.3-1.8.3.jar to /opt/flink/lib
>
> But It doesn’t work and I got this error logs
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.runtime.util.HadoopUtils
> On 10/09/2020 22:13, Till Rohrmann  wrote:
>
> Hi Saksham,
>
> if you want to extend the Flink Docker image you can find here more
> details [1].
>
> If you want to include the library in your user jar, then you have to add
> the library as a dependency to your pom.xml file and enable the shade
> plugin for building an uber jar [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
> [2]
> https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html
>
> Cheers,
> Till
>
> On Fri, Oct 9, 2020 at 3:22 PM saksham sapra 
> wrote:
>
>> Thanks Till for helping out,
>>
>> The way you suggested, is it possible to copy libs which is in D
>> directory to FLINK_HOME/libs. I tried to run a copy command : copy
>> D:/data/libs to FLINK_HOME/libs and it gets copied but i dont how can i
>> check where it gets copied and this libs is taken by flink?
>>
>>
>> Thanks,
>> Saksham Sapra
>>
>> On Wed, Oct 7, 2020 at 9:40 PM Till Rohrmann 
>> wrote:
>>
>>> HI Saksham,
>>>
>>> the easiest approach would probably be to include the required libraries
>>> in your user code jar which you submit to the cluster. Using maven's shade
>>> plugin should help with this task. Alternatively, you could also create a
>>> custom Flink Docker image where you add the required libraries to the
>>> FLINK_HOME/libs directory. This would however mean that every job you
>>> submit to the Flink cluster would see these libraries in the system class
>>> path.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 7, 2020 at 2:08 PM saksham sapra 
>>> wrote:
>>>
 Hi ,

 i have made some configuration using this link page :
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
 .
 and i am able to run flink on UI , but i need to submit a job using :
 http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
 through POstman, and i have some libraries which in local i can add in libs
 folder but in this how can i add my libraries so that it works properly.

 [image: image.png]

>>>


Re: restoring from externalized incremental rocksdb checkpoint?

2020-10-12 Thread Congxian Qiu
Hi Jeff,
   Sorry for the late reply. You can only restore from a checkpoint whose
chk-xxx directory contains a _metadata file; if there is no _metadata in
the chk-xxx directory, that means the chk-xxx checkpoint is not complete and you
can't restore from it.

Best,
Congxian
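
For reference, a complete retained (incremental) checkpoint directory usually looks roughly
like the sketch below. All names are examples; the UUID-named files hold the actual RocksDB
state, and the _metadata file is what `flink run -s` should point at:

<checkpoint-dir>/<job-id>/
    shared/                 <- shared incremental state files (UUID names)
    taskowned/
    chk-42/
        _metadata           <- restore target, e.g. flink run -s .../chk-42/_metadata
        <uuid-named files>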


Jeffrey Martin  wrote on Tue, Sep 15, 2020 at 2:18 PM:

> Thanks for the quick reply Congxian.
>
> The non-empty chk-N directories I looked at contained only files whose
> names are UUIDs. Nothing named _metadata (unless HDFS hides files that
> start with an underscore?).
>
> Just to be clear though -- I should expect a metadata file when using
> incremental checkpoints?
>
> On Mon, Sep 14, 2020 at 10:46 PM Congxian Qiu 
> wrote:
>
>> Hi Jeff
>>You can restore from retained checkpoint such as[1] `bin/flink run -s
>> :checkpointMetaDataPath [:runArgs]` ,  you may find the metadata in the
>> `chk-xxx` directory[2]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#directory-structure
>> Best,
>> Congxian
>>
>>
>> Jeffrey Martin  wrote on Tue, Sep 15, 2020 at 1:30 PM:
>>
>>> Hi,
>>>
>>> My job on Flink 1.10 uses RocksDB with incremental checkpointing
>>> enabled. The checkpoints are retained on cancellation.
>>>
>>> How do I resume from the retained checkpoint after cancellation (e.g.,
>>> when upgrading the job binary)? Docs say to use the checkpoint or savepoint
>>> metadata file, but AFAICT there's no metadata file in HDFS in the various
>>> directories under "$checkpointsDir/snapshots/$jobID",
>>>
>>> Thanks,
>>>
>>> Jeff Martin
>>>
>>>
>>>
>>
>>
>
>


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Arvid, thx for your reply.

We are already using the approach with control streams to propagate
business rules through our data-pipeline.

Because all our models are powered by Python, I'm going to use Table API
and register UDF functions, where each UDF is a separate model.

So my question is - can I update the UDF function on the fly without a job
restart ?
Because new model versions become available on a daily basis and we should
use them as soon as possible.

Thx !




Mon, Oct 12, 2020 at 11:32, Arvid Heise :

> Hi Rinat,
>
> Which API are you using? If you use datastream API, the common way to
> simulate side inputs (which is what you need) is to use a broadcast. There
> is an example on SO [1].
>
> [1]
> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>
> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
> wrote:
>
>> Hi mates !
>>
>> I'm in the beginning of the road of building a recommendation pipeline on
>> top of Flink.
>> I'm going to register a list of UDF python functions on job
>> startups where each UDF is an ML model.
>>
>> Over time new model versions appear in the ML registry and I would like
>> to update my UDF functions on the fly without need to restart the whole job.
>> Could you tell me, whether it's possible or not ? Maybe the community can
>> give advice on how such tasks can be solved using Flink and what other
>> approaches exist.
>>
>> Thanks a lot for your help and advice !
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-12 Thread Timo Walther

Hi Austin,

your explanation for the KeyedProcessFunction implementation sounds good 
to me. Using the time and state primitives for this task will make the 
implementation more explicit but also more readable.


Let me know if you could solve your use case.

Regards,
Timo
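
For what it's worth, a rough sketch of that kind of deduplication with a KeyedProcessFunction
(not code from the thread; the String record type and the state name are placeholders): buffer
the last record per key and flush it when the timer registered at Long.MAX_VALUE fires on the
final MAX_WATERMARK.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Emits exactly one record per key, once no more input can arrive (MAX_WATERMARK). */
public class DedupOnEndOfInput extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // buffer only the latest record for this key; earlier duplicates are dropped
        lastSeen.update(value);
        // a timer at Long.MAX_VALUE fires when the MAX_WATERMARK arrives at the end of input
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String value = lastSeen.value();
        if (value != null) {
            out.collect(value);
            lastSeen.clear();
        }
    }
}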


On 09.10.20 17:27, Austin Cawley-Edwards wrote:

Hey Timo,

Hah, that's a fair point about using time. I guess I should update my 
statement to "as a user, I don't want to worry about /manually managing/ 
time".


That's a nice suggestion with the KeyedProcessFunction and no windows, 
I'll give that a shot. If I don't want to emit any duplicates, I'd have 
to essentially buffer the "last seen duplicate" for each key in that 
process function until the MAX_WATERMARK is sent through though, right? 
I could emit early results if I assume the max number of possible 
duplicates, but for records with no duplicates, I'd have to wait until 
no more records are coming -- am I missing something?


Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther wrote:


Hi Austin,

if you don't want to worry about time at all, you should probably not
use any windows because those are a time-based operation.

A solution that would look a bit nicer could be to use a pure
KeyedProcessFunction and implement the deduplication logic without
reusing windows. In ProcessFunctions you can register an event-time
timer. The timer would be triggered by the MAX_WATERMARK when the
pipeline shuts down even without having a timestamp assigned in the
StreamRecord. Watermark will leave SQL also without a time attribute as
far as I know.

Regards,
Timo


On 08.10.20 17:38, Austin Cawley-Edwards wrote:
 > Hey Timo,
 >
 > Sorry for the delayed reply. I'm using the Blink planner and using
 > non-time-based joins. I've got an example repo here that shows my
query/
 > setup [1]. It's got the manual timestamp assignment commented out
for
 > now, but that does indeed solve the issue.
 >
 > I'd really like to not worry about time at all in this job hah -- I
 > started just using processing time, but Till pointed out that
processing
 > time timers won't be fired when input ends, which is the case for
this
 > streaming job processing CSV files, so I should be using event time.
 > With that suggestion, I switched to ingestion time, where I then
 > discovered the issue converting from SQL to data stream.
 >
 > IMO, as a user manually assigning timestamps on conversion makes
sense
 > if you're using event time and already handling time attributes
 > yourself, but for ingestion time you really don't want to think
about
 > time at all, which is why it might make sense to propigate the
 > automatically assigned timestamps in that case. Though not sure how
 > difficult that would be. Let me know what you think!
 >
 >
 > Best + thanks again,
 > Austin
 >
 > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
 >
 > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther mailto:twal...@apache.org>
 > >> wrote:
 >
 >     Btw which planner are you using?
 >
 >     Regards,
 >     Timo
 >
 >     On 05.10.20 10:23, Timo Walther wrote:
 >      > Hi Austin,
 >      >
 >      > could you share some details of your SQL query with us? The
 >     reason why
 >      > I'm asking is because I guess that the rowtime field is
not inserted
 >      > into the `StreamRecord` of DataStream API. The rowtime
field is only
 >      > inserted if there is a single field in the output of the query
 >     that is a
 >      > valid "time attribute".
 >      >
 >      > Esp. after non-time-based joins and aggregations, time
attributes
 >     loose
 >      > there properties and become regular timestamps. Because
timestamp
 >     and
 >      > watermarks might have diverged.
 >      >
 >      > If you know what you're doing, you can also assign the
timestamp
 >      > manually after
`toRetractStream.assignTimestampAndWatermarks` and
 >      > reinsert the field into the stream record. But before you
do that, I
 >      > think it is better to share more information about the
query with us.
 >      >
 >      > I hope this helps.
 >      >
 >      > Regards,
 >      > Timo
 >      >
 >      >
 >      >
 >      > On 05.10.20 09:25, Till Rohrmann wrote:
 >      >> Hi Austin,
 >      >>
 >      >> thanks for offering to help. First I would suggest asking
Timo
 >     whether
 >      >> this is an aspect which is still missing or whether we
 >     overlooked it.
 >      >> Based on that we can then take the next steps.
 >      >>
 > 

Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Dian Fu
Hi Rinat,

Do you want to replace the UDFs with new ones on the fly or just want to update 
the model which could be seen as instance variables inside the UDF?

For the former case, it's not supported AFAIK.
For the latter case, I think you could just update the model in the UDF 
periodically or according to some custom strategy. It's the behavior of the UDF.

Regards,
Dian

> On Oct 12, 2020, at 5:51 PM, Sharipov, Rinat  wrote:
> 
> Hi Arvid, thx for your reply.
> 
> We are already using the approach with control streams to propagate business 
> rules through our data-pipeline.
> 
> Because all our models are powered by Python, I'm going to use Table API and 
> register UDF functions, where each UDF is a separate model.
> 
> So my question is - can I update the UDF function on the fly without a job 
> restart ? 
> Because new model versions become available on a daily basis and we should 
> use them as soon as possible.
> 
> Thx !
> 
> 
> 
> 
> Mon, Oct 12, 2020 at 11:32, Arvid Heise :
> Hi Rinat,
> 
> Which API are you using? If you use datastream API, the common way to 
> simulate side inputs (which is what you need) is to use a broadcast. There is 
> an example on SO [1].
> 
> [1] 
> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>  
> 
> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat  > wrote:
> Hi mates !
> 
> I'm in the beginning of the road of building a recommendation pipeline on top 
> of Flink.
> I'm going to register a list of UDF python functions on job startups where 
> each UDF is an ML model.
> 
> Over time new model versions appear in the ML registry and I would like to 
> update my UDF functions on the fly without need to restart the whole job.
> Could you tell me, whether it's possible or not ? Maybe the community can 
> give advice on how such tasks can be solved using Flink and what other 
> approaches exist.
> 
> Thanks a lot for your help and advice !
> 
> 
> 
> 
> -- 
> Arvid Heise | Senior Java Developer
>  
> Follow us @VervericaData
> --
> Join Flink Forward  - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
> (Toni) Cheng



Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread 大森林
Thanks for your replies.
When I use no state-related code in my program, the checkpoint can still be saved 
and resumed. ❶

So then why do we need Keyed State / Operator State / Stateful Functions? ❷
"the operators are reset to the time of the respective checkpoint."
We have already met the requirement "resume from the checkpoint (the last state of each 
operator, which stores the result)" ❶,
so why do we still need ❷?
Thanks for your help~!
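
For readers following along, here is a tiny illustration of what the keyed-state primitives
look like (all names below are arbitrary). Broadly speaking, a checkpoint snapshots and
restores the state that operators register through these APIs, plus Flink-internal state such
as source offsets, which is what connects ❶ and ❷.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Counts elements per key; the counter lives in keyed state, so checkpoints can snapshot and restore it. */
public class CountPerKey extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long current = count.value() == null ? 0L : count.value();
        count.update(current + 1);  // this value is part of every checkpoint
        out.collect(value + " seen " + (current + 1) + " time(s) for this key");
    }
}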






-- Original Mail --
Sender:
"Arvid Heise"   
 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions


On Wed, Oct 7, 2020 at 6:51 AM 大森林 

Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Dian, thx for your reply !

I was wondering about replacing the UDF on the fly from the Flink side; of course I'm pretty
sure that it's possible to implement the update logic directly in Python, thx
for the idea.

Regards,
Rinat

Mon, Oct 12, 2020 at 14:20, Dian Fu :

> Hi Rinat,
>
> Do you want to replace the UDFs with new ones on the fly or just want to
> update the model which could be seen as instance variables inside the UDF?
>
> For the former case, it's not supported AFAIK.
> For the latter case, I think you could just update the model in the UDF
> periodically or according to some custom strategy. It's the behavior of the
> UDF.
>
> Regards,
> Dian
>
> On Oct 12, 2020, at 5:51 PM, Sharipov, Rinat  wrote:
>
> Hi Arvid, thx for your reply.
>
> We are already using the approach with control streams to propagate
> business rules through our data-pipeline.
>
> Because all our models are powered by Python, I'm going to use Table API
> and register UDF functions, where each UDF is a separate model.
>
> So my question is - can I update the UDF function on the fly without a job
> restart ?
> Because new model versions become available on a daily basis and we should
> use them as soon as possible.
>
> Thx !
>
>
>
>
> Mon, Oct 12, 2020 at 11:32, Arvid Heise :
>
>> Hi Rinat,
>>
>> Which API are you using? If you use datastream API, the common way to
>> simulate side inputs (which is what you need) is to use a broadcast. There
>> is an example on SO [1].
>>
>> [1]
>> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>>
>> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
>> wrote:
>>
>>> Hi mates !
>>>
>>> I'm in the beginning of the road of building a recommendation pipeline
>>> on top of Flink.
>>> I'm going to register a list of UDF python functions on job
>>> startups where each UDF is an ML model.
>>>
>>> Over time new model versions appear in the ML registry and I would like
>>> to update my UDF functions on the fly without need to restart the whole job.
>>> Could you tell me, whether it's possible or not ? Maybe the community
>>> can give advice on how such tasks can be solved using Flink and what other
>>> approaches exist.
>>>
>>> Thanks a lot for your help and advice !
>>>
>>>
>>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> 
>>
>> Follow us @VervericaData
>> --
>> Join Flink Forward  - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>


[DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module, which contains (only) the deprecated
BucketingSink. The BucketingSink has been deprecated since Flink 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should first see
if there are any use cases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396
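
For anyone still on the BucketingSink, a rough sketch of what a simple row-encoded
StreamingFileSink replacement looks like. The path, encoder and bucketing format below are
just examples, and as far as I know the two sinks are not drop-in state-compatible
replacements.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class StreamingFileSinkExample {

    /** Builds a row-encoded StreamingFileSink writing String lines under baseDir. */
    public static StreamingFileSink<String> buildSink(String baseDir) {
        return StreamingFileSink
                .forRowFormat(new Path(baseDir), new SimpleStringEncoder<String>("UTF-8"))
                // time-based bucketing, similar in spirit to BucketingSink's default bucketer
                .withBucketAssigner(new DateTimeBucketAssigner<String>("yyyy-MM-dd--HH"))
                .build();
    }

    public static void attach(DataStream<String> stream, String baseDir) {
        stream.addSink(buildSink(baseDir));
    }
}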


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Chesnay Schepler

Are older versions of the module compatible with 1.12+?

On 10/12/2020 4:30 PM, Kostas Kloudas wrote:

Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSin is deprecated since FLINK 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should see first
if there are any usecases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396





Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi Chesnay,

Unfortunately not from what I can see in the code.
This is the reason why I am opening a discussion. I think that if we
supported backwards compatibility, this would have been an easier
process.

Kostas

On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler  wrote:
>
> Are older versions of the module compatible with 1.12+?
>
> On 10/12/2020 4:30 PM, Kostas Kloudas wrote:
> > Hi all,
> >
> > As the title suggests, this thread is to discuss the removal of the
> > flink-connector-filesystem module which contains (only) the deprecated
> > BucketingSink. The BucketingSin is deprecated since FLINK 1.9 [1] in
> > favor of the relatively recently introduced StreamingFileSink.
> >
> > For the sake of a clean and more manageable codebase, I propose to
> > remove this module for release-1.12, but of course we should see first
> > if there are any usecases that depend on it.
> >
> > Let's have a fruitful discussion.
> >
> > Cheers,
> > Kostas
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-13396
> >
>


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Yun Gao
Hi Arvid,
Many thanks for the insightful comments! I added my responses below the quotes: 
>> 1) You call the tasks that get the barriers injected leaf nodes, which would 
>> make the > sinks the root nodes. That is very similar to how graphs in 
>> relational algebra are labeled. However, I got the feeling that in Flink, we 
>> rather iterate from sources to sink, making the sources root nodes and the 
>> sinks the leaf nodes. However, I have no clue how it's done in similar 
>> cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in 
>> CheckpointCoordinator. Let's assume that we inject the barrier at all root 
>> subtasks (initially all sources). So in the iterative algorithm, whenever 
>> root A finishes, it looks at all connected subtasks B if they have any 
>> upstream task left. If not B becomes a new root. That would require to only 
>> touch a part of the job graph, but would require some callback from 
>> JobManager to CheckpointCoordinator.

I think "leaf nodes" was indeed a bad name choice; in fact I think we have the same 
thought: we start with the source nodes to find all the nodes whose predecessor nodes 
are all finished. It would be much better to call these nodes (which we would trigger) 
"root nodes". I'll modify the FLIP to change the name to "root nodes".
>> 2b) We also need to be careful for out-of-sync updates: if the root is about 
>> to finish, we could send the barrier to it from CheckpointCoordinator, but 
>> at the time it arrives, the subtask is finished already.
Exactly. When the checkpoint triggers a task but finds that the task is not there, 
it may then further check whether the task has finished; if so, it should then re-check 
its descendants to see if there are new "root nodes" to trigger.
>> 3) An implied change is that checkpoints are not aborted anymore at 
>> EndOfPartition, which is good, but might be explicitly added.
Yes, currently barrier alignment would fail the current checkpoint on 
EndOfPartition, and we would modify the behavior.
>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
>> ambiguous: What happens when an unaligned checkpoint is started and then one 
>> input channel contains the EndOfPartition event? From the written 
>> description, it sounds to me like, we move back to an aligned checkpoint for 
>> the whole receiving task. However, that is neither easily possible nor 
>> necessary. Imho it would be enough to also store the EndOfPartition in the 
>> channel state.

Many thanks for the suggestions on this issue; in fact I was stuck on it for some time. 
Previously, one implementation-detail issue for me was that EndOfPartition does not seem 
to be able to overtake the previous buffers as easily as CheckpointBarrier does; otherwise 
it might start destroying the input channels once all EndOfPartitions are received.
Therefore, although we could also persist the channels with EndOfPartition:
1. Start persisting the channels when the CheckpointUnaligner receives the barrier (if 
not all predecessor tasks are finished) or receives the trigger (if all predecessor tasks 
are finished).
2. The persisting actually stops when onBuffer receives EndOfPartition.
After the last channel stops persisting, the CheckpointUnaligner still needs to wait until 
all the previous buffers are processed before completing the allBarriersReceivedFuture. 
Therefore it would not be able to accelerate the checkpoint in this case.
After some rethinking today, I currently think we might insert some additional virtual 
events into the received buffers when EndOfPartition is received and allow these virtual 
events to overtake the previous buffers. I'll try to double-check whether it is feasible; 
please let me know if there are also other solutions to this issue :). 
> 5) I'd expand the recovery section a bit. It would be the first time that we 
> recover an incomplete DAG. Afaik the subtasks are deployed before the state 
> is recovered, so at some point, the subtasks either need to be removed again 
> or maybe we could even avoid them being created in the first place.
I also agree that eventually we should not "restart" the finished tasks in any way; it 
seems that not starting them in the first place would be better. We should be able to 
bookkeep additional information in the checkpoint metadata about which operators are 
fully finished, and the scheduler could restore the status of tasks when restoring from 
previous checkpoints. It would also require some modification on the task side to support 
input channels that are already finished on startup.
But in the first version, I think we might simplify this issue by still restarting all 
the tasks, but letting the finished sources exit directly. The new Source API would 
terminate directly since there are no pending splits, and the legacy sources would be 
dealt with specially by skipping execution if the source operator was fully finished 
before. We would be able to turn to the 

PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi mates !

I'm very new to PyFlink and am trying to register a custom UDF function using
the Python API.
Currently I face the same issue in both the server env and my local IDE environment.

When I'm trying to execute the example below I got an error message: *The
configured Task Off-Heap Memory 0 bytes is less than the least required
Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
using the configuration key 'taskmanager.memory.task.off-heap.size*

Of course I've added the required property to *flink-conf.yaml* and checked
that *pyflink-shell.sh* initializes the env using the specified configuration, but
it doesn't make any difference and I still get the error.

I've also attached my flink-conf.yaml file

Thx for your help !

*Here is an example:*

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        [
            "uid", "url"
        ]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Chesnay Schepler
Is there a way for us to change the module (in a reasonable way) that 
would allow users to continue using it?

Is it an API problem, or one of semantics?

On 10/12/2020 4:57 PM, Kostas Kloudas wrote:

Hi Chesnay,

Unfortunately not from what I can see in the code.
This is the reason why I am opening a discussion. I think that if we
supported backwards compatibility, this would have been an easier
process.

Kostas

On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler  wrote:

Are older versions of the module compatible with 1.12+?

On 10/12/2020 4:30 PM, Kostas Kloudas wrote:

Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSin is deprecated since FLINK 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should see first
if there are any usecases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396





Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun,

4) Yes, the interaction is not trivial and also I have not completely
thought it through. But in general, I'm currently at the point where I
think that we also need non-checkpoint related events in unaligned
checkpoints. So just keep that in mind, that we might converge anyhow at
this point.

In general, what is helping in this case is to remember that there is no
unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we
can completely ignore the problem of how to store and restore the output
buffers of a completed task (also important for the next point).

5) I think we are on the same page and I completely agree that for the
MVP/first version, it's completely fine to start and immediately stop. A
tad better would be to not even start the processing loop.

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao  wrote:

> Hi Arvid,
>
> Very thanks for the insightful comments! I added the responses for this
> issue under the quota:
>
> >> 1) You call the tasks that get the barriers injected leaf nodes, which
> would make the > sinks the root nodes. That is very similar to how graphs
> in relational algebra are labeled. However, I got the feeling that in
> Flink, we rather iterate from sources to sink, making the sources root
> nodes and the sinks the leaf nodes. However, I have no clue how it's done
> in similar cases, so please take that hint cautiously.
>
> >> 2) I'd make the algorithm to find the subtasks iterative and react in
> CheckpointCoordinator. Let's assume that we inject the barrier at all root
> subtasks (initially all sources). So in the iterative algorithm, whenever
> root A finishes, it looks at all connected subtasks B if they have any
> upstream task left. If not B becomes a new root. That would require to only
> touch a part of the job graph, but would require some callback from
> JobManager to CheckpointCoordinator.
>
>
> I think I should have used a bad name of "leaf nodes", in fact I think we
> should have the same thoughts that we start with the source nodes to find
> all the nodes whose precedent nodes are all finished. It would be much
> better to call these nodes (which we would trigger) as "root nodes". I'll
> modify the FLIP to change the names to "root nodes".
>
> >> 2b) We also need to be careful for out-of-sync updates: if the root is
> about to finish, we could send the barrier to it from
> CheckpointCoordinator, but at the time it arrives, the subtask is finished
> already.
>
> Exactly. When the checkpoint triggers a task but found the task is not
> there, it may then further check if the task has been finished, if so, it
> should then re-check its descendants to see if there are new "root nodes"
> to trigger.
>
> >> 3) An implied change is that checkpoints are not aborted anymore at
> EndOfPartition, which is good, but might be explicitly added.
>
> Yes, currently barrier alignment would fail the current checkpoint on
> EndOfPartition, and we would modify the behavior.
>
> >> 4) The interaction between unaligned checkpoint and EndOfPartition is a
> bit ambiguous: What happens when an unaligned checkpoint is started and
> then one input channel contains the EndOfPartition event? From the written
> description, it sounds to me like, we move back to an aligned checkpoint
> for the whole receiving task. However, that is neither easily possible nor
> necessary. Imho it would be enough to also store the EndOfPartition in the
> channel state.
>
>
> Very thanks for the suggestions on this issue and in fact I did stuck on
> it for some time. Previously for me one implementation detail issue is that
> EndOfPartition seems not be able to overtake the previous buffers easily as
> CheckpointBarrier does, otherwise it might start destroying the input
> channels if all EndOfPartitions are received.
>
> Therefore, although we could also persistent the channels with
> EndOfPartition:
>
> 1. Start persisting the channels when CheckpointUnaligner received barrier
> (if not all precendant tasks are finished) or received triggering (if all
> precendant tasks are finished).
>
> 2. The persisting actually stops when onBuffer received EndOfPartition.
>
> After the last channel stopped persisting, CheckpointUnaligner still need
> to wait till all the previous buffers are processed before complete the
> allBarriersReceivedFuture. Therefore it would not be able to accelerate the
> checkpoint in this case.
>
> After some rethinking today currently I think we might inserts some
> additional virtual events into receivedBuffer when received EndOfPartition
> and allows these virtual events to overtake the previous buffers. I'll try
> to double check if it is feasible and let me know if there are also other
> solutions on this issue :).
>
> > 5) I'd expand the recovery section a bit. It would be the first time
> that we recover an incomplete DAG. Afaik the subtasks are deployed before
> the state is recovered, so at some point, the subtasks either need to be
> removed again or maybe we could even 

Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Dan Diephouse
We use the StreamingFileSink. An option to expire files after some time
period would certainly be welcome. (I could probably figure out a way to do
this from the S3 admin UI too though)

On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:

> Hi Flink Users,
>
> We need to expose some additional options for the s3 hadoop filesystem:
> Specifically, we want to set object tagging and lifecycle. This would be a
> fairly easy change and we initially thought to create a new Filsystem with
> very minor changes to allow this.
>
> However then I wondered, would others use this? If it something that is
> worth raising as a Flink issue and then contributing back upstream.
>
> Any others who would like to be able to set object tags for the s3
> filesystem?
>
> Cheers,
> Padarn
>


-- 
Dan Diephouse
@dandiep


FW: NPE in disposing flink sql group window when running flink using ./gradlew shadowJar run

2020-10-12 Thread Dcosta, Agnelo (HBO)

A Flink application using Kafka topics as source and destination. Using:
javaVersion = '1.11'
flinkVersion = '1.11.1'
scalaBinaryVersion = '2.11'
The application primarily uses the Flink SQL APIs. We have a StatementSet and 
add SQL inserts to that set using addInsertSql.
When there are more insert statements (say 10+), running the job outside of the Flink 
cluster using ./gradlew shadowJar run fails with the following error:
[GroupWindowAggregate(groupBy………) ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of 
stream operator.
java.lang.NullPointerException: null
at 
org.apache.flink.table.runtime.operators.window.WindowOperator.dispose(WindowOperator.java:318)
 ~[flink-table-runtime-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:703)
 [flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:635)
 [flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) 
[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-runtime_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-runtime_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:834) [?:?]

Some approaches that did not work:
- Having multiple statement sets. This works locally, but not on the Flink cluster, 
which fails with an error saying we cannot have multiple execute statements.
- Changing the partition count of the Kafka topics. No impact.
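
For context, the single-execution pattern in question looks roughly like this. The table
names and queries are placeholders, the DDL is omitted, and this sketch is not expected to
reproduce the NPE.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Kafka source/sink DDL would go here (omitted):
        // tEnv.executeSql("CREATE TABLE input (...) WITH ('connector' = 'kafka', ...)");
        // tEnv.executeSql("CREATE TABLE output_1 (...) WITH (...)");
        // tEnv.executeSql("CREATE TABLE output_2 (...) WITH (...)");

        StatementSet set = tEnv.createStatementSet();
        // all inserts are grouped into one set and submitted as a single job
        set.addInsertSql("INSERT INTO output_1 SELECT id, ts FROM input");
        set.addInsertSql("INSERT INTO output_2 SELECT id, payload FROM input");
        set.execute(); // a StatementSet may only be executed once
    }
}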




Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Padarn Wilson
Thanks for the feedback. I've created a JIRA here
https://issues.apache.org/jira/browse/FLINK-19589.

@Dan: This indeed would make it easier to set a lifetime property on
objects created by Flink, but actually if you want to apply it to all your
objects for a given bucket you can set bucket wide policies instead. The
reason I want this is that we have a shared bucket and wish to tag
different objects based on which pipeline is producing them.

On Tue, Oct 13, 2020 at 4:13 AM Dan Diephouse  wrote:

> We use the StreamingFileSink. An option to expire files after some time
> period would certainly be welcome. (I could probably figure out a way to do
> this from the S3 admin UI too though)
>
> On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:
>
>> Hi Flink Users,
>>
>> We need to expose some additional options for the s3 hadoop filesystem:
>> Specifically, we want to set object tagging and lifecycle. This would be a
>> fairly easy change and we initially thought to create a new Filsystem with
>> very minor changes to allow this.
>>
>> However then I wondered, would others use this? If it something that is
>> worth raising as a Flink issue and then contributing back upstream.
>>
>> Any others who would like to be able to set object tags for the s3
>> filesystem?
>>
>> Cheers,
>> Padarn
>>
>
>
> --
> Dan Diephouse
> @dandiep
>


Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Xingbo Huang
Hi,

You can use the API to set the configuration:

table_env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", '80m')

The flink-conf.yaml way only takes effect when the job is submitted through flink
run; for the mini-cluster way (python xxx.py) it will not take effect.

Best,
Xingbo

Sharipov, Rinat  wrote on Tue, Oct 13, 2020 at 1:56 AM:

> Hi mates !
>
> I'm very new to PyFlink and trying to register a custom UDF function using
> the Python API.
> Currently I'm facing an issue in both the server environment and my local IDE
> environment.
>
> When I'm trying to execute the example below I get an error message: "The
> configured Task Off-Heap Memory 0 bytes is less than the least required
> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
> using the configuration key 'taskmanager.memory.task.off-heap.size'."
>
> Of course I've added the required property to flink-conf.yaml and checked
> that pyflink-shell.sh initializes the environment using the specified configuration,
> but it has no effect and I still get the error.
>
> I've also attached my flink-conf.yaml file
>
> Thx for your help !
>
> Here is an example:
>
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def test_udf(i):
>     return i
>
>
> if __name__ == "__main__":
>     env = ExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     bt_env = BatchTableEnvironment.create(env)
>     bt_env.register_function("test_udf", test_udf)
>
>     my_table = bt_env.from_elements(
>         [
>             ("user-1", "http://url/1"),
>             ("user-2", "http://url/2"),
>             ("user-1", "http://url/3"),
>             ("user-3", "http://url/4"),
>             ("user-1", "http://url/3")
>         ],
>         [
>             "uid", "url"
>         ]
>     )
>
>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>
>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
>
>
>
>


Missing annotation in SimpleJdbcConnectionProvider.java ?

2020-10-12 Thread Kenzyme
Hi,

I would like to know whether the class
[SimpleJdbcConnectionProvider](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProvider.java)
should be marked as @Internal or @PublicEvolving. Since there's no annotation
provided, I'm unsure whether I should be using this class in my user code.

If one of the annotations is missing, I would like to create a PR for this, if that's fine
as well.

Thank you for the clarification!

Kenzyme Le

Re: Missing annotation in SimpleJdbcConnectionProvider.java ?

2020-10-12 Thread Kenzyme
After careful examination, it seems like it should be marked as @Internal, since
this class is located in the package
org.apache.flink.connector.jdbc.internal.connection.

Here is my PR related to this https://github.com/apache/flink/pull/13603 .

Thanks a lot!

Kenzyme Le

‐‐‐ Original Message ‐‐‐
On Monday, October 12th, 2020 at 10:48 PM, Kenzyme  wrote:

> Hi,
>
> I would like to know whether the class
> [SimpleJdbcConnectionProvider](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProvider.java)
> should be marked as @Internal or @PublicEvolving. Since there's no annotation
> provided, I'm unsure whether I should be using this class in my user code.
>
> If one of the annotations is missing, I would like to create a PR for this, if that's
> fine as well.
>
> Thank you for the clarification!
>
> Kenzyme Le

Dynamic file name prefix - StreamingFileSink

2020-10-12 Thread Vijayendra Yadav
Hi Team,

I have tried to assign a dynamic prefix for the file name, which contains
datetime components.
The problem is that the job always takes the initial datetime from when the job
first starts and never refreshes it later.
How can I get the dynamic current datetime in the filename at sink time?

.withPartPrefix(
    ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))


https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

val config = OutputFileConfig
  .builder()
  .withPartPrefix("prefix")
  .withPartSuffix(".ext")
  .build()

val sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new KeyBucketAssigner())
  .withRollingPolicy(OnCheckpointRollingPolicy.build())
  .withOutputFileConfig(config)
  .build()
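
For context on why the prefix never changes: the withPartPrefix(...) argument is
evaluated exactly once, when OutputFileConfig is built on the client, so the sink
keeps whatever timestamp was computed at job submission. If a per-file timestamp in
the path is acceptable, the usual route is to put the time into the bucket id
instead, either via the built-in DateTimeBucketAssigner or a custom assigner. A
rough Java sketch only (element type and pattern are placeholders, not a tested
implementation):

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Puts the current processing time into the bucket path, so newly opened part
// files land under an up-to-date directory. The built-in DateTimeBucketAssigner
// covers the same idea out of the box.
public class CurrentTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm");

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        // Evaluated per element, unlike withPartPrefix(...), which is evaluated
        // once when the sink is built.
        return ZonedDateTime.now(ZoneId.of("UTC")).format(FORMAT);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

If the key-based bucketing from KeyBucketAssigner is still needed, the time
component could be appended to the key inside getBucketId rather than replacing
it; note this changes the directory layout rather than the part-file name itself.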


RE: state access causing segmentation fault

2020-10-12 Thread Colletta, Edward
Thanks Arvid,

I added static to ExecQueue and this did fix the problem. I tested without
static on RingBufferExec because, if ExecQueue is a static nested class, there
should be no reference to the MyKeyedProcessFunction object, as RingBufferExec
is an inner class of ExecQueue.

However, I did that just for the test. For my prod code, going forward, I am
following Flink's rules for POJO types, adding static to any nested class, and
checking for any POJO warnings in the logs.
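
For anyone else who runs into this, a minimal sketch of the layout described
above (class names taken from the reproducer further down in this thread; the
function body itself is omitted):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

// State types declared as static nested classes, so they hold no implicit
// reference to the enclosing KeyedProcessFunction instance and Flink can
// treat them as plain POJOs.
public abstract class MyKeyedProcessFunction<K, I, O> extends KeyedProcessFunction<K, I, O> {

    public static class ExecQueue {

        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        // Marked static as well purely to follow the POJO rules; as noted above,
        // a non-static class here would only reference ExecQueue, not the function.
        public static class RingBufferExec {

            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}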


From: Arvid Heise 
Sent: Sunday, October 11, 2020 3:46 PM
To: Colletta, Edward 
Cc: Dawid Wysakowicz ; user@flink.apache.org
Subject: Re: state access causing segmentation fault


Hi Edward,

could you try adding the static keyword to ExecQueue and RingBufferExec? As is 
they hold a reference to the MyKeyedProcessFunction, which has unforeseen 
consequences.

On Sun, Oct 11, 2020 at 5:38 AM Colletta, Edward
<edward.colle...@fmr.com> wrote:
I tried to attach a tar file but it got blocked. Resending with the files attached
individually.


Ok, I have a minimal reproducible example. I'm attaching a tar file of the job that
crashed.

The crash has nothing to do with the number of state variables, but it does
seem to be caused by using a type for the state variable that is a class nested
in the KeyedProcessFunction.

I reduced it to a single state variable. The type of the state variable was a class
(ExecQueue) defined in the class implementing KeyedProcessFunction. Moving the
ExecQueue definition to its own file fixed the problem.



The attached example always crashes the TaskManager within 30 seconds to 5 minutes.



MyKeyedProcessFunction.java  and also cut and pasted here:



package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// Note: the generic type parameters were stripped by the mailing-list archive;
// <String, Exec, Exec> on the function and <ExecQueue> on the state are
// reconstructed from context. (The Exec type comes from another file of the
// attached reproducer.)
public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;

    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>() {});
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        } catch (Exception e) {
            LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace()) {
                LOG.error(s.toString());
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        } catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace()) {
                LOG.error(s.toString());
            }
        }
    }

    // Non-static inner classes: they hold an implicit reference to the enclosing
    // MyKeyedProcessFunction instance, which is what triggers the crash discussed
    // in this thread.
    public class ExecQueue {

        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public class RingBufferExec {

            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}


From: Dawid Wysakowicz <dwysakow...@apache.org>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <edward.colle...@fmr.com>

Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
Hi
As others said, state is different from a checkpoint: a checkpoint is just
a **snapshot** of the state, and you can restore from the previous
checkpoint if the job crashed.

State is for stateful computation, and checkpointing is for
fault tolerance [1].

The state keeps the information you'll need in the future. Take
word count as an example: the count of a word depends on the total count
of that word we have seen so far, so we need to keep the "total count of the
word seen before" somewhere; in Flink you can keep it in state.
A checkpoint/savepoint contains the **snapshot** of all the state; if
there is no state, then the checkpoint will be *empty*: you can restore
from it, but the content is empty.
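
To make the word count example concrete, here is a minimal sketch in Java
(DataStream API); the class and state names are illustrative only:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts how often each word (the key) has been seen so far.
public class WordCountFunction
        extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

    // The running count lives in keyed state; every checkpoint takes a
    // snapshot of it, and a restore loads that snapshot back.
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        Long count = countState.value();
        long newCount = (count == null ? 0L : count) + 1L;
        countState.update(newCount);
        out.collect(Tuple2.of(word, newCount));
    }
}

After a restore, the counts continue from the snapshotted values; without such
state the operator would have nothing for the checkpoint to snapshot.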

PS: maybe you don't create state explicitly, but some operators in Flink contain
state internally (such as WindowOperator).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html
Best,
Congxian


大森林 wrote on Mon, Oct 12, 2020 at 9:26 PM:

> Thanks for your replies.
> When I use no state-relevant code in my program, the checkpoint can still be
> saved and resumed. ❶
>
> So then why do we need *Keyed State/Operator State/Stateful Functions*? ❷
> *"the operators are reset to the time of the respective checkpoint."*
> We have already met requirement ❶, *"resume from the checkpoint (the last state
> of each operator, which stores the result)"*,
> so why do we still need ❷?
> Thanks for your help!
>
>
>
> -- Original Message --
> *From:* "Arvid Heise" ;
> *Sent:* Monday, October 12, 2020, 2:53 PM
> *To:* "大森林";
> *Cc:* "Shengkai Fang";"user";
> *Subject:* Re: why we need keyed state and operate state when we already have
> checkpoint?
>
> Hi 大森林,
>
> You can always resume from checkpoints independent of the usage of keyed
> or non-keyed state of operators.
> 1 checkpoint contains the state of all operators at a given point in time.
> Each operator may have keyed state, raw state, or non-keyed state.
> As long as you are not changing the operators (too much) before
> restarting, you can always restart.
>
> During (automatic) restart of a Flink application, the state of a given
> checkpoint is restored to the operators, such that it looks like the
> operator never failed. However, the operators are reset to the time of the
> respective checkpoint.
>
> I have no clue what you mean with "previous variable temporary result".
>
> On Wed, Oct 7, 2020 at 9:13 AM 大森林  wrote:
>
>> Thanks for your replies, I have some understanding now.
>>
>> There are two cases:
>> 1. If I use no keyed state in the program, when it's killed, I can only resume
>> from the previous result.
>> 2. If I use keyed state in the program, when it's killed, I can
>> resume from the previous result and the previous variable temporary result.
>>
>> Am I right?
>> Thanks for your guidance.
>>
>>
>> -- Original Message --
>> *From:* "Arvid Heise" ;
>> *Sent:* Wednesday, October 7, 2020, 2:25 PM
>> *To:* "大森林";
>> *Cc:* "Shengkai Fang";"user";
>> *Subject:* Re: why we need keyed state and operate state when we already have
>> checkpoint?
>>
>> I think there is some misunderstanding here: a checkpoint IS (a snapshot
>> of) the keyed state and operator state (among a few more things). [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions
>>
>> On Wed, Oct 7, 2020 at 6:51 AM 大森林  wrote:
>>
>>> When the job is killed, state is also missing.
>>> So why do we need keyed state? Is keyed state useful when we try to resume
>>> the killed job?
>>>
>>>
>>> -- Original Message --
>>> *From:* "Shengkai Fang" ;
>>> *Sent:* Wednesday, October 7, 2020, 12:43 PM
>>> *To:* "大森林";
>>> *Cc:* "user";
>>> *Subject:* Re: why we need keyed state and operate state when we already
>>> have checkpoint?
>>>
>>> The checkpoint is a snapshot for the job and we can resume the job if
>>> the job is killed unexpectedly. The state is another thing to memorize the
>>> intermediate result of calculation. I don't think the checkpoint can
>>> replace state.
>>>
>>> 大森林 wrote on Wed, Oct 7, 2020 at 12:26 PM:
>>>
 Could you tell me:

 why we need keyed state and operator state when we already have
 checkpoints?

 When a running jar crashes, we can resume from the checkpoint
 automatically/manually.
 So why do we still need keyed state and operator state?

 Thanks

>>>
>>

Flink Kafka offsets

2020-10-12 Thread Rex Fenley
Hello,

I've been trying to configure the offset start position for a Flink Kafka
consumer so that, when there is no committed offset, it always starts at the
beginning. It seems like the typical way to do this would be setting
auto.offset.reset=earliest; however, I don't see that configuration property
in the documentation.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

However, I do see scan.startup.mode = earliest-offset, but from the docs it
sounds like this would mean it would never commit an offset and Flink would
always start consuming from the beginning of the Kafka stream, which is not
what I want.

Is this the case, or am I misunderstanding? How can I get the behavior that
I wish to see, where committed offsets are respected, but no offset means
starting at the beginning of the Kafka log stream?

Thanks!
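
For what it's worth, in the DataStream API the behaviour described here (respect
committed offsets, fall back to the earliest offset when none exist) is usually
expressed as in the sketch below. This uses the universal Kafka connector rather
than the Table connector the question refers to, and the topic, servers and group
id are placeholders; it is only meant to illustrate the group-offsets plus
auto.offset.reset combination:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, Flink commits the group offsets back to
        // Kafka when a checkpoint completes.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder
        // Only consulted for partitions that have no committed offset yet.
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Start from the committed group offsets; partitions without a committed
        // offset fall back to the auto.offset.reset setting above.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("kafka-offsets-example");
    }
}

Note that when the job is restored from a Flink checkpoint or savepoint, the
offsets stored in Flink's own state take precedence over both of these settings.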

Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi Xingbo, thx a lot, it works!

But I'm still sure that it's not obvious from a user's point of view
that pyflink-shell.sh doesn't use the provided flink-conf.yaml. Don't you
think that looks like an issue?

Thx!
