Re: Start flink job from the latest checkpoint programmatically

2020-03-13 Thread Vijay Bhaskar
Two things you can do: stopping the Flink job generates a savepoint. You need to save the savepoint directory path in some persistent store (because you are restarting the cluster; otherwise the checkpoint monitoring API should give you the savepoint file details). After spinning up the cluster, read the path
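
A minimal sketch of the "persist the savepoint path" idea, assuming a plain file on a volume that survives the cluster restart. The class name `SavepointPathStore` and the file location are hypothetical; any durable store (database, ConfigMap, object store) would serve the same purpose. The value stored is whatever location the stop-with-savepoint call reports.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical helper: keeps the last savepoint path in a file that outlives the Flink cluster. */
final class SavepointPathStore {
    private final Path storeFile;

    SavepointPathStore(Path storeFile) {
        this.storeFile = storeFile;
    }

    /** Call after stop-with-savepoint has reported the savepoint location. */
    void save(String savepointPath) throws IOException {
        Path parent = storeFile.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        // Overwrites any previous value; only the most recent savepoint is kept.
        Files.write(storeFile, savepointPath.getBytes(StandardCharsets.UTF_8));
    }

    /** Call after the cluster is back up; empty if no savepoint was recorded yet. */
    Optional<String> load() throws IOException {
        if (!Files.exists(storeFile)) {
            return Optional.empty();
        }
        String value = new String(Files.readAllBytes(storeFile), StandardCharsets.UTF_8).trim();
        return value.isEmpty() ? Optional.empty() : Optional.of(value);
    }
}
```

After the restart, the loaded path would be handed to `flink run -s <path> ...` to resume the job from that savepoint.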

Implicit Flink Context Documentation

2020-03-13 Thread Padarn Wilson
Hi Users, I am trying to understand the details of how some aspects of Flink work. While learning about `keyed state`, I kept coming up against the claim that `there is a specific key implicitly in context`. I would like to understand how this works, which I'm guessing means understanding the details

[DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Jingsong Li
Hi everyone, I'd like to start a discussion about FLIP-115: Filesystem connector in Table [1]. This FLIP will bring:
- Introduce a Filesystem table factory in Table, supporting csv/parquet/orc/json/avro formats.
- Introduce a streaming filesystem/Hive sink in Table.
CC to user mail list, if you have any u

Stop job with savepoint during graceful shutdown on a k8s cluster

2020-03-13 Thread shravan
In our setup, the Job Manager and Task Managers run as separate pods within the K8s cluster. As a job cluster is not used, the job jars are not part of the Job Manager Docker image. The job is submitted from a separate Flink client pod. Flink is configured with the RocksDB state backend. The Docker images are created

Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Piotr Nowojski
Hi, Which actual sinks/sources are you planning to use in this feature? Is it about exposing StreamingFileSink in the Table API? Or do you want to implement new Sinks/Sources? Piotrek > On 13 Mar 2020, at 10:04, jinhai wang wrote: > > Thanks for FLIP-115. It is really useful feature for plat

Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Yun Gao
Hi, Many thanks to Jingsong for bringing up this discussion! Enhancing the FileSystem connector in Table should greatly improve usability. I have the same question as Piotr. From my side, I think it would be better to be able to reuse the existing StreamingFileSink.

Re: Stop job with savepoint during graceful shutdown on a k8s cluster

2020-03-13 Thread Vijay Bhaskar
Please find answers inline. Our understanding is that when a job is stopped with a savepoint, all the Task Managers persist their state during the savepoint. If a Task Manager receives a shutdown signal while a savepoint is being taken, does it complete the savepoint before shutting down? [Ans] Why task manager is shutd
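
One way to make the shutdown order explicit is to trigger stop-with-savepoint through the JobManager's REST API and poll the asynchronous operation until it completes, and only then tear down the pods. The sketch below only builds the two requests with Java 11's `java.net.http`; the host, job ID, trigger ID and target directory are placeholders, and the endpoint paths and body fields (`targetDirectory`, `drain`) should be checked against the REST API documentation of the Flink version in use.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: requests for stop-with-savepoint and for polling its status via the Flink REST API. */
final class StopWithSavepointRequests {
    /** POST /jobs/:jobid/stop stops the job and writes a savepoint to targetDirectory. */
    static HttpRequest stopWithSavepoint(String restBase, String jobId, String targetDirectory) {
        // Assumes targetDirectory contains no characters that would need JSON escaping.
        String body = "{\"targetDirectory\":\"" + targetDirectory + "\",\"drain\":false}";
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** GET /jobs/:jobid/savepoints/:triggerid reports whether the savepoint finished and where it is. */
    static HttpRequest pollStatus(String restBase, String jobId, String triggerId) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();
    }
}
```

The trigger ID comes from the response to the first request; a shutdown hook (for example a pod `preStop` step) would send it, poll until the status is completed, and record the reported location before letting the pods terminate.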

Restoring state from an incremental RocksDB checkpoint

2020-03-13 Thread Yuval Itzchakov
Hi, We're using RocksDB as a state backend. We've come to a situation where, due to high backpressure in one of our operators, we can't get a savepoint to complete. Since we have retained previous checkpoints, I was wondering if these would be eligible to serve as a restoration point, given that we

Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-13 Thread Piotr Nowojski
Hi, Generally speaking, changing the parallelism is supported when restoring from both checkpoints and savepoints. Other changes to the job’s topology, like adding/changing/removing operators or changing types in the job graph, are only officially supported via savepoints. But in reality, as of now, there is no dif

Re: Communication between two queries

2020-03-13 Thread Piotr Nowojski
Hi, Could you explain in a bit more detail what you are trying to achieve? One problem that comes to mind is that currently in Flink Streaming (it is possible for processing bounded data), there is no way to “not ingest” the data reliably in the general case, as this might deadlock the upstream operator

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-13 Thread Piotr Nowojski
Hi, Yes, you can change the parallelism. The one thing that you cannot change is “max parallelism”. Piotrek > On 13 Mar 2020, at 04:34, Sivaprasanna wrote: > > I think you can modify the operator’s parallelism. It is only if you have set > maxParallelism, and while restoring from a checkpoint,
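
Why max parallelism is the fixed one: keyed state is partitioned into key groups, and the number of key groups equals the max parallelism. A key's key group depends only on max parallelism, and rescaling merely reassigns contiguous ranges of key groups to operator instances. The sketch below is a simplified model, not Flink's code: Flink murmur-hashes `key.hashCode()` before taking the modulo, while here `Math.floorMod` on the raw hash code is used as a stand-in, so the concrete key-group numbers differ from a real job.

```java
/**
 * Simplified model of key-group assignment.
 * Assumption: the murmur hash Flink applies to key.hashCode() is replaced by floorMod.
 */
final class KeyGroupModel {
    /** The key group depends only on maxParallelism, never on the current parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Each operator instance owns a contiguous range of key groups. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With max parallelism 128, key group 64 belongs to instance 2 at parallelism 4 and to instance 4 at parallelism 8; the key still maps to key group 64 either way, so its state can be found after rescaling. Changing max parallelism would change the key group itself, which is why it cannot be altered on restore.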

Re: time-windowed joins and tumbling windows

2020-03-13 Thread Timo Walther
Hi Vinod, I cannot spot any problems in your SQL query. Some questions for clarification: 1) Which planner are you using? 2) How do you create your watermarks? 3) Did you unit test with only parallelism of 1 or higher? 4) Can you share the output of TableEnvironment.explain() with us? Shouldn't

Re: Implicit Flink Context Documentation

2020-03-13 Thread Piotr Nowojski
Hi, Please take a look, for example, here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state And the example in particular https://ci.apache.org/projects/fl

Re: Communication between two queries

2020-03-13 Thread Mikael Gordani
Hi Piotr! Thanks for your response, I'll try to explain what I'm trying to achieve in more detail: Essentially, if I have two queries that have the same operators and run in the same task, I would like to find some way of controlling the ingestion from *a source* to the respective queries

Re: Implicit Flink Context Documentation

2020-03-13 Thread Padarn Wilson
Thanks Piotr, Conceptually I understand (and use) keyed state quite a lot, but the implementation details are what I was looking for. It looks like `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1` is what I'm looking for though. It would be cool if ther
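
A toy model of what "a key implicitly in context" means: before each record is processed, the operator sets the current key (in Flink this is driven from `setKeyContextElement1`, which, as far as I can tell, ends up setting the current key on the keyed state backend), and state accessors such as `value()` and `update()` then read and write under that key without taking it as an argument. `ToyKeyedBackend` and `KeyContextDemo` are hypothetical names; this is not Flink's implementation, only the access pattern.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of keyed state that is scoped by an implicit current key. */
final class ToyKeyedBackend<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey;

    /** Done by the operator before each record, analogous to setting the key context. */
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Like ValueState#value(): no key parameter, the current key is implied. */
    V value() {
        return values.get(currentKey);
    }

    /** Like ValueState#update(): writes under the current key. */
    void update(V newValue) {
        values.put(currentKey, newValue);
    }
}

final class KeyContextDemo {
    /** Counts records per key; the "key selector" here is the record itself. */
    static Map<String, Integer> countPerKey(String[] records) {
        ToyKeyedBackend<String, Integer> state = new ToyKeyedBackend<>();
        Map<String, Integer> latestCounts = new HashMap<>();
        for (String record : records) {
            state.setCurrentKey(record);          // set key context for this record
            Integer previous = state.value();     // reads only this key's state
            int next = (previous == null ? 0 : previous) + 1;
            state.update(next);
            latestCounts.put(record, next);
        }
        return latestCounts;
    }
}
```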

Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-13 Thread Aaron Levin
Hi Piotr, Thanks for your response! I understand that checkpoints and savepoints may be diverging (for unaligned checkpoints) but parts also seem to be converging per FLIP-47[0]. Specifically, in FLIP-47 they state that rescaling is "Supported but not in all cases" for checkpoints. What I'm hoping

Re: [Survey] Default size for the new JVM Metaspace limit in 1.10

2020-03-13 Thread Andrey Zagrebin
Hi all, Bumping this topic. Poll about: *Increasing default JVM Metaspace size from 96MB to 256MB and* *Existing Flink 1.10 setups with small process memory size (~1GB)* The community is discussing the 1.10.1 bugfix release and whether to increase the default JVM Metaspace size. So far incr
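
A quick sense of scale for the small setups the poll targets, assuming a process size of exactly 1024 MB that is kept fixed. In the 1.10 memory model the JVM Metaspace counts towards the total process memory, so the extra amount would have to come out of the other memory components:

```latex
\Delta_{\text{metaspace}} = 256\,\text{MB} - 96\,\text{MB} = 160\,\text{MB},
\qquad
\frac{160\,\text{MB}}{1024\,\text{MB}} \approx 15.6\,\%
```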

Re: Restoring state from an incremental RocksDB checkpoint

2020-03-13 Thread Andrey Zagrebin
Hi Yuval, You should be able to restore from the last checkpoint by restarting the job with the same checkpoint directory. An incremental part is removed only if none of the retained checkpoints points to it. Best, Andrey > On 13 Mar 2020, at 16:06, Yuval Itzchakov wrote: > > Hi, > > We're usin

Re: Restoring state from an incremental RocksDB checkpoint

2020-03-13 Thread Andrey Zagrebin
As I understand it, you have already enabled retained checkpoints [1], because in case of job cancellation you can only restart the job from them. Just in case, here are also the links to the docs about restoring from a retained checkpoint [2] and about how to find the path to it [3]. [1] https://ci.apache.or
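
Retained checkpoints of a job typically live in `<checkpoint-dir>/<job-id>/chk-<n>/`, each with a `_metadata` file (files shared by incremental RocksDB checkpoints sit under `shared/`). Assuming that layout on a locally mounted filesystem (for S3 or HDFS you would list through the corresponding client instead), a sketch for picking the newest one; the class name is hypothetical, and treating `_metadata` as the marker of a completed checkpoint is an assumption:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/** Sketch: pick the newest completed chk-N directory of one job (assumed local layout). */
final class LatestCheckpointFinder {
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    static Optional<Path> latest(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> CHK_DIR.matcher(p.getFileName().toString()).matches())
                    // Assumption: a chk-N directory without _metadata is incomplete and skipped.
                    .filter(p -> Files.exists(p.resolve("_metadata")))
                    // Compare numerically so that chk-10 sorts after chk-9.
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    private static long checkpointId(Path chkDir) {
        Matcher m = CHK_DIR.matcher(chkDir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + chkDir);
        }
        return Long.parseLong(m.group(1));
    }
}
```

The returned directory can then be passed to `flink run -s <path> ...`, the same way a savepoint path is.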

Re: time-windowed joins and tumbling windows

2020-03-13 Thread Vinod Mehra
Thanks Timo for responding back! Answers below: > 1) Which planner are you using? We are using Flink 1.8 and using the default planner (org.apache.flink.table.calcite.FlinkPlannerImpl) from: org.apache.flink:flink-table-planner_2.11:1.8 > 2) How do you create your watermarks? We are using perio

Re: time-windowed joins and tumbling windows

2020-03-13 Thread Vinod Mehra
I wanted to add that when I used the following, the watermark was delayed by 3 hours instead of the 2 hours I would have expected: AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime (time window constraint between o and c: 1st and 3rd table) Thanks, Vinod On Fri, Mar 13, 2020 at

Re: Very large _metadata file

2020-03-13 Thread Jacob Sevart
Running `Checkpoints.loadCheckpointMetadata` under a debugger, I found something: `subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value` weighs 43MB (5.3 million longs). "startup-times" is an operator state of mine (union list of java.time.Instant). I see

Re: Very large _metadata file

2020-03-13 Thread Jacob Sevart
Oh, I should clarify that's 43MB per partition, so with 48 partitions it explains my 2GB. On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart wrote: > Running *Checkpoints.loadCheckpointMetadata *under a debugger, I found > something: > *subtaskState.managedOperatorState[0].sateNameToPartitionOffsets("
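
The figures in these two messages are consistent with one 8-byte `long` offset per list element being kept in the checkpoint metadata for every partition (assuming decimal megabytes):

```latex
5.3 \times 10^{6}\ \text{longs} \times 8\,\text{B} \approx 42.4\,\text{MB per partition},
\qquad
48 \times 42.4\,\text{MB} \approx 2.0\,\text{GB}
```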

FLIP-27 is not ready yet; how can I work around it?

2020-03-13 Thread forideal
Hello everyone, I have a job with large state in RocksDB. This job's source is Kafka. If I want to replay data, the job will crash. One of the motivations of FLIP-27 is event-time alignment; however, it is not ready yet for me. How can I work around this? Here is an immature solution, I don
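
The event-time alignment idea from FLIP-27 can be illustrated independently of any Flink API: each source reader is paused while its watermark is too far ahead of the slowest reader, so a replay does not let one fast partition buffer hours of data in state. This is a hypothetical, Flink-independent sketch (the class name and the `maxDriftMillis` threshold are illustrative), not the solution hinted at in the truncated message; sharing per-reader watermarks across parallel Kafka source instances in a real job is left open.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of event-time alignment: a reader is paused while its
 * watermark is more than maxDriftMillis ahead of the slowest reader's watermark.
 */
final class WatermarkAlignment {
    static boolean[] shouldPause(long[] readerWatermarks, long maxDriftMillis) {
        if (maxDriftMillis < 0) {
            throw new IllegalArgumentException("maxDriftMillis must be >= 0");
        }
        boolean[] pause = new boolean[readerWatermarks.length];
        if (readerWatermarks.length == 0) {
            return pause;
        }
        long slowest = Arrays.stream(readerWatermarks).min().getAsLong();
        // Saturating add, so a slowest watermark near Long.MAX_VALUE cannot overflow.
        long limit = slowest > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : slowest + maxDriftMillis;
        for (int i = 0; i < readerWatermarks.length; i++) {
            // A reader without a watermark yet (Long.MIN_VALUE) holds all others back.
            pause[i] = readerWatermarks[i] > limit;
        }
        return pause;
    }
}
```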