Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Ravi Bhushan Ratnakar
Hi Congxian, Thank you for your reply. As I shared in my previous mail, in my case the last successful checkpoint is missing details for some of the shards. I am not doing any upscaling or downscaling of Kinesis shards; I always run with a fixed number of shards, so there is no possibility of new sh

Re: Re: how to setup a ha flink cluster on k8s?

2019-11-21 Thread Yang Wang
Hi Rock, If you correctly set the restart strategy, I think the jobmanager will fail over and be relaunched again. The job will also be recovered; please share more jobmanager logs if you want. Best, Yang Rock wrote on Wed, Nov 20, 2019 at 2:57 PM: > Hi Yang Wang, > > Thanks for your reply, I may have se
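A minimal sketch of what "correctly set the restart strategy" can look like in the DataStream API; the attempt count and delay below are illustrative values, not recommendations:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartStrategyExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Retry the job up to 10 times, waiting 30 seconds between attempts,
            // instead of failing terminally on the first task failure.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(30)));
            // ... build the pipeline and call env.execute() as usual ...
        }
    }

The same strategy can also be set cluster-wide in the configuration instead of in code; the programmatic form is shown here only because it is self-contained.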

Re: Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread Yang Wang
Hi Amran, >> Container released on a *lost* node If you see such exceptions, it means that the corresponding YARN NodeManager has been lost, so all the containers running on that node will be released. The Flink YarnResourceManager receives the 'lost' message from the YARN ResourceManager and will allocat

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Yang Wang
Hi Piper, Jingsong is right. For both per-job and session clusters, the YarnResourceManager will allocate taskmanager containers dynamically on demand. For a per-job cluster, it allocates taskmanagers based on the job's slot demand, and any excess containers are returned to YARN immediately. When the job fi

Apache Flink - Uid and name for Flink sources and sinks

2019-11-21 Thread M Singh
Hi Folks: I am assigning uid and name to all stateful processors in our application and wanted to find out the following: 1. Should we assign uid and name to the sources and sinks too? 2. What are the pros and cons of adding uid to sources and sinks? 3. The sinks have uid and hashUid - which
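For context, a minimal sketch of what assigning uid and name to a source and a sink looks like in the DataStream API; the socket source and all of the uid/name strings here are placeholders:

    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidNameExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Source: uid is what checkpoint/savepoint state is matched against on restore,
            // name is only the label shown in the web UI and in metrics.
            SingleOutputStreamOperator<String> source = env
                    .socketTextStream("localhost", 9000)   // placeholder source
                    .uid("events-source")
                    .name("events-source");

            // Sink: DataStreamSink exposes uid()/name() as well.
            DataStreamSink<String> sink = source
                    .print()
                    .uid("print-sink")
                    .name("print-sink");

            env.execute("uid-name-example");
        }
    }
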

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Piper Piper
Thank you, I will check it out. On Thu, Nov 21, 2019, 9:21 PM Jingsong Li wrote: > Hi Piper, > > AFAIK, there are no such flexible operations. You can get some > information from metrics, but you cannot control them. > Maybe you should modify some source code in flink-yarn. > > Best, > Jingson

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Zhijiang
The hint about mmap usage below is really helpful for locating this problem. I had forgotten about this biggest change for batch jobs in release-1.9. The blocking type option can be set to `file` as Piotr suggested, to behave similarly to before. I think that can solve your problem. ---
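For reference, a sketch of setting the blocking type to `file`. The exact key name is an assumption taken from FLINK-12070 and the option quoted in Piotr's reply below; on a real cluster it would normally be set in flink-conf.yaml rather than in code:

    import org.apache.flink.configuration.Configuration;

    public class BlockingTypeConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Assumed key name (see FLINK-12070); the value "file" avoids the
            // mmap-based bounded blocking subpartitions introduced in 1.9.
            conf.setString("taskmanager.network.bounded-blocking-subpartition-type", "file");
            // e.g. pass conf to StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf),
            // or put the same key/value into flink-conf.yaml for a YARN deployment.
        }
    }
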

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Jingsong Li
Hi Piper, AFAIK, there are no such flexible operations. You can get some information from metrics, but you cannot control them. Maybe you should modify some source code in flink-yarn. Best, Jingsong Lee On Thu, Nov 21, 2019 at 8:17 PM Piper Piper wrote: > Hi Jingsong, > > Thank you for your

Re: Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread vino yang
Hi Amran, Did you monitor or have a look at the memory metrics (e.g. full GC) of your TM? There is a similar thread where a user reported the same problem caused by full GC; the link is here [1]. Best, Vino [1]: http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/%3cfa4068d3-2696-4f29-8

Re: Savepoints and checkpoints

2019-11-21 Thread Congxian Qiu
Hi, First, a checkpoint in Flink is a distributed snapshot of the job. As Yun said, the Kafka consumer snapshots the topic name and partition into the checkpoint, so when restoring from the last checkpoint it does not know about the new topic name. As for what is inside the checkpoint, you can think of a checkpoint as

Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread vino yang
Hi Lei, It would be better to use Flink's RESTful API to fetch information about the running jobs [1]. [1]: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-1 Best, Vino Lei Nie wrote on Fri, Nov 22, 2019 at 4:14 AM: > I looked at the code, and > StreamExecutionEnvironme
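A minimal sketch of calling that endpoint from plain Java; the host and port are placeholders for wherever the JobManager's REST interface is reachable (on YARN, typically via the ApplicationMaster's tracking URL):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.stream.Collectors;

    public class ListJobsExample {
        public static void main(String[] args) throws Exception {
            // Placeholder address for the JobManager REST endpoint.
            URL url = new URL("http://localhost:8081/jobs");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");

            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                // The response is JSON of the form {"jobs":[{"id":"...","status":"RUNNING"}, ...]};
                // parse it with any JSON library to pull out the real job IDs.
                String body = reader.lines().collect(Collectors.joining());
                System.out.println(body);
            } finally {
                conn.disconnect();
            }
        }
    }
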

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Congxian Qiu
Hi, For idle shards, I think restoring from the previously unconsumed data is OK, because Flink did not consume any data there before, but for non-idle shards this is problematic. From my experience with other connectors, could you check whether the "error" shards are newly split? Maybe the newly split shard

Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread amran dean
Hello, I am frequently seeing this error in my jobmanager logs: 2019-11-18 09:07:08,863 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: kdi_g2 -> (Sink: s_id_metadata, Sink: t_id_metadata) (23/24) (5e10e88814fe4ab0be5f554ec59bd93d) switched from RUNNING to FAILED. jav

Re: Metrics for Task States

2019-11-21 Thread Piper Piper
Thank you, Kelly! On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith wrote: > Hi Piper, > The repro is pretty simple: > - Submit a job with parallelism set higher than YARN has resources to support > What this ends up looking like in the Flink UI is this: > The Job is in a “

Re: Streaming data to Segment

2019-11-21 Thread Li Peng
Awesome, I'll definitely try that out, thanks! On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov wrote: > Hi Li, > > You're in the right direction. One additional step would be to use > RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and > close functions that allow you to
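A rough sketch of the shape Yuval is describing: a RichSinkFunction that builds its non-serializable client in open() and releases it in close(). The HTTP client and endpoint URL below are stand-ins, not the actual Segment client:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;

    public class SegmentHttpSink extends RichSinkFunction<String> {

        private transient CloseableHttpClient client;

        @Override
        public void open(Configuration parameters) {
            // Called once per parallel sink instance, after deserialization on the TM.
            client = HttpClients.createDefault();
        }

        @Override
        public void invoke(String jsonEvent, Context context) throws Exception {
            HttpPost post = new HttpPost("https://example.invalid/segment/track"); // placeholder URL
            post.setEntity(new StringEntity(jsonEvent, ContentType.APPLICATION_JSON));
            client.execute(post).close();
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }

It would be attached with something like stream.addSink(new SegmentHttpSink()).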

Re: Metrics for Task States

2019-11-21 Thread Kelly Smith
Hi Piper, The repro is pretty simple: * Submit a job with parallelism set higher than YARN has resources to support. What this ends up looking like in the Flink UI is this: [inline screenshot omitted] The Job is in a “RUNNING” state, but all of the tasks are in the “SCHEDULED” sta

Re: Metrics for Task States

2019-11-21 Thread Piper Piper
Hello Kelly, I thought that the Flink scheduler only starts a job if all requested containers/TMs are available and allotted to that job. How can I reproduce your issue on Flink with YARN? Thank you, Piper On Thu, Nov 21, 2019, 1:48 PM Kelly Smith wrote: > I’ve been running Flink in production

Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread Lei Nie
I looked at the code, and StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID is generating a random ID unrelated to the actual ID used. Is there any way to fetch the real ID at runtime? Use case: fetching the most recent checkpoint from stable storage for automated restarts. Most recent check

Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread Lei Nie
This does not get the correct ID: StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID = eea5abc21dd8743a4090f4a3a660f9e8 Actual job ID (from the web UI): 1357d21be640b6a3b8a86a063f4bba8a On Thu, Nov 7, 2019 at 6:56 PM vino yang wrote: > > Hi Lei Nie, > > You can use `StreamExecutionEnviro

Metrics for Task States

2019-11-21 Thread Kelly Smith
I’ve been running Flink in production on EMR (YARN) for some time and have found the metrics system to be quite useful, but there is one specific scenario where I’m missing a signal: * When a job has been submitted, but YARN does not have enough resources to provide. Observed

Re: Savepoints and checkpoints

2019-11-21 Thread Yun Tang
Hi Min, Since the Kafka consumer stores KafkaTopicPartition [1] within the checkpoint, you cannot load the previous state if you changed the Kafka topic name. If you assign an operator id to the previous stateful operator and split it into two operators but still keep one new operator with the previous operator-id

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread M Singh
Thanks so much Congxian for your pointers - I will try to go through them. Mans On Thursday, November 21, 2019, 07:26:49 AM EST, Congxian Qiu wrote: Hi 1. I think this issue [1] can help to understand the directory layout 2. chk-6 directory or the metafilePath, for more information, you

RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Hailu, Andreas
Thanks, Piotr. We’ll rerun our apps today with this and get back to you. // ah From: Piotr Nowojski On Behalf Of Piotr Nowojski Sent: Thursday, November 21, 2019 10:14 AM To: Hailu, Andreas [Engineering] Cc: Zhijiang ; user@flink.apache.org Subject: Re: CoGroup SortMerger performance degradatio

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Piotr Nowojski
Hi, I would suspect this: https://issues.apache.org/jira/browse/FLINK-12070 to be the source of the problems. There seems to be a hidden configuration option that avoids using memory-mapped files: taskmanager.network.bounded-blocking-subparti

RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Hailu, Andreas
Hi Zhijiang, I looked into the container logs for the failure, and didn’t see any specific OutOfMemory errors before it was killed. I ran the application using the same config this morning on 1.6.4, and it went through successfully. I took a snapshot of the memory usage from the dashboard and c

Re: Completed job wasn't saved to archive

2019-11-21 Thread Chesnay Schepler
If the archiving fails there should be some log message, like "Failed to archive job" or "Could not archive completed job...". If nothing of the sort is logged, my first instinct would be that the operation is being slowed down, _a lot_. Where are you archiving them to? Could it be the write op

Re: Completed job wasn't saved to archive

2019-11-21 Thread Pavel Potseluev
Hi Vino, Usually Flink archives jobs correctly and the problem is rarely reproduced, so I think it isn't a problem with the configuration. JobManager log when job 5ec264a20bb5005cdbd8e23a5e59f136 was canceled: 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:13.294 [Checkpoint Timer] INFO  org.apa

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread Congxian Qiu
Hi, 1. I think this issue [1] can help to understand the directory layout 2. the chk-6 directory or the metafilePath; for more information, you can refer to [2][3] 3. every checkpoint contains such a meta-file record; maybe you can refer to [4] and debug it for more information 4. currently, you need to go th

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Piper Piper
Hi Jingsong, Thank you for your reply! > Is this what you want? Piper. Yes. This is exactly what I want. Is there any way for me to specify to the Flink RM how many resources to ask YARN's RM for, and whether we can have Flink's RM ask for resources proactively before it runs out? Similarly, is there a

Re: Cron style for checkpoint

2019-11-21 Thread Congxian Qiu
Hi, thanks for your explanation. What you want is to disable periodic checkpoints during some time window, while at other times periodic checkpointing runs as normal. Currently, Flink does not support this; since you've created an issue for it, we can track it on the issue side. For now, if you re
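For contrast, what Flink does support today is tuning a single periodic trigger rather than switching it off per time window; a minimal sketch (interval, pause, and timeout values are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // One fixed interval for the whole lifetime of the job...
            env.enableCheckpointing(10 * 60 * 1000L); // every 10 minutes
            // ...plus guards around it, but no calendar/cron-style schedule.
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
            env.getCheckpointConfig().setCheckpointTimeout(15 * 60 * 1000L);
        }
    }
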

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread M Singh
Hi Congxian: For my application I see many uuids under the chk-6 directory (I posted one in the sample above). I am trying to understand what happens if I restart the application with this checkpoint (which I believe I can, just like a savepoint - I am using chk-6 as an example below): 1. I believe eac

Re: Compound Time interval in SQL queries

2019-11-21 Thread 贺小令(晓令)
Hi Arujit, The blink planner with Flink 1.9 supports this query. The reason is that both planners currently do not support complex expressions like INTERVAL '7' HOUR + INTERVAL '30' MINUTE when transforming a window into a LogicalWindowAggregate node. Why does the blink planner support this query? The optimization orde

Re: Compound Time interval in SQL queries

2019-11-21 Thread 贺小令(晓令)
Hi Arujit, Which Flink version are you using? Thanks, godfrey -- From: Arujit Pradhan Sent: Thursday, November 21, 2019 17:21 To: 贺小令(晓令); user Subject: Re: Compound Time interval in SQL queries Hi godfrey, Thanks for your reply. But now I am

Savepoints and checkpoints

2019-11-21 Thread min.tan
Hi, Are Flink savepoints and checkpoints still valid after some data entity changes, e.g. a Kafka topic name change? I expect the answer is "No"? Similarly, are Flink savepoints and checkpoints still valid after some job graph changes, e.g. one stateful operator splitting into two? I expect the answer

Re: Compound Time interval in SQL queries

2019-11-21 Thread Arujit Pradhan
Hi godfrey, Thanks for your reply. But now I am getting this error: Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.table.api.TableException: Only constant window descriptors are supported. at com.gojek.daggers.KafkaProt

Re: Compound Time interval in SQL queries

2019-11-21 Thread 贺小令(晓令)
Please try this approach: interval + interval, like this: SELECT count(1) AS event_count, TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS window_timestamp FROM `data_stream` GROUP BY TUMBLE(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) Thanks, godfrey ---

Re: Cron style for checkpoint

2019-11-21 Thread shuwen zhou
Hi Yun and Congxian, I would actually want checkpoints to avoid being triggered at certain times. Checkpointing would still remain a system mechanism; it would just not be triggered during a certain range of time. Waiting for the checkpoint to time out still wastes CPU and disk I/O resources since it was triggered anyway. I would

Re: StreamingFileSink duplicate data

2019-11-21 Thread Paul Lam
Hi, StreamingFileSink would not remove committed files, so if you use a non-latest checkpoint to restore state, you may need to perform a manual cleanup. WRT the part id issue, StreamingFileSink will track the global max part number, and use this value + 1 as the new id upon restoring. In this

Re: Cron style for checkpoint

2019-11-21 Thread Yun Tang
Hi Shuwen, Conceptually, checkpoints in Flink behave more like a system mechanism to achieve fault tolerance and are transparent to users. On the other hand, a savepoint in Flink behaves more like a user-controlled operation; can savepoints not satisfy your crontab demands? Best, Yun Tang From: Cong

Compound Time interval in SQL queries

2019-11-21 Thread Arujit Pradhan
Hi all, Is there a way to define a compound time interval (one that can consist of both HOUR and MINUTE) for windows in a Flink SQL query? For example, we want to do something like this: SELECT count(1) AS event_count, TUMBLE_END(rowtime, INTERVAL '7' HOUR AND '30' MINUTE) AS window_timestamp FROM `da