Re: Start flink job from the latest checkpoint programmatically

2020-03-13 Thread Vijay Bhaskar
Two things you can do:

Stopping the Flink job will generate a savepoint.
You need to save the savepoint directory path in some persistent store
(because you are restarting the cluster; otherwise the checkpoint monitoring
API would give you the savepoint file details).
After spinning up the cluster again, read the path of the savepoint file and use the Flink
monitoring REST API to submit the job with that savepoint.

You can also use a retained checkpoint, as mentioned above.

Another option is to pass the savepoint path manually as a program argument
and respin your job.
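
To make the "programmatically" part concrete, here is a rough sketch against the JobManager REST API (the /jobs/<id>/savepoints endpoint with "target-directory"/"cancel-job" and the polling endpoint are as I recall them from the 1.10 REST docs, so please double-check; JOB_ID, TARGET_DIR and the naive JSON handling are placeholders):

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SavepointOnStop {

    // Placeholders -- adjust to your setup.
    private static final String REST = "http://jobmanager:8081";
    private static final String JOB_ID = "<job-id>";
    private static final String TARGET_DIR = "s3://my-bucket/savepoints";

    public static void main(String[] args) throws Exception {
        // 1) Ask the JobManager to take a savepoint and cancel the job.
        String trigger = post(REST + "/jobs/" + JOB_ID + "/savepoints",
                "{\"target-directory\":\"" + TARGET_DIR + "\",\"cancel-job\":true}");
        String triggerId = extract(trigger, "request-id");

        // 2) Poll until the savepoint is COMPLETED and read its location.
        String location = null;
        while (location == null) {
            Thread.sleep(1000);
            String status = get(REST + "/jobs/" + JOB_ID + "/savepoints/" + triggerId);
            if (status.contains("COMPLETED")) {
                location = extract(status, "location");
            }
        }

        // 3) Persist the location in a durable store and pass it back (e.g. as a
        //    program argument or via "flink run -s <location>") when the new
        //    job cluster is spun up.
        System.out.println("Savepoint written to: " + location);
    }

    // Minimal HTTP/JSON helpers -- use a real client library in practice.
    private static String post(String url, String body) throws Exception {
        HttpURLConnection c = (HttpURLConnection) new URL(url).openConnection();
        c.setRequestMethod("POST");
        c.setDoOutput(true);
        c.setRequestProperty("Content-Type", "application/json");
        c.getOutputStream().write(body.getBytes(StandardCharsets.UTF_8));
        return read(c.getInputStream());
    }

    private static String get(String url) throws Exception {
        HttpURLConnection c = (HttpURLConnection) new URL(url).openConnection();
        return read(c.getInputStream());
    }

    private static String read(InputStream in) {
        Scanner s = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A");
        return s.hasNext() ? s.next() : "";
    }

    // Very naive JSON field extraction, good enough for a sketch.
    private static String extract(String json, String field) {
        int i = json.indexOf("\"" + field + "\"");
        int start = json.indexOf('"', json.indexOf(':', i) + 1) + 1;
        return json.substring(start, json.indexOf('"', start));
    }
}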

Best,
Bhaskar


On Fri, Mar 13, 2020 at 5:47 AM Flavio Pompermaier 
wrote:

> Have you tried to retain checkpoints or use savepoints? Take a look at [1]
> and see if that can help.
>
> Best,
> Flavio
>
> [1] https://eventador.io/blog/apache-flink-checkpoints-and-savepoints/
>
> Il Ven 13 Mar 2020, 00:02 Eleanore Jin  ha
> scritto:
>
>> Hi All,
>>
>> The setup of my Flink application is to allow the user to start and stop it.
>>
>> The Flink job runs in a job cluster (the application jar is available to
>> Flink upon startup). Stopping a running application means exiting the
>> program.
>>
>> Restarting a stopped job means spinning up a new job cluster with the
>> same application jar, but this essentially means a new Flink job.
>>
>> I just wonder: is there a way to let the restarted job resume from the
>> latest checkpoint of the previously stopped Flink job? And is there a way to
>> set this up programmatically in the application?
>>
>> Thanks a lot!
>> Eleanore
>>
>>
>>


Implicit Flink Context Documentation

2020-03-13 Thread Padarn Wilson
Hi Users,

I am trying to understand the details of how some aspects of Flink work.

While understanding `keyed state` I kept coming up against a claim that `there
is a specific key implicitly in context`. I would like to understand how
this works, which I'm guessing means understanding the details of the
runtime context. Is there any documentation or FLIP someone can recommend
on this?


[DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Jingsong Li
Hi everyone,

I'd like to start a discussion about FLIP-115 Filesystem connector in Table
[1].
This FLIP will bring:
- Introduce a Filesystem table factory in Table, supporting
csv/parquet/orc/json/avro formats.
- Introduce a streaming filesystem/Hive sink in Table.

CC'ing the user mailing list; if you have any unmet needs, please feel free to
reply~

Look forward to hearing from you.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee


Stop job with savepoint during graceful shutdown on a k8s cluster

2020-03-13 Thread shravan
Job Manager , Task Manager  are run as separate pods within K8S cluster in
our setup. As job cluster is not used, job jars are not part of Job Manager
docker image. The job is submitted from a different Flink client pod. Flink
is configured with RocksDB state backend. The docker images are created by
us as the base OS image needs to be compliant to our organization
guidelines. 

We are looking for a reliable approach to stop the job with savepoint during
graceful shutdown to avoid duplicates on restart. 
The Job Manager pod traps shutdown signal and stops all the jobs with
savepoints. The Flink client pod starts the job with savepoint on restart of
client pod. But as the order in which pods will be shut down is not
predictable, we have the following queries:
1.  Our understanding is that to stop a job with a savepoint, all the task managers
will persist their state during the savepoint. If a Task Manager receives a
shutdown signal while the savepoint is being taken, does it complete the
savepoint before shutting down?
2.  The Job Manager K8S service is configured as the remote job manager address
in the Task Manager. This service may not be available during the savepoint; will
this affect the communication between Task Manager and Job Manager during
the savepoint?

Can you provide some pointers on the internals of savepoint in Flink ? 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Piotr Nowojski
Hi,

Which actual sinks/sources are you planning to use in this feature? Is it about 
exposing StreamingFileSink in the Table API? Or do you want to implement new 
Sinks/Sources?

Piotrek

> On 13 Mar 2020, at 10:04, jinhai wang  wrote:
> 
> Thanks for FLIP-115. It is a really useful feature for platform developers who 
> manage hundreds of Flink-to-Hive jobs in production.
> I think we need to add 'connector.sink.username' for UserGroupInformation when 
> data is written to HDFS
> 
> 
> On 2020/3/13 at 3:33 PM, "Jingsong Li" wrote:
> 
>Hi everyone,
> 
>I'd like to start a discussion about FLIP-115 Filesystem connector in Table
>[1].
>This FLIP will bring:
>- Introduce Filesystem table factory in table, support
>csv/parquet/orc/json/avro formats.
>- Introduce streaming filesystem/hive sink in table
> 
>CC to user mail list, if you have any unmet needs, please feel free to
>reply~
> 
>Look forward to hearing from you.
> 
>[1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> 
>Best,
>Jingsong Lee
> 
> 
> 



Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Yun Gao
   Hi,
   Many thanks to Jingsong for bringing up this discussion! Enhancing the
FileSystem connector in Table should largely improve its usability.

   I have the same question as Piotr. From my side, I think it would be
better to reuse the existing StreamingFileSink. We have already begun
enhancing the supported file formats (e.g., ORC, Avro...), and reusing
StreamingFileSink should avoid repeating that work in the Table library.
Besides, the bucket concept also seems to match the semantics of a partition.

   For the notification of adding partitions, I'm a little unsure whether
the watermark mechanism is enough, since a bucket/partition might span
multiple subtasks. It depends on the level of notification: if we want
to notify for the bucket on each subtask, using watermarks to notify each
subtask should be OK, but if we want to notify for the whole
bucket/partition, we might also need some coordination between subtasks.


 Best,
  Yun




--
From: Piotr Nowojski
Send Time: 2020 Mar. 13 (Fri.) 18:03
To: dev
Cc: user; user-zh
Subject: Re: [DISCUSS] FLIP-115: Filesystem connector in Table

Hi,

Which actual sinks/sources are you planning to use in this feature? Is it about 
exposing StreamingFileSink in the Table API? Or do you want to implement new 
Sinks/Sources?

Piotrek

> On 13 Mar 2020, at 10:04, jinhai wang  wrote:
> 
> Thanks for FLIP-115. It is a really useful feature for platform developers who 
> manage hundreds of Flink-to-Hive jobs in production.
> I think we need to add 'connector.sink.username' for UserGroupInformation when 
> data is written to HDFS
> 
> 
>  On 2020/3/13 at 3:33 PM, "Jingsong Li" wrote:
> 
>Hi everyone,
> 
>I'd like to start a discussion about FLIP-115 Filesystem connector in Table
>[1].
>This FLIP will bring:
>- Introduce Filesystem table factory in table, support
>csv/parquet/orc/json/avro formats.
>- Introduce streaming filesystem/hive sink in table
> 
>CC to user mail list, if you have any unmet needs, please feel free to
>reply~
> 
>Look forward to hearing from you.
> 
>[1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> 
>Best,
>Jingsong Lee
> 
> 
> 



Re: Stop job with savepoint during graceful shutdown on a k8s cluster

2020-03-13 Thread Vijay Bhaskar
Please find answers inline

Our understanding is to stop job with savepoint, all the task manager
will persist their state during savepoint. If a Task Manager receives a
shutdown signal while savepoint is being taken, does it complete the
savepoint before shutdown ?
[Ans] Why would the task manager shut down suddenly? Are you asking about handling
an unpredictable shutdown while the savepoint is being taken? In that case you can
also use a retained checkpoint. If the current checkpoint has issues because of the
shutdown, you will still have the previous checkpoint, so you can use that. You will
then have two options, savepoint or checkpoint; one of them
will always be available.

The job manager K8S service is configured as remote job manager address
in Task Manager. This service may not be available during savepoint,  will
this affect the communication between Task Manager and Job Manager during
savepoint ?
[Ans] You can go for HA, where you run more than one JobManager
so that the service is always available.




On Fri, Mar 13, 2020 at 2:40 PM shravan 
wrote:

> Job Manager , Task Manager  are run as separate pods within K8S cluster in
> our setup. As job cluster is not used, job jars are not part of Job Manager
> docker image. The job is submitted from a different Flink client pod. Flink
> is configured with RocksDB state backend. The docker images are created by
> us as the base OS image needs to be compliant to our organization
> guidelines.
>
> We are looking for a reliable approach to stop the job with savepoint
> during
> graceful shutdown to avoid duplicates on restart.
> The Job Manager pod traps shutdown signal and stops all the jobs with
> savepoints. The Flink client pod starts the job with savepoint on restart
> of
> client pod. But as the order in which pods will be shutdown is not
> predictable, we have following queries,
> 1.  Our understanding is to stop job with savepoint, all the task
> manager
> will persist their state during savepoint. If a Task Manager receives a
> shutdown signal while savepoint is being taken, does it complete the
> savepoint before shutdown ?
> 2.  The job manager K8S service is configured as remote job manager
> address
> in Task Manager. This service may not be available during savepoint,  will
> this affect the communication between Task Manager and Job Manager during
> savepoint ?
>
> Can you provide some pointers on the internals of savepoint in Flink ?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Restoring state from an incremental RocksDB checkpoint

2020-03-13 Thread Yuval Itzchakov
Hi,

We're using RocksDB as a state backend. We've come to a situation where due
to high backpressure in one of our operators, we can't make a savepoint
complete.

Since we have retained previous checkpoints, I was wondering if these would
be eligible to serve as a restoration point. Given that we are taking
advantage of RocksDB's incremental snapshot capability, I was unsure: would
the incremental snapshot be missing data, or does it point to the remaining
parts of previous checkpoints?


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-13 Thread Piotr Nowojski
Hi,

Generally speaking, changes of parallelism are supported between checkpoints and 
savepoints. Other changes to the job’s topology, like adding/changing/removing 
operators or changing types in the job graph, are only officially supported via 
savepoints.

But in reality, as of now, there is no difference between checkpoints and 
savepoints; that’s subject to change, though, so it’s better not to rely on this 
behaviour. For example, with unaligned checkpoints [1] (hopefully in 1.11), 
there will be a difference between those two concepts.

Piotrek

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
 


> On 12 Mar 2020, at 12:16, Aaron Levin  wrote:
> 
> Hi,
> 
> What's the expected behaviour of:
> 
> * changing an operator's parallelism
> * deploying this change from an incremental (RocksDB) checkpoint instead of a 
> savepoint
> 
> The flink docs[0][1] are a little unclear on what the expected behaviour is 
> here. I understand that the key-space is being changed because parallelism is 
> changed. I've seen instances where this happens and a job does not fail. But 
> how does it treat potentially missing state for a given key? 
> 
> I know I can test this, but I'm curious what the _expected_ behaviour is? 
> I.e. what behaviour can I rely on, which won't change between versions or 
> releases? Do we expect the job to fail? Do we expect missing keys to just be 
> considered empty? 
> 
> Thanks!
> 
> [0] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
>  
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html
>  
> 
> 
> Aaron Levin



Re: Communication between two queries

2020-03-13 Thread Piotr Nowojski
Hi,

Could you explain a bit more what you are trying to achieve? 

One problem that pops into my head is that currently in Flink Streaming (it is 
possible when processing bounded data), there is no way to reliably “not ingest” the 
data in the general case, as this might deadlock the upstream operator once the 
output buffers fill up. Instead, you can for example filter 
out/ignore records until some condition is met.

BroadcastState works for one single operator (and its parallel instances) - it 
doesn’t automatically communicate with any upstream/downstream operators - you 
have to wire/connect your operators and distribute the information as you want 
to. For an example of how it works you can take a look at this ITCase [1].

What you could do is create the following job topology using side outputs [2]:

Src1 -> OP1 -> broadcast_side_output
         |
         v
       Sink1

And use BroadcastProcessFunction to read Src1 and broadcast_side_output:

Src1 + broadcast_side_output -> OP2 -> Sink2

But as I wrote before, you have to be careful in OP2. If both OP1 and OP2 are 
reading from the same data stream Src1, and you stop reading records from Src1 
in OP2, you will eventually deadlock Src1 itself. A solution for that would be to 
create a second instance of the Src1 operator, which would read the records from 
the external system a second time:

Src1" + broadcast_side_output -> OP2 -> Sink2

Piotrek

[1] 
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
 

 

> On 12 Mar 2020, at 12:53, Mikael Gordani  wrote:
> 
> Hello everyone!
> 
> So essentially, I've two identical queries (q1 and q2) running in parallel 
> (Streams).
> I'm trying to activate the ingestion of data to q2 based on what is processed 
> in q1. 
> E.g say that we want to start ingesting data to q2 when a tuple with 
> timestamp > 5000 appears in q1.
> 
> The queries are constructed in this way. (they share the same source)
> q1: Source -> Filter -> Aggregate -> Filter -> Sink
> | 
>V
> q2:  Filter -> Filter -> Aggregate -> Filter -> Sink
> 
> The initial idea was to have a global variable which is shared between the 
> two queries. When this tuple appears in q1, it will set the variable to true 
> in the first Filter operator. While in q2, the first Filter-operator returns 
> tuples depending on the value of the global variable. 
> When the variable = true, it will let data pass, when set to false, no data 
> is allowed to be ingested.
> 
> This works fine when you have all the tasks on the same machine, but of 
> course, it becomes troublesome in distributed deployments (tasks in different 
> nodes and such).
> 
> My second approach was to create some sort of "loop" in the query. So let's 
> say that we have the processing logic placed in the last Filter operator in 
> q1, and when this "special" tuple appears, it can communicate with the first 
> Filter operator in q2, in order to allow data to be ingested.
> I've tried playing around with IterativeStreams but I don't really get it to 
> work, and I feel like it's the wrong approach..
> 
> How can I achieve this sort of functionality? 
> I'm looking a bit on the BroadcastState part of the DataStream API, but I 
> feel confused on how to use it. Is it possible to broadcast from a downstream 
> to an upstream?
> Suggestions would be much appreciated!
> 
> Best Regards,
> Mikael Gordani



Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-13 Thread Piotr Nowojski
Hi,

Yes, you can change the parallelism. One thing that you can not change is “max 
parallelism”.
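
Just to illustrate where the two settings live (the numbers are arbitrary); both also exist per operator:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Parallelism can change between restores (from a savepoint, or today also a retained checkpoint).
env.setParallelism(8);

// Max parallelism fixes the number of key groups, so it must stay the same
// across restores of keyed state.
env.setMaxParallelism(720);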

Piotrek

> On 13 Mar 2020, at 04:34, Sivaprasanna  wrote:
> 
> I think you can modify the operator’s parallelism. The only caveat is if you have set 
> maxParallelism: while restoring from a checkpoint, you shouldn’t modify 
> the maxParallelism. Otherwise, I believe the state will be lost.
> 
> -
> Sivaprasanna 
> 
> On Fri, 13 Mar 2020 at 9:01 AM, LakeShen wrote:
> Hi community,
>   My question is: if I cancel the Flink job and retain the 
> checkpoint dir, then restore from that checkpoint dir, can I change the Flink 
> operator's parallelism? My thought is that I can't change the Flink 
> operator's parallelism, but I am not sure.
>  Thanks for your reply.
> 
> Best wishes,
> LakeShen



Re: time-windowed joins and tumbling windows

2020-03-13 Thread Timo Walther

Hi Vinod,

I cannot spot any problems in your SQL query.

Some questions for clarification:
1) Which planner are you using?
2) How do you create your watermarks?
3) Did you unit test with only parallelism of 1 or higher?
4) Can you share the output of TableEnvironment.explain() with us?

Shouldn't c have a rowtime constraint around o instead of r? Such that 
all time-based operations work on o.rowtime?


Regards,
Timo


On 10.03.20 19:26, Vinod Mehra wrote:

Hi!

We are testing the following 3 way time windowed join to keep the 
retained state size small. Using joins for the first time here. It works 
in unit tests but we are not able to get expected results in production. 
We are still troubleshooting this issue. Can you please help us review 
this in case we missed something or our assumptions are wrong?


SELECT o.region_code,
       concat_ws(
         '/',
         CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1
ELSE 0 END) AS VARCHAR),
         CAST(count(1) AS VARCHAR)
       ) AS offer_conversion_5m
  FROM (
        SELECT region_code,
               offer_id,
               rowtime
          FROM event_offer_created
         WHERE ...
       ) o
  LEFT JOIN (
        SELECT offer_id,
               order_id,
               rowtime
          FROM event_order_requested
         WHERE ...
       ) r
    ON o.offer_id = r.offer_id
   AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
  LEFT JOIN (
        SELECT order_id,
               rowtime
          FROM event_order_cancelled
         WHERE ...
       ) c
    ON r.order_id = c.order_id
   AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
 GROUP BY
       o.region_code,
       TUMBLE(o.rowtime, INTERVAL '5' minute)


The sequence of events is:

 1. At time X an offer is created (event stream = "event_offer_created")
 2. At time Y that offer is used to create an order (event stream =
"event_order_requested"). Left join because not all offers get used.
 3. At time Z that order is cancelled (event stream =
"event_order_cancelled"). Left join because not all orders get
cancelled.

"offer_conversion_5m" represents: "number of converted orders / total 
number of offerings" in a 5-minute bucket. If an order gets cancelled 
we don't want to count that. That's why we have [c.order_id IS NULL THEN 
1 ELSE 0 END] in the select.


We picked 1 hour time windows because that's the maximum time we expect 
the successive events to take for a given record chain.


The outer GROUP BY is to get a 5-minute aggregation for each "region". As 
expected, the watermark lags 2 hours behind the current time because of the 
two time-window joins above. The IdleStateRetentionTime is not set, so 
the expectation is that the state will be retained as per the time 
window size and, as the records fall off the window, the state will be 
cleaned up. The aggregated state is expected to be kept around for 5 
minutes (GROUP BY).


However, we are unable to see the conversion (offer_created -> 
order_requested (without order_cancelled)). 'offer_conversion_5m' is 
always zero although we know the streams contain records that should 
have incremented the count. Any idea what could be wrong? Is the state 
being dropped too early (5 mins) because of the outer 5-minute tumbling 
window?


Thanks,
Vinod




Re: Implicit Flink Context Documentation

2020-03-13 Thread Piotr Nowojski
Hi,

Please take a look for example here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state
 

And the example in particular
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
 


The part about "there is a specific key implicitly in context” might be 
referring to the fact that for every instance of `CountWindowAverage` that 
will be running in the cluster, the user doesn’t have to set the key context 
explicitly. Flink will set the key context automatically for the 
`ValueState<Tuple2<Long, Long>> sum;` before any invocation of the 
`CountWindowAverage#flatMap` method.

In other words, one parallel instance of the `CountWindowAverage` function, for two 
consecutive invocations of `CountWindowAverage#flatMap`, can be referring to a 
different underlying value of the `CountWindowAverage#sum` field. For details you 
could take a look at the 
`org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
 method and how it’s being used/implemented.
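
For reference, here is a condensed version of the CountWindowAverage example from the linked docs page; the comments mark where the implicit key context comes into play:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Scoped to the key of the *current* input record -- that is the
    // "key implicitly in context": the runtime sets it before each flatMap call.
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
            "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> current = sum.value();  // value for the current record's key
        if (current == null) {
            current = Tuple2.of(0L, 0L);
        }
        current.f0 += 1;
        current.f1 += input.f1;
        sum.update(current);                       // written back for the same key
        if (current.f0 >= 2) {
            out.collect(Tuple2.of(input.f0, current.f1 / current.f0));
            sum.clear();
        }
    }
}

// Used as: stream.keyBy(0).flatMap(new CountWindowAverage());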

I hope that helps.

Piotrek

> On 13 Mar 2020, at 08:20, Padarn Wilson  wrote:
> 
> Hi Users,
> 
> I am trying to understand the details of how some aspects of Flink work.
> 
> While understanding `keyed state` I kept coming up against a claim that 
> `there is a specific key implicitly in context` I would like to understand 
> how this works, which I'm guessing means understanding the details of the 
> runtime context: Is there any documentation or FLIP someone can recommend on 
> this?  



Re: Communication between two queries

2020-03-13 Thread Mikael Gordani
Hi Piotr!
Thanks for your response, I'll try to explain what I'm trying to achieve in
more detail:

Essentially, if I have two queries which have the same operators and run
in the same task, I would like to figure out some way of controlling the
ingestion from a source to the respective queries, in such a way that only
one of the queries receives data, based on a condition.
For more context, the second query (query2) is equipped with instrumented
operators, which are standard operators extended with some extra
functionality; in my case, they enrich the tuples with meta-data.

Source --> Filter1 ---> rest of query1
   |
   v
   Filter2 ---> rest of query2

By using filters prior to the queries, records are allowed to pass
depending on a condition, let's say a global boolean variable (which is
initially set to false).
If it's set to true, Filter1 will accept every record and Filter2 will
disregard every record.
If it's set to false, Filter2 will accept every record and Filter1 will
disregard every record.

So the filter operators look something like this:

// Shared flag -- only visible across operators when they run in the same JVM.
static volatile boolean global_var = false;

private static class filter1 implements FilterFunction<Tuple> {
    @Override
    public boolean filter(Tuple t) throws Exception {
        return !global_var;
    }
}

private static class filter2 implements FilterFunction<Tuple> {
    @Override
    public boolean filter(Tuple t) throws Exception {
        return global_var;
    }
}


Then later on, in the respective queries, there is some processing logic
which changes the value of the global variable, thus enabling and
disabling the flow of data from the source to the respective queries.
The problem is that this global-variable approach breaks in distributed
deployments, and I'm having a hard time figuring out how to solve that.
Is it a bit clearer? =)


Re: Implicit Flink Context Documentation

2020-03-13 Thread Padarn Wilson
Thanks Piotr,

Conceptually I understand (and use) the key'ed state quite a lot, but the
implementation details are what I was looking for.

It looks like
`org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
is what I'm looking for though. It would be cool if there were some
internals design doc, however. It's quite hard to dig through the code as there
is a lot tied to how the execution of the job actually happens.

Padarn

On Fri, Mar 13, 2020 at 9:43 PM Piotr Nowojski  wrote:

> Hi,
>
> Please take a look for example here:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state
> And the example in particular
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
>
> The part about "there is a specific key implicitly in context” might be
> referring to the fact that for every instance of `CountWindowAverage` that
> will be running in the cluster, the user doesn’t have to set the key context
> explicitly. Flink will set the key context automatically for the
> `ValueState<Tuple2<Long, Long>> sum;` before any invocation of the
> `CountWindowAverage#flatMap` method.
>
> In other words, one parallel instance of the `CountWindowAverage` function,
> for two consecutive invocations of `CountWindowAverage#flatMap`, can be
> referring to a different underlying value of the `CountWindowAverage#sum` field.
> For details you could take a look at the
> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
> method and how it’s being used/implemented.
>
> I hope that helps.
>
> Piotrek
>
> On 13 Mar 2020, at 08:20, Padarn Wilson  wrote:
>
> Hi Users,
>
> I am trying to understand the details of how some aspects of Flink work.
>
> While understanding `keyed state` I kept coming up against a claim that `there
> is a specific key implicitly in context` I would like to understand how
> this works, which I'm guessing means understanding the details of the
> runtime context: Is there any documentation or FLIP someone can recommend
> on this?
>
>
>


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-13 Thread Aaron Levin
Hi Piotr,

Thanks for your response! I understand that checkpoints and savepoints may
be diverging (for unaligned checkpoints) but parts also seem to be
converging per FLIP-47[0]. Specifically, in FLIP-47 they state that
rescaling is "Supported but not in all cases" for checkpoints. What I'm
hoping to find is guidance or documentation on when rescaling is supported
for checkpoints, and, more importantly, if the cases where it's not
supported will result in hard or silent failures.

The context here is that we rely on the exactly-once semantics for our
Flink jobs in some important systems. In some cases when a job is in a bad
state it may not be able to take a checkpoint, but changing the job's
parallelism may resolve the issue. Therefore it's important for us to know
if deploying from a checkpoint, on purpose or by operator error, will break
the semantic guarantees of our job.

Hard failure in the cases where you cannot change parallelism would be the
desired outcome imo.

Thank you!

[0]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-47%3A+Checkpoints+vs.+Savepoints

Best,

Aaron Levin

On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski  wrote:

> Hi,
>
> Generally speaking changes of parallelism is supported between checkpoints
> and savepoints. Other changes to the job’s topology, like
> adding/changing/removing operators, changing types in the job graph are
> only officially supported via savepoints.
>
> But in reality, as for now, there is no difference between checkpoints and
> savepoints, but that’s subject to change, so it’s better not to rely on this
> behaviour. For example with unaligned checkpoints [1] (hopefully in 1.11),
> there will be a difference between those two concepts.
>
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> 
>
> On 12 Mar 2020, at 12:16, Aaron Levin  wrote:
>
> Hi,
>
> What's the expected behaviour of:
>
> * changing an operator's parallelism
> * deploying this change from an incremental (RocksDB) checkpoint instead
> of a savepoint
>
> The flink docs[0][1] are a little unclear on what the expected behaviour
> is here. I understand that the key-space is being changed because
> parallelism is changed. I've seen instances where this happens and a job
> does not fail. But how does it treat potentially missing state for a given
> key?
>
> I know I can test this, but I'm curious what the _expected_ behaviour is?
> I.e. what behaviour can I rely on, which won't change between versions or
> releases? Do we expect the job to fail? Do we expect missing keys to just
> be considered empty?
>
> Thanks!
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html
>
> Aaron Levin
>
>
>


Re: [Survey] Default size for the new JVM Metaspace limit in 1.10

2020-03-13 Thread Andrey Zagrebin
Hi all,

Bumping this topic. Poll about:

*Increasing default JVM Metaspace size from 96Mb to 256Mb and*
*Existing Flink 1.10 setups with small process memory size (~1GB)*

The community discusses 1.10.1 bugfix release and whether to increase the
default size for the JVM Metaspace size.
So far increasing this setting from 96Mb to 256Mb helped in all reported
cases where the default value of 96m was not enough.

Increasing the default value can affect already existing Flink 1.10 setups,
especially the case where the process memory size is explicitly set to some
relatively small value, e.g. around 1GB,
but the JVM Metaspace is not. This can lead to the decreased size of the
Flink memory and all its components, e.g. JVM heap and managed memory.

The question is how many important setups like this (with a small process
memory size) already exist, so we can investigate how badly they would be affected
by the suggested change.
Any feedback is appreciated.

Best,
Andrey

On Tue, Mar 3, 2020 at 6:35 PM Andrey Zagrebin  wrote:

> Hi All,
>
> Recently, FLIP-49 [1] introduced the new JVM Metaspace limit in the 1.10
> release [2]. Flink scripts, which start the task manager JVM process, set
> this limit by adding the corresponding JVM argument. This has been done to
> properly plan resources. especially to derive container size for
> Yarn/Mesos/Kubernetes. Also, it should surface potential class loading
> leaks. There is an option to change it:
> 'taskmanager.memory.jvm-metaspace.size' [3]. Its current default value is
> 96Mb.
>
> This change led to 'OutOfMemoryError: Metaspace' in certain cases after
> upgrading to 1.10 version. In some cases, a class loading leak has been
> detected [4] and has to be investigated on its own. In other cases, just
> increasing the option value helped because the default value was not
> enough, presumably, due to the job specifics. In general, the required
> Metaspace size depends on the job and there is no default value to cover
> all cases. There is an issue to improve docs for this concern [5].
>
> This survey is to come up with the most reasonable default value for this
> option. If you have encountered this issue and increasing the Metaspace
> size helped (there is no class loading leak), please, report any specifics
> of your job, if you think it is relevant for this concern, and the option
> value that resolved it. There is also a dedicated Jira issue [6] for
> reporting.
>
> Thanks,
> Andrey
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-metaspace-size
> [4] https://issues.apache.org/jira/browse/FLINK-16142
> [5] https://issues.apache.org/jira/browse/FLINK-16278
> [6] https://jira.apache.org/jira/browse/FLINK-16406
>


Re: Restoring state from an incremental RocksDB checkpoint

2020-03-13 Thread Andrey Zagrebin
Hi Yuval,

You should be able to restore from the last checkpoint by restarting the job 
with the same checkpoint directory.
An incremental part is removed only if none of retained checkpoints points to 
it.

Best,
Andrey

> On 13 Mar 2020, at 16:06, Yuval Itzchakov  wrote:
> 
> Hi,
> 
> We're using RocksDB as a state backend. We've come to a situation where due 
> to high backpressure in one of our operators, we can't make a savepoint 
> complete.
> 
> Since we have retained previous checkpoints, I was wondering if these would 
> be eligible to serve as a restoration point, given that we are taking 
> advantage of RocksDBs incremental snapshot capability, I was unsure. Would 
> the incremental snapshot be missing data? or do they point to the remaining 
> parts of previous checkpoints?



Re: Restoring state from an incremental RocksDB checkpoint

2020-03-13 Thread Andrey Zagrebin
As I understand it, you have already enabled retained checkpoints [1], because only 
then can you restore from them after cancelling the job in order to restart it.
Just in case, here are also the links to the docs about restoring from a retained 
checkpoint [2] and how to find the path to it [3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#retained-checkpoints
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
 

[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
 


> On 14 Mar 2020, at 00:12, Andrey Zagrebin  wrote:
> 
> Hi Yuval,
> 
> You should be able to restore from the last checkpoint by restarting the job 
> with the same checkpoint directory.
> An incremental part is removed only if none of retained checkpoints points to 
> it.
> 
> Best,
> Andrey
> 
>> On 13 Mar 2020, at 16:06, Yuval Itzchakov  wrote:
>> 
>> Hi,
>> 
>> We're using RocksDB as a state backend. We've come to a situation where due 
>> to high backpressure in one of our operators, we can't make a savepoint 
>> complete.
>> 
>> Since we have retained previous checkpoints, I was wondering if these would 
>> be eligible to serve as a restoration point, given that we are taking 
>> advantage of RocksDBs incremental snapshot capability, I was unsure. Would 
>> the incremental snapshot be missing data? or do they point to the remaining 
>> parts of previous checkpoints?
> 



Re: time-windowed joins and tumbling windows

2020-03-13 Thread Vinod Mehra
Thanks Timo for responding back! Answers below:

> 1) Which planner are you using?

We are using Flink 1.8 and using the default planner
(org.apache.flink.table.calcite.FlinkPlannerImpl)
from: org.apache.flink:flink-table-planner_2.11:1.8

> 2) How do you create your watermarks?

We are using periodic watermarking and have configured the stream time
characteristic as TimeCharacteristic.EventTime. The watermark assigner
extracts the timestamp from the event's time attribute and keeps the watermark 5
seconds behind the maximum timestamp seen, in order to allow for stale
events.
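
Roughly, the assigner looks like this (Flink 1.8 DataStream API; MyEvent, getEventTimeMillis() and the `events` stream are placeholders for our actual types):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// `events` is the DataStream<MyEvent> behind the registered table.
DataStream<MyEvent> withTimestamps = events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(MyEvent event) {
            return event.getEventTimeMillis();
        }
    });
// The extractor emits periodic watermarks at (max timestamp seen - 5s).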

> 3) Did you unit test with only parallelism of 1 or higher?

I tried both 1 and higher values in tests, and for all parallelism values
the unit tests work as expected.

4) Can you share the output of TableEnvironment.explain() with us?

Attached. Please note that I had obfuscated the query a bit in my original
post for clarity. I have pasted the actual query along with the plan so
that you can correlate it.

> Shouldn't c have a rowtime constraint around o instead of r? Such that
all time-based operations work on o.rowtime?

I have tried both (and some more variations). Got the same results (unit
tests pass but the production execution doesn't join as expected). Here is
the modified query:

SELECT o.region_code,
   concat_ws(
 '/',
 CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS
NULL THEN 1 ELSE 0 END) AS VARCHAR),
 CAST(count(1) AS VARCHAR)
   ) AS offer_conversion_5m
  FROM (
SELECT region_code,
   offer_id,
   rowtime
  FROM event_offer_created
 WHERE ...
) o
   LEFT JOIN (
SELECT offer_id,
   order_id,
   rowtime
  FROM event_order_requested
 WHERE ...
) r
 ON o.offer_id = r.offer_id
 AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime

   LEFT JOIN (
SELECT order_id,
   rowtime
  FROM event_order_cancelled
 WHERE ...
) c
 ON r.order_id = c.order_id
 AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime

 GROUP BY
   o.region_code,
   TUMBLE(o.rowtime, INTERVAL '5' minute)


We used minus two hours ("c.rowtime - INTERVAL '2' hour") in the 2nd time
window because it relates the first table to the third one.

-- Vinod

On Fri, Mar 13, 2020 at 6:42 AM Timo Walther  wrote:

> Hi Vinod,
>
> I cannot spot any problems in your SQL query.
>
> Some questions for clarification:
> 1) Which planner are you using?
> 2) How do you create your watermarks?
> 3) Did you unit test with only parallelism of 1 or higher?
> 4) Can you share the output of TableEnvironment.explain() with us?
>
> Shouldn't c have a rowtime constraint around o instead of r? Such that
> all time-based operations work on o.rowtime?
>
> Regards,
> Timo
>
>
> On 10.03.20 19:26, Vinod Mehra wrote:
> > Hi!
> >
> > We are testing the following 3 way time windowed join to keep the
> > retained state size small. Using joins for the first time here. It works
> > in unit tests but we are not able to get expected results in production.
> > We are still troubleshooting this issue. Can you please help us review
> > this in case we missed something or our assumptions are wrong?
> >
> > SELECT o.region_code,
> > concat_ws(
> >   '/',
> >   CAST(sum(CASE WHEN r.order_idIS NOT NULL AND c.order_idIS NULL
> THEN 1 ELSE 0 END)AS VARCHAR),
> >   CAST(count(1)AS VARCHAR)
> > )AS offer_conversion_5m
> >FROM (
> >  SELECT region_code,
> > offer_id,
> > rowtime
> >FROM event_offer_created
> >   WHERE ...
> > ) o
> > LEFT JOIN (
> >  SELECT offer_id,
> > order_id,
> > rowtime
> >FROM event_order_requested
> >   WHERE ...
> > ) r
> >   ON o.offer_id = r.offer_id
> >   AND r.rowtimeBETWEEN o.rowtimeAND o.rowtime +INTERVAL '1' hour
> > LEFT JOIN (
> >  SELECT order_id,
> > rowtime
> >FROM event_order_cancelled
> >   WHERE ...
> > )c
> > ON r.order_id =c.order_id
> >   AND c.rowtimeBETWEEN r.rowtimeAND r.rowtime +INTERVAL '1' hour
> > GROUP BY
> > o.region_code,
> > TUMBLE(o.rowtime,INTERVAL '5' minute)
> >
> >
> > The sequence of events is:
> >
> >  1. At time X an offer is created (event stream =
> "*event_offer_created"*)
> >  2. At time Y that offer is used to create an order (event stream =
> > "*event_order_requested*"). Left join because not all offers get
> used.
> >  3. At time Z that order is cancelled (event stream =
> > "*event_order_cancelled*"). Left join because not all orders get
> > cancelled.
> >
> > "*offer_conversion_5m*" represents: number of converted orders / total
> > number of offerings" in a 5 minutes bucket. If an order gets cancelled
> > we don't want to count that. That's why we have [c.order_id IS NULL 

Re: time-windowed joins and tumbling windows

2020-03-13 Thread Vinod Mehra
I wanted to add that when I used the following, the watermark was delayed by
3 hours instead of the 2 hours I would have expected:

AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime

(time window constraint between o and c: 1st and 3rd table)

Thanks,
Vinod

On Fri, Mar 13, 2020 at 3:56 PM Vinod Mehra  wrote:

> Thanks Timo for responding back! Answers below:
>
> > 1) Which planner are you using?
>
> We are using Flink 1.8 and using the default planner
> (org.apache.flink.table.calcite.FlinkPlannerImpl)
> from: org.apache.flink:flink-table-planner_2.11:1.8
>
> > 2) How do you create your watermarks?
>
> We are using periodic watermarking and have configured stream time
> characteristics as TimeCharacteristic.EventTime. The watermark assigner
> extracts the timestamp from time attributes from the event and keeps it 5
> seconds behind the maximum timestamp seen in order to allow for stale
> events.
>
> > 3) Did you unit test with only parallelism of 1 or higher?
>
> I tried both 1 and higher values in tests and for all parallelism values
> the unit tests works as expected.
>
> 4) Can you share the output of TableEnvironment.explain() with us?
>
> Attached. Please note that I had obfuscated the query a bit in my original
> post for clarity. I have pasted the actual query along with the plan so
> that you can correlate it.
>
> > Shouldn't c have a rowtime constraint around o instead of r? Such that
> all time-based operations work on o.rowtime?
>
> I have tried both (and some more variations). Got the same results (unit
> tests passes but production execution doesn't join as expected). Here is
> the modified query:
>
> SELECT o.region_code,
>concat_ws(
>  '/',
>  CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL 
> THEN 1 ELSE 0 END) AS VARCHAR),
>  CAST(count(1) AS VARCHAR)
>) AS offer_conversion_5m
>   FROM (
> SELECT region_code,
>offer_id,
>rowtime
>   FROM event_offer_created
>  WHERE ...
> ) o
>LEFT JOIN (
> SELECT offer_id,
>order_id,
>rowtime
>   FROM event_order_requested
>  WHERE ...
> ) r
>  ON o.offer_id = r.offer_id
>  AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
>
>LEFT JOIN (
> SELECT order_id,
>rowtime
>   FROM event_order_cancelled
>  WHERE ...
> ) c
>  ON r.order_id = c.order_id
>  AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
>
>  GROUP BY
>o.region_code,
>TUMBLE(o.rowtime, INTERVAL '5' minute)
>
>
> We used minus two hours ("c.rowtime - INTERVAL '2' hour")  in the 2nd
> time window because it is from the first table and 3rd one.
>
> -- Vinod
>
> On Fri, Mar 13, 2020 at 6:42 AM Timo Walther  wrote:
>
>> Hi Vinod,
>>
>> I cannot spot any problems in your SQL query.
>>
>> Some questions for clarification:
>> 1) Which planner are you using?
>> 2) How do you create your watermarks?
>> 3) Did you unit test with only parallelism of 1 or higher?
>> 4) Can you share the output of TableEnvironment.explain() with us?
>>
>> Shouldn't c have a rowtime constraint around o instead of r? Such that
>> all time-based operations work on o.rowtime?
>>
>> Regards,
>> Timo
>>
>>
>> On 10.03.20 19:26, Vinod Mehra wrote:
>> > Hi!
>> >
>> > We are testing the following 3 way time windowed join to keep the
>> > retained state size small. Using joins for the first time here. It
>> works
>> > in unit tests but we are not able to get expected results in
>> production.
>> > We are still troubleshooting this issue. Can you please help us review
>> > this in case we missed something or our assumptions are wrong?
>> >
>> > SELECT o.region_code,
>> > concat_ws(
>> >   '/',
>> >   CAST(sum(CASE WHEN r.order_idIS NOT NULL AND c.order_idIS
>> NULL THEN 1 ELSE 0 END)AS VARCHAR),
>> >   CAST(count(1)AS VARCHAR)
>> > )AS offer_conversion_5m
>> >FROM (
>> >  SELECT region_code,
>> > offer_id,
>> > rowtime
>> >FROM event_offer_created
>> >   WHERE ...
>> > ) o
>> > LEFT JOIN (
>> >  SELECT offer_id,
>> > order_id,
>> > rowtime
>> >FROM event_order_requested
>> >   WHERE ...
>> > ) r
>> >   ON o.offer_id = r.offer_id
>> >   AND r.rowtimeBETWEEN o.rowtimeAND o.rowtime +INTERVAL '1' hour
>> > LEFT JOIN (
>> >  SELECT order_id,
>> > rowtime
>> >FROM event_order_cancelled
>> >   WHERE ...
>> > )c
>> > ON r.order_id =c.order_id
>> >   AND c.rowtimeBETWEEN r.rowtimeAND r.rowtime +INTERVAL '1' hour
>> > GROUP BY
>> > o.region_code,
>> > TUMBLE(o.rowtime,INTERVAL '5' minute)
>> >
>> >
>> > The sequence of events is:
>> >
>> >  1. At time X an offer is created (event stream =
>> "*event_offer

Re: Very large _metadata file

2020-03-13 Thread Jacob Sevart
Running Checkpoints.loadCheckpointMetadata under a debugger, I found
something:
subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value
weighs 43MB (5.3 million longs).

"startup-times" is an operator state of mine (a union list of
java.time.Instant). I see a way to end up with fewer items in the list, but I'm
not sure how the actual size is related to the number of offsets. Can you
elaborate on that?
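
For context, a minimal sketch of how such a union-list operator state is typically declared (the class name and the epoch-millis encoding are only assumptions on my part); the important detail is that with getUnionListState every subtask receives the complete union on restore, so naively re-adding everything that was restored multiplies the number of entries across restarts:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class StartupTimesTracker implements CheckpointedFunction {

    // Kept on the heap; snapshotted into the "startup-times" operator state.
    private final List<Long> startupTimesMillis = new ArrayList<>();
    private transient ListState<Long> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("startup-times", Long.class);

        // Union redistribution: on restore EVERY parallel subtask gets the
        // full list from ALL subtasks, so re-adding all of it as-is is how
        // the list keeps growing across restores.
        checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);

        if (context.isRestored()) {
            for (Long t : checkpointedState.get()) {
                startupTimesMillis.add(t);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        // Each subtask should write only its own (deduplicated) entries.
        checkpointedState.addAll(startupTimesMillis);
    }
}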

Incidentally, 42.5MB is the number I got out of
https://issues.apache.org/jira/browse/FLINK-14618. So I think my two
problems are closely related.

Jacob

On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu  wrote:

> Hi
>
> As Gordon said, the metadata will contain the ByteStreamStateHandle, when
> writing out the ByteStreamStateHandle, will write out the handle name --
> which is a path(as you saw). The ByteStreamStateHandle will be created when
> state size is small than `state.backend.fs.memory-threshold`(default is
> 1024).
>
> If you want to verify this, you can ref the unit test
> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
> metadata, you can find out that there are many `ByteStreamStateHandle`, and
> their names are the strings you saw in the metadata.
>
> Best,
> Congxian
>
>
> Jacob Sevart wrote on Fri, Mar 6, 2020 at 3:57 AM:
>
>> Thanks, I will monitor that thread.
>>
>> I'm having a hard time following the serialization code, but if you know
>> anything about the layout, tell me if this makes sense. What I see in the
>> hex editor is, first, many HDFS paths. Then gigabytes of unreadable data.
>> Then finally another HDFS path at the end.
>>
>> If it is putting state in there, under normal circumstances, does it make
>> sense that it would be interleaved with metadata? I would expect all the
>> metadata to come first, and then state.
>>
>> Jacob
>>
>>
>>
>> Jacob
>>
>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas 
>> wrote:
>>
>>> Hi Jacob,
>>>
>>> As I said previously I am not 100% sure what can be causing this
>>> behavior, but this is a related thread here:
>>>
>>> https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
>>>
>>> Which you can re-post your problem and monitor for answers.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
>>> >
>>> > Kostas and Gordon,
>>> >
>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that setting
>>> configured so it should be at the default 1024b. This is the full "state.*"
>>> section showing in the JobManager UI.
>>> >
>>> >
>>> >
>>> > Jacob
>>> >
>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>> >>
>>> >> Hi Jacob,
>>> >>
>>> >> Apart from what Klou already mentioned, one slightly possible reason:
>>> >>
>>> >> If you are using the FsStateBackend, it is also possible that your
>>> state is small enough to be considered to be stored inline within the
>>> metadata file.
>>> >> That is governed by the "state.backend.fs.memory-threshold"
>>> configuration, with a default value of 1024 bytes, or can also be
>>> configured with the `fileStateSizeThreshold` argument when constructing the
>>> `FsStateBackend`.
>>> >> The purpose of that threshold is to ensure that the backend does not
>>> create a large amount of very small files, where potentially the file
>>> pointers are actually larger than the state itself.
>>> >>
>>> >> Cheers,
>>> >> Gordon
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas 
>>> wrote:
>>> >>>
>>> >>> Hi Jacob,
>>> >>>
>>> >>> Could you specify which StateBackend you are using?
>>> >>>
>>> >>> The reason I am asking is that, from the documentation in [1]:
>>> >>>
>>> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>>> >>> state will be stored in the _metadata file. Since it is
>>> >>> self-contained, you may move the file and restore from any location."
>>> >>>
>>> >>> I am also cc'ing Gordon who may know a bit more about state formats.
>>> >>>
>>> >>> I hope this helps,
>>> >>> Kostas
>>> >>>
>>> >>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>> >>>
>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart 
>>> wrote:
>>> >>> >
>>> >>> > Per the documentation:
>>> >>> >
>>> >>> > "The meta data file of a Savepoint contains (primarily) pointers
>>> to all files on stable storage that are part of the Savepoint, in form of
>>> absolute paths."
>>> >>> >
>>> >>> > I somehow have a _metadata file that's 1.9GB. Running strings on
>>> it 

Re: Very large _metadata file

2020-03-13 Thread Jacob Sevart
Oh, I should clarify that's 43MB per partition, so with 48 partitions it
explains my 2GB.

On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart  wrote:

> Running *Checkpoints.loadCheckpointMetadata *under a debugger, I found
> something:
> *subtaskState.managedOperatorState[0].sateNameToPartitionOffsets("startup-times").offsets.value
>  *weights
> 43MB (5.3 million longs).
>
> "startup-times" is an operator state of mine (union list of
> java.time.Instant). I see a way to end up fewer items in the list, but I'm
> not sure how the actual size is related to the number of offsets. Can you
> elaborate on that?
>
> Incidentally, 42.5MB is the number I got out of
> https://issues.apache.org/jira/browse/FLINK-14618. So I think my two
> problems are closely related.
>
> Jacob
>
> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> As Gordon said, the metadata will contain the ByteStreamStateHandle, when
>> writing out the ByteStreamStateHandle, will write out the handle name --
>> which is a path(as you saw). The ByteStreamStateHandle will be created when
>> state size is small than `state.backend.fs.memory-threshold`(default is
>> 1024).
>>
>> If you want to verify this, you can ref the unit test
>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
>> metadata, you can find out that there are many `ByteStreamStateHandle`, and
>> their names are the strings you saw in the metadata.
>>
>> Best,
>> Congxian
>>
>>
>> Jacob Sevart wrote on Fri, Mar 6, 2020 at 3:57 AM:
>>
>>> Thanks, I will monitor that thread.
>>>
>>> I'm having a hard time following the serialization code, but if you know
>>> anything about the layout, tell me if this makes sense. What I see in the
>>> hex editor is, first, many HDFS paths. Then gigabytes of unreadable data.
>>> Then finally another HDFS path at the end.
>>>
>>> If it is putting state in there, under normal circumstances, does it
>>> make sense that it would be interleaved with metadata? I would expect all
>>> the metadata to come first, and then state.
>>>
>>> Jacob
>>>
>>>
>>>
>>> Jacob
>>>
>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas 
>>> wrote:
>>>
 Hi Jacob,

 As I said previously I am not 100% sure what can be causing this
 behavior, but this is a related thread here:

 https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E

 Which you can re-post your problem and monitor for answers.

 Cheers,
 Kostas

 On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
 >
 > Kostas and Gordon,
 >
 > Thanks for the suggestions! I'm on RocksDB. We don't have that
 setting configured so it should be at the default 1024b. This is the full
 "state.*" section showing in the JobManager UI.
 >
 >
 >
 > Jacob
 >
 > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <
 tzuli...@apache.org> wrote:
 >>
 >> Hi Jacob,
 >>
 >> Apart from what Klou already mentioned, one slightly possible reason:
 >>
 >> If you are using the FsStateBackend, it is also possible that your
 state is small enough to be considered to be stored inline within the
 metadata file.
 >> That is governed by the "state.backend.fs.memory-threshold"
 configuration, with a default value of 1024 bytes, or can also be
 configured with the `fileStateSizeThreshold` argument when constructing the
 `FsStateBackend`.
 >> The purpose of that threshold is to ensure that the backend does not
 create a large amount of very small files, where potentially the file
 pointers are actually larger than the state itself.
 >>
 >> Cheers,
 >> Gordon
 >>
 >>
 >>
 >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas 
 wrote:
 >>>
 >>> Hi Jacob,
 >>>
 >>> Could you specify which StateBackend you are using?
 >>>
 >>> The reason I am asking is that, from the documentation in [1]:
 >>>
 >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
 >>> state will be stored in the _metadata file. Since it is
 >>> self-contained, you may move the file and restore from any
 location."
 >>>
 >>> I am also cc'ing Gordon who may know a bit more about state formats.
 >>>
 >>> I hope this helps,
 >>> Kostas
 >>>
 >>> [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
 >>>
 >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart 
 wrote:
 >>> >
 >>> > Pe

FLIP 27 is not ready yet, how can I work around it?

2020-03-13 Thread forideal
Hello everyone
   Now I have a job with big state in RocksDB. This job's source is Kafka. If I
want to replay data, the job will crash.
   One of the motivations of FLIP-27 is event time alignment; however, it is
not ready yet for me.
   How can I work around this?
   Here is an immature solution (see the sketch below), I don't know if it works:
   1. I save every partition's event time in external storage, for example Redis
   2. In the source function, I read all partitions' event times periodically
   3. If I find something running faster, I let it wait
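
A very rough sketch of step 3 as a map function chained after the source; ExternalTimeStore stands in for the Redis reads/writes from steps 1-2 and is purely hypothetical, as are the names and the lag bound:

import org.apache.flink.api.common.functions.RichMapFunction;

/**
 * Sketch of the proposed workaround: each parallel instance publishes the event
 * time it has reached and waits while it is too far ahead of the slowest one.
 */
public class EventTimeAligner<T> extends RichMapFunction<T, T> {

    /** Hypothetical wrapper around the external store (e.g. Redis). */
    public interface ExternalTimeStore extends java.io.Serializable {
        void publish(int subtaskIndex, long eventTime);   // step 1
        long minAcrossAllPartitions();                     // step 2, read periodically
    }

    /** Hypothetical accessor for the record's event time. */
    public interface TimestampExtractor<T> extends java.io.Serializable {
        long extract(T record);
    }

    private final ExternalTimeStore store;
    private final TimestampExtractor<T> timestamps;
    private final long maxAheadMillis;  // how far ahead of the slowest partition we tolerate

    public EventTimeAligner(ExternalTimeStore store, TimestampExtractor<T> timestamps, long maxAheadMillis) {
        this.store = store;
        this.timestamps = timestamps;
        this.maxAheadMillis = maxAheadMillis;
    }

    @Override
    public T map(T record) throws Exception {
        long eventTime = timestamps.extract(record);
        store.publish(getRuntimeContext().getIndexOfThisSubtask(), eventTime);

        // Step 3: if this subtask is too far ahead, wait. Note this blocks the
        // chain, including checkpoint barriers, which is the main drawback.
        while (eventTime - store.minAcrossAllPartitions() > maxAheadMillis) {
            Thread.sleep(500);
        }
        return record;
    }
}
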
Thank you