Re: How Flink distinguishes between late and in-time events?

2020-08-20 Thread Ori Popowski
That makes sense. Thanks On Thu, Aug 20, 2020 at 7:45 PM Piotr Nowojski wrote: > Hi Ori, > > No. Flink does it differently. Operators that keep track of late > events remember the latest watermark. If a new element arrives with > an event time lower than the latest watermark, i

AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-20 Thread Manas Kale
Hi, I am trying to deploy a Flink jar on AWS EMR service. I have ensured that Flink v1.10.0 is used in my pom file as that's the version supported by EMR. However, I get the following error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/typeutils/ResultTypeQu

In flink1.10, the hive module is missing functions such as plus and greaterThan

2020-08-20 Thread faaron zheng
Hi all, While using the sql-client in flink1.10, I found that with the hive module loaded, some built-in functions from the core module, such as plus and greaterThan, are missing. As a result, the same SQL that succeeds with the core module fails with the hive module, for example when using row_number() over(). What is the reason for this?

Re: Flink checkpointing with Azure block storage

2020-08-20 Thread Yun Tang
Hi Boris I think the official guide [1] should be enough to tell you how to configure this. However, I think your changes to flink-conf.yaml might not have taken effect, as you have configured the state backend as 'filesystem' while the logs still tell us that "No state backend has been configured, using defaul
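For reference, a minimal flink-conf.yaml fragment for checkpointing to Azure Blob Storage could look like the sketch below; the account name, container name, and key are placeholders, and the exact property names should be checked against the Azure filesystem documentation for your Flink version:

```yaml
# Store checkpoint data in a (remote) filesystem via the 'filesystem' state backend
state.backend: filesystem
# wasb:// URI pointing at an Azure Blob Storage container (placeholder names)
state.checkpoints.dir: wasb://mycontainer@myaccount.blob.core.windows.net/checkpoints
# Azure storage account key (placeholder value)
fs.azure.account.key.myaccount.blob.core.windows.net: <your-account-key>
```

Note that for these options to take effect the file must be picked up from Flink's configuration directory, not merely placed on the application classpath.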

Monitor the usage of keyed state

2020-08-20 Thread Mu Kong
Hi community, I have a Flink job running with RichMapFunction that uses keyed state. Although the TTL is enabled, I wonder if there is a way that I can monitor the memory usage of the keyed state. I'm using RocksDB as the state backend. Best regards, Mu
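One option worth mentioning here: Flink can expose RocksDB's native metrics, which give an approximation of the state size (option names per the RocksDB state-backend configuration docs; availability depends on the Flink version):

```yaml
# Expose RocksDB native metrics via Flink's metrics system.
# These are estimates reported by RocksDB itself, not exact memory usage.
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true
```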

JSON to Parquet

2020-08-20 Thread Averell
Hello, I have a stream with each message is a JSON string with a quite complex schema (multiple fields, multiple nested layers), and I need to write that into parquet files after some slight modifications/enrichment. I wonder what options are available for me to do that. I'm thinking of JSON -> A

Re: Flink checkpointing with Azure block storage

2020-08-20 Thread Boris Lublinsky
To test it, I created a flink-conf.yaml file and put it in the resource directory of my project. The file contains the following: #== # Fault tolerance and checkpointing #=

Flink checkpointing with Azure block storage

2020-08-20 Thread Boris Lublinsky
Is there somewhere a complete configuration example for such option?

Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler
That should work as well. On 20/08/2020 22:46, Vishwas Siravara wrote: Thank you Chesnay. Yes, but I could change the staging directory by adding -Djava.io.tmpdir=/data/flink-1.7.2/tmp to env.java.opts in the flink-conf.yaml file. Do you see any problem with that? Best, Vishwas On Thu, Aug

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Thank you Chesnay. Yes, but I could change the staging directory by adding -Djava.io.tmpdir=/data/flink-1.7.2/tmp to env.java.opts in the flink-conf.yaml file. Do you see any problem with that? Best, Vishwas On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler wrote: > Could you try adding this to

Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler
Could you try adding this to your flink-conf.yaml? s3.staging-directory: /usr/mware/flink/tmp On 20/08/2020 20:50, Vishwas Siravara wrote: Hi Piotr, I did some analysis and realised that the temp files for s3 checkpoints are staged in /tmp although io.tmp.dirs is set to a different direc
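Combining the suggestions in this thread, the relevant flink-conf.yaml entries might look like the following (the paths are examples, and s3.staging-directory applies to the presto-based S3 filesystem):

```yaml
# Directories for Flink's local working files (spill files, blobs, etc.)
io.tmp.dirs: /data/flink-1.7.2/tmp
# Staging directory used by the presto S3 filesystem before upload
s3.staging-directory: /data/flink-1.7.2/tmp
# Point the JVM's default temp directory at the same location, since some
# libraries use java.io.tmpdir directly rather than Flink's io.tmp.dirs
env.java.opts: -Djava.io.tmpdir=/data/flink-1.7.2/tmp
```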

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr, I did some analysis and realised that the temp files for s3 checkpoints are staged in /tmp although io.tmp.dirs is set to a different directory. ls -lrth drwxr-xr-x. 2 was was 32 Aug 20 17:52 hsperfdata_was -rw---. 1 was was 505M Aug 20 18:45 presto-s3-8158855

Debezium Flink EMR

2020-08-20 Thread Rex Fenley
Hi, I'm trying to set up Flink with Debezium CDC Connector on AWS EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived in Flink 1.11.0, from looking at the documentation. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html https://ci.apache.org/projects/

Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr, Thanks a lot. I will try your suggestion to see what happen. Regards, Zhinan On Fri, 21 Aug 2020 at 00:40, Piotr Nowojski wrote: > > Hi Zhinan, > > It's hard to say, but my guess it takes that long for the tasks to respond to > cancellation which consists of a couple of steps. If a t

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Vijayendra Yadav
Hi Till / Piotr, My process was working with FsStateBackend, but when I switched to RocksDBStateBackend I faced this problem. My classpath is below. Related jar in classpath: /usr/lib/hadoop-yarn/hadoop-yarn-api-2.8.5-amzn-6.jar:/usr/lib/hadoop-yarn/hadoop-yarn-api.jar: Classpath: :co

Re: How Flink distinguishes between late and in-time events?

2020-08-20 Thread Piotr Nowojski
Hi Ori, No. Flink does it differently. Operators that keep track of late events remember the latest watermark. If a new element arrives with an event time lower than the latest watermark, it is marked as a late event [1] Piotrek [1] https://ci.apache.org/projects/flink/flink-do
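The behaviour Piotrek describes can be sketched as a plain comparison against the last remembered watermark. The class below is an illustrative standalone sketch, not Flink's actual operator code:

```java
// Standalone sketch of late-event detection: an operator remembers the
// latest watermark and flags any element whose event time is behind it.
public class LatenessSketch {
    private long currentWatermark = Long.MIN_VALUE;

    /** Advance the remembered watermark (watermarks never move backwards). */
    public void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** An element is late if its event time is lower than the latest watermark. */
    public boolean isLate(long elementTimestamp) {
        return elementTimestamp < currentWatermark;
    }

    public static void main(String[] args) {
        LatenessSketch s = new LatenessSketch();
        s.onWatermark(100L);
        System.out.println(s.isLate(90L));  // behind the watermark: late
        System.out.println(s.isLate(150L)); // ahead of the watermark: in time
    }
}
```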

Re: Decompose failure recovery time

2020-08-20 Thread Piotr Nowojski
Hi Zhinan, It's hard to say, but my guess is that it takes that long for the tasks to respond to cancellation, which consists of a couple of steps. If a task is currently busy processing something, it has to respond to interruption (`java.lang.Thread#interrupt`). If it takes 30 seconds for a task to react

Re: Flink checkpoint recovery time

2020-08-20 Thread Zhinan Cheng
Hi Till, Thanks for the quick reply. Yes, the job actually restarts twice; the metric fullRestarts also indicates this, as its value is 2. But the job indeed takes around 30s to switch from CANCELLING to RESTARTING in its first restart. I just wonder why it takes so long here? Also, even when I set the

Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr, Thanks a lot for your help. Yes, I finally realize that I can only approximate the time for [1] and [3] and measure [2] by monitoring the uptime and downtime metric provided by Flink. And now my problem is that I found the time in [2] can be up to 40s, I wonder why it takes so long to r

Re: Flink checkpoint recovery time

2020-08-20 Thread Till Rohrmann
Hi Zhinan, the logs show that the cancellation does not take 30s. What happens is that the job gets restarted a couple of times. The problem seems to be that one TaskManager died permanently but it takes the heartbeat timeout (default 50s) until it is detected as dead. In the meantime the system t
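The heartbeat timeout Till mentions is configurable. Lowering it makes dead TaskManagers get detected faster, at the cost of more spurious failures under load or GC pauses (values below are illustrative, not recommendations):

```yaml
# Interval between heartbeats from JobManager to TaskManagers (ms); default 10000
heartbeat.interval: 10000
# Time without a heartbeat after which a TaskManager is declared dead (ms); default 50000
heartbeat.timeout: 20000
```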

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Till Rohrmann
Here is a link to information on how to integrate Flink with Hadoop [1]. In the latest version you only need to point Flink to the Hadoop libraries via setting the HADOOP_CLASSPATH environment variable. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html Ch
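Concretely, the step described in [1] boils down to exporting the Hadoop classpath in the shell from which the Flink cluster is started (assuming the `hadoop` binary is on the PATH):

```shell
# Make Hadoop's jars visible to Flink; `hadoop classpath` prints the
# colon-separated list of Hadoop library paths on this machine.
export HADOOP_CLASSPATH=$(hadoop classpath)
```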

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Till Rohrmann
I agree with Piotr's analysis. It should not matter whether you are using RocksDBStateBackend or not. It seems as if you have a Hadoop dependency clash. Could you check which dependencies are on the class path? Cheers, Till On Thu, Aug 20, 2020 at 3:52 PM Piotr Nowojski wrote: > Hi, > > It look

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr, Thank you for your suggestion. I will try that. Are the temporary files created in the directory set in io.tmp.dirs in the flink-conf.yaml? Would these files be the same size as checkpoints? Thanks, Vishwas On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski wrote: > Hi, > > As far as

How Flink distinguishes between late and in-time events?

2020-08-20 Thread Ori Popowski
In the documentation it states that: […], Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also d

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-20 Thread Kostas Kloudas
Hi all, Thanks for the comments! @Dawid: "execution.mode" can be a nice alternative and from a quick look it is not used currently by any configuration option. I will update the FLIP accordingly. @David: Given that having the option to allow timers to fire at the end of the job is already in the

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-20 Thread Arti Pande
Hi Till, Thank you for your quick response. Both the AssignerWithPeriodicWatermarks and WatermarkStrategy I am using are very simple ones. *Code for AssignerWithPeriodicWatermarks:* public class CustomEventTimeWatermarkGenerator implements AssignerWithPeriodicWatermarks { private final long
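For context, a typical bounded-out-of-orderness implementation of this (pre-1.11) periodic-watermark pattern boils down to the logic below. This is a standalone sketch that mirrors the AssignerWithPeriodicWatermarks semantics without depending on Flink classes; the 5-second bound is an arbitrary example:

```java
// Sketch of the logic behind a periodic watermark generator with a fixed
// out-of-orderness bound (mirrors AssignerWithPeriodicWatermarks semantics).
public class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Called per element: track the highest event timestamp seen so far. */
    public long extractTimestamp(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Called periodically: the watermark trails the max timestamp by the bound. */
    public long getCurrentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch g = new BoundedLatenessSketch(5_000);
        g.extractTimestamp(100_000);
        g.extractTimestamp(97_000); // out-of-order element does not move the max
        System.out.println(g.getCurrentWatermark()); // 95000
    }
}
```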

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Piotr Nowojski
Thank you for the clarification Chesnay, and sorry for the incorrect previous answer. Piotrek On Thu, 20 Aug 2020 at 15:59, Chesnay Schepler wrote: > This is incorrect; we do store the JobGraph in ZooKeeper. If you just > delete the deployment the cluster will recover the previous JobGraph > (a

Re: Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread Piotr Nowojski
Hi, It's hard for me to help you debug your code, but as long as: - you are using event time for processing records (in operators like `WindowOperator`) - you do not have late records - you are replaying the same records - your code is deterministic - you do not rely on the order of the records F

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Chesnay Schepler
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the deployment the cluster will recover the previous JobGraph (assuming you aren't changing the Zookeeper configuration). If you wish to update the job, then you should cancel it (along with creating a savepoint), whi
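The cancel-with-savepoint step described here maps onto the Flink CLI roughly as follows (the job ID, savepoint target, and jar name are placeholders; flag spellings vary slightly across Flink versions):

```shell
# Take a savepoint and cancel the job in one step (job ID is a placeholder)
./bin/flink cancel -s s3://my-bucket/savepoints <job-id>

# Later, resume the upgraded job from that savepoint (paths are placeholders)
./bin/flink run -s s3://my-bucket/savepoints/<savepoint-dir> upgraded-job.jar
```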

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Piotr Nowojski
Hi, It looks more like a dependency convergence issue - you have two conflicting versions of `org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest` on the class path. Or you built your jar with one version and trying to execute it with a different one. Till is it some kind of a known issue?

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Piotr Nowojski
Hi Alexey, I might be wrong (I don't know this side of Flink very well), but as far as I know JobGraph is never stored in the ZK. It's always recreated from the job's JAR. So you should be able to upgrade the job by replacing the JAR with a newer version, as long as the operator UIDs are the same

Re: No space left on device exception

2020-08-20 Thread Piotr Nowojski
Hi, As far as I know when uploading a file to S3, the writer needs to first create some temporary files on the local disks. I would suggest to double check all of the partitions on the local machine and monitor available disk space continuously while the job is running. If you are just checking th

Re: Decompose failure recovery time

2020-08-20 Thread Piotr Nowojski
Hi, > I want to decompose the recovery time into different parts, say > (1) the time to detect the failure, > (2) the time to restart the job, > (3) and the time to restore the checkpointing. 1. Maybe I'm missing something, but as far as I can tell, Flink can not help you with that. Time to detec

Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread aj
I have a streaming job where I am doing a window operation on "user_id" and then doing some summarization based on some time-based logic like: 1. end the session based on 30 mins inactivity of the user. 2. The End_trip event or cancellation event has arrived for the user. I am trying to rerun

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-20 Thread Stephan Ewen
We have removed some public methods in the past, after a careful deprecation period, if they were not well working any more. The sentiment I got from users is that careful cleanup is in fact appreciated, otherwise things get confusing over time (the deprecated methods cause noise in the API). Stil

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Till Rohrmann
This is indeed not optimal. Could you file a JIRA issue to add this functionality? Thanks a lot Yuval. Cheers, Till On Thu, Aug 20, 2020 at 9:47 AM Yuval Itzchakov wrote: > Hi Till, > KafkaSerializationSchema is only pluggable for the DataStream API, not for > the Table API. KafkaTableSink hard

Same kafka partition being consumed by multiple task managers.

2020-08-20 Thread Deshpande, Omkar
Hello, I am running a streaming Beam app with the Flink runner(java). * Beam 2.19 * Flink 1.9 Checkpoints and savepoints are configured to go to s3 and HA is enabled using Zookeeper. I was running the app with 3 task managers. I took a savepoint and started the app with 6 task manage

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Yuval Itzchakov
Hi Till, KafkaSerializationSchema is only pluggable for the DataStream API, not for the Table API. KafkaTableSink hard codes a KeyedSerializationSchema that uses a null key, and this behavior can't be overridden. I have to say I was quite surprised by this behavior, as publishing events to Kafka u

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Dawid Wysakowicz
Hi Yuval, Unfortunately setting the key or timestamp (or other metadata) from the SQL API is not supported yet. There is an ongoing discussion to support it[1]. Right now your option would be to change the code of KafkaTableSink and write your own version of KafkaSerializationSchema as Till menti

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Till Rohrmann
Hi Yuval, it looks as if the KafkaTableSink only supports writing out rows without a key. Pulling in Timo for verification. If you want to use a Kafka producer which writes the records out with a key, then please take a look at KafkaSerializationSchema. It supports this functionality. Cheers, Ti
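For the DataStream API route Till suggests, a KafkaSerializationSchema that sets a record key might look like the sketch below. `MyEvent`, `getKey()`, and `toJson()` are hypothetical placeholders; the Flink Kafka connector and kafka-clients dependencies are assumed to be on the classpath:

```java
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

// Sketch: a serialization schema for the DataStream Kafka producer that
// populates the record key, unlike the Table API sink discussed above.
// MyEvent and its getKey()/toJson() accessors are hypothetical placeholders.
public class KeyedEventSerializationSchema
        implements KafkaSerializationSchema<MyEvent> {

    private final String topic;

    public KeyedEventSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyEvent element, Long timestamp) {
        byte[] key = element.getKey().getBytes(StandardCharsets.UTF_8);
        byte[] value = element.toJson().getBytes(StandardCharsets.UTF_8);
        // Records with the same key land in the same Kafka partition
        return new ProducerRecord<>(topic, key, value);
    }
}
```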