Re: How does Flink handle short-lived keyed streams

2020-12-24 Thread Xintong Song
I believe what you are looking for is the State TTL [1][2].
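For keyed state in the DataStream API, configuring the TTL looks roughly like the sketch below (the TTL value and state name are illustrative, following the StateTtlConfig API documented in [1]); the descriptor would typically be created in the open() method of a rich or process function:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Expire a key's state some time after the last write, so state for
// inactive transaction ids is eventually cleaned up.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))  // illustrative TTL
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("transaction-status", String.class);
descriptor.enableTimeToLive(ttlConfig);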


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-exec-state-ttl



On Wed, Dec 23, 2020 at 11:57 PM narasimha  wrote:

> Hi,
>
> Below is the use case.
>
> We have a stream of transaction events; the success/failure of a transaction
> can be determined from those events.
> We partition the stream by transaction id and apply CEP to determine the
> success/failure of each transaction.
> Each transaction's keyed stream is only valid until the final status is
> found, which can leave a large number of inactive keyed streams in the system.
>
> I know that Flink distributes the keyed stream to tasks based on key groups,
> but there will still be a large set of inactive keys.
>
> Does this have any side effects? If so, what has to be done to overcome
> humongous keyed streams?
>
> --
> A.Narasimha Swamy
>


RE: RE: checkpointing seems to be throttled.

2020-12-24 Thread Colletta, Edward
FYI, this was an EFS issue.  I originally dismissed EFS being the issue because 
the Percent I/O limit metric  was very low.  But I later noticed the throughput 
utilization was very high.  We increased the provisioned throughput and the 
checkpoint times are greatly reduced.

From: Colletta, Edward
Sent: Monday, December 21, 2020 12:32 PM
To: Yun Gao ; user@flink.apache.org
Subject: RE: RE: checkpointing seems to be throttled.

Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao <yungao...@aliyun.com>
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.


Hi Edward,

   Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code,
it requires a path parameter, and that path is used as the checkpoint directory
(state.checkpoints.dir). If via flink-conf.yaml, I tried on 1.12 setting
state.backend: filesystem in the config file with checkpointing enabled, and it
indeed threw an exception saying:

  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
	at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
	at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
	at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
	at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
	at java.util.Optional.map(Optional.java:215)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
	at CheckpointTest.main(CheckpointTest.java:26)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
	... 11 more
   For the timeout, if there is no backpressure, I think it would be helpful
to look at the time decomposition for the checkpoint in the checkpoint history
page of the web UI to see which phase takes too long.
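   And for completeness, a minimal sketch of the code route for the state
backend (illustrative local path): the FsStateBackend constructor takes the
checkpoint URI directly, while the flink-conf.yaml route additionally needs
state.checkpoints.dir next to state.backend: filesystem, as the exception
above shows.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The URI passed here serves as the checkpoint directory, so no separate
// state.checkpoints.dir entry is needed when configuring the backend in code.
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));  // illustrative path
env.enableCheckpointing(10_000);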


Best,
 Yun


--Original Mail --
Sender: Colletta, Edward <edward.colle...@fmr.com>
Send Date: Tue Dec 22 00:04:03 2020
Recipients: Yun Gao <yungao...@aliyun.com>, user@flink.apache.org

Re: How does Flink handle short-lived keyed streams

2020-12-24 Thread narasimha
Thanks Xintong.

I'll check it out and get back to you.

On Thu, Dec 24, 2020 at 1:30 PM Xintong Song  wrote:

> I believe what you are looking for is the State TTL [1][2].
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-exec-state-ttl
>
>
>
> On Wed, Dec 23, 2020 at 11:57 PM narasimha  wrote:
>
>> Hi,
>>
>> Below is the use case.
>>
>> We have a stream of transaction events; the success/failure of a transaction
>> can be determined from those events.
>> We partition the stream by transaction id and apply CEP to determine the
>> success/failure of each transaction.
>> Each transaction's keyed stream is only valid until the final status is
>> found, which can leave a large number of inactive keyed streams in the system.
>>
>> I know that Flink distributes the keyed stream to tasks based on key groups,
>> but there will still be a large set of inactive keys.
>>
>> Does this have any side effects? If so, what has to be done to overcome
>> humongous keyed streams?
>>
>> --
>> A.Narasimha Swamy
>>
>

-- 
A.Narasimha Swamy


Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-24 Thread Yuval Itzchakov
Hi,

I have a UDF which returns a type of MAP')>. When I try to register this type with Flink via the
CREATE TABLE DDL, I encounter an exception:

- SQL parse failed. Encountered "(" at line 2, column 256.
Was expecting one of:
"NOT" ...
"NULL" ...
">" ...
"MULTISET" ...
"ARRAY" ...
"." ...

It looks like the planner doesn't like the round brackets on the LEGACY
type. What is the correct way to register the table with this type in
Flink?
-- 
Best Regards,
Yuval Itzchakov.


Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-24 Thread Yuval Itzchakov
An expansion to my question:

What I really want is for the UDF to return `RAW(io.circe.Json, ?)` type,
but I have to do a conversion between Table and DataStream, and
TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
back to TypeInformation.

On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov  wrote:

> Hi,
>
> I have a UDF which returns a type of MAP 'ANY')>. When I try to register this type with Flink via the
> CREATE TABLE DDL, I encounter an exception:
>
> - SQL parse failed. Encountered "(" at line 2, column 256.
> Was expecting one of:
> "NOT" ...
> "NULL" ...
> ">" ...
> "MULTISET" ...
> "ARRAY" ...
> "." ...
>
> It looks like the planner doesn't like the round brackets on the LEGACY
> type. What is the correct way to register the table with this type in
> Flink?
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best Regards,
Yuval Itzchakov.


StreamingFileSink closed file exception

2020-12-24 Thread Billy Bain
I am new to Flink and am trying to process a file and write it out
formatted as JSON.

This is a much simplified version.

public class AndroidReader {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        DataStreamSource<String> lines =
                env.readTextFile("file:///path/to/file/input.json");

        SingleOutputStreamOperator<AndroidData> android = lines.map(new AndroidDataMapper());
        StreamingFileSink<AndroidData> sink =
                StreamingFileSink.forRowFormat(new Path("file:///path/to/file/output"),
                        new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("filtered")
                        .withPartSuffix("json")
                        .build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                .build();
        android.addSink(sink);
        env.execute("Android");
    }
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class AndroidData {
    public AndroidData() {
    }

    private String packageName;

    public String getPackageName() {
        return packageName;
    }

    public void setPackageName(String packageName) {
        this.packageName = packageName;
    }
}

public class AndroidDataMapper implements MapFunction<String, AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public AndroidData map(String value) throws Exception {
        return objectMapper.readValue(value, AndroidData.class);
    }
}

AndroidDataEncoder class:

public class AndroidDataEncoder implements Encoder<AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        objectMapper.writeValue(stream, element);
    }
}

The issue is that I get a ClosedChannelException. I see the folder get
created, but then no files are written to it.

java.nio.channels.ClosedChannelException
	at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
	at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
	at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
	at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
	at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

Any help would be appreciated. Thanks!

-- 
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com


Flink DDL job reads from Kafka but produces no output, while the Kafka console consumer sees data

2020-12-24 Thread Appleyuchi
This is Flink 1.12. The Kafka console consumer can read the data, but the code below cannot: it runs without errors and produces no output. Any help is appreciated, thanks.



import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.math.Ordering.Int


object FlinkKafkaDDLDemo {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(3)

    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    val createTable =
      """
        |CREATE TABLE PERSON (
        |name VARCHAR COMMENT 'name',
        |age VARCHAR COMMENT 'age',
        |city VARCHAR COMMENT 'city',
        |address VARCHAR COMMENT 'home address',
        |ts TIMESTAMP(3) COMMENT 'timestamp'
        |)
        |WITH (
        |'connector.type' = 'kafka', -- use the kafka connector
        |'connector.version' = 'universal',  -- kafka version
        |'connector.topic' = 'kafka_ddl',  -- kafka topic
        |'connector.startup-mode' = 'earliest-offset', -- read from the earliest offset
        |'connector.properties.0.key' = 'zookeeper.connect',  -- connection info
        |'connector.properties.0.value' = 'Desktop:2181',
        |'connector.properties.1.key' = 'bootstrap.servers',
        |'connector.properties.1.value' = 'Desktop:9091',
        |'update-mode' = 'append',
        |'format.type' = 'json',  -- source format is json
        |'format.derive-schema' = 'true' -- derive the json parsing rules from the DDL schema
        |)
      """.stripMargin

    tEnv.executeSql(createTable)

    val query: String =
      """SELECT name,COUNT(age) FROM PERSON GROUP BY name""".stripMargin

    val result: Table = tEnv.sqlQuery(query)

    tEnv.toRetractStream[Row](result).print()
    //tEnv.execute("Flink SQL DDL")

  }

}

Realtime Data processing from HBase

2020-12-24 Thread s_penakalap...@yahoo.com
Hi Team,
I recently encountered one usecase in my project as described below:
My data source is HBaseWe receive huge volume of data at very high speed to 
HBase tables from source system.Need to read from HBase, perform computation 
and insert to postgreSQL.
I would like few inputs on the below points:   
   - Using Flink streaming API,  is continuous streaming possible from HBase 
Database? As I tried using RichSourceFunction ,StreamExecutionEnvironment  and 
was able to read data but Job stops once all data is read from HBase. My 
requirement is Job should be continuously executing and read data as and when 
data arrives to HBase table.
   - If continuous streaming from HBase is supported, How can Checkpointing be 
done on HBase so that Job can be restarted from the pointed where Job aborted. 
I tried googling but no luck. Request to help with any simple example or 
approach. 
   - If continuous streaming from HBase is not supported then what should be 
alternative approach - Batch Job?(Our requirement is to process the realtime 
data from HBase and not to launch multiple ETL Job)

Happy Christmas to all  :)

Regards,
Sunitha.


FileSink class in 1.12?

2020-12-24 Thread Billy Bain
I can't seem to find the org.apache.flink.connector.file.sink.FileSink
class.

I can find the StreamingFileSink, but not FileSink referenced here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html

Am I missing a dependency?

compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12',
version: '1.12.0'
compile group: 'org.apache.flink', name: 'flink-jdbc_2.12', version:
'1.10.2'

This is related to my issue with StreamingFileSink that I asked earlier.

Thanks.
-- 
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com


Re: FileSink class in 1.12?

2020-12-24 Thread Billy Bain
Of course I found it shortly after submitting my query. 

compile group: 'org.apache.flink', name: 'flink-connector-files', version: 
'1.12.0'
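
For anyone landing here later, with that dependency on the classpath a minimal
row-format FileSink looks roughly like this sketch (output path illustrative);
note that it is attached with sinkTo() rather than addSink():

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

FileSink<String> sink = FileSink
        .forRowFormat(new Path("file:///path/to/output"),
                new SimpleStringEncoder<String>("UTF-8"))
        .build();

// stream.sinkTo(sink);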

On 2020/12/24 15:57:20, Billy Bain  wrote: 
> I can't seem to find the org.apache.flink.connector.file.sink.FileSink
> class.
> 
> I can find the StreamingFileSink, but not FileSink referenced here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
> 
> Am I missing a dependency?
> 
> compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12',
> version: '1.12.0'
> compile group: 'org.apache.flink', name: 'flink-jdbc_2.12', version:
> '1.10.2'
> 
> This is related to my issue with StreamingFileSink that I asked earlier.
> 
> Thanks.
> -- 
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com
> 


Re: Issue in WordCount Example with DataStream API in BATCH RuntimeExecutionMode

2020-12-24 Thread Aljoscha Krettek
Thanks for reporting this! This is not the expected behaviour. I created 
a Jira issue: https://issues.apache.org/jira/browse/FLINK-20764.


Best,
Aljoscha

On 23.12.20 22:26, David Anderson wrote:

I did a little experiment, and I was able to reproduce this if I use the
sum aggregator on KeyedStream to do the counting.

However, if I implement my own counting in a KeyedProcessFunction, or if I
use the Table API, I get correct results with RuntimeExecutionMode.BATCH --
though the results are produced incrementally, as they would be in
streaming mode.
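
For illustration, a counting function of the kind described above might look
roughly like this sketch (keyed by the word itself; names are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountWords extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // Keep a running count per key and emit the updated total for every element;
        // the last value emitted for a key is its final count.
        long updated = (count.value() == null ? 0L : count.value()) + 1L;
        count.update(updated);
        out.collect(Tuple2.of(word, updated));
    }
}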

In FLIP-134: Batch execution for the DataStream API [1] it was decided to
deprecate these relational methods -- such as sum -- on KeyedStream. But I
don't know if this means this behavior is to be expected, or not.

I've cc'ed @Aljoscha Krettek , who should be able to
shed some light on this.

Best,
David

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng 
wrote:


Hi team,

Recently I am trying to explore the new features of Flink 1.12 with Batch
Execution.

I locally wrote a classic WordCount program to read from a text file and
count the words (almost the same as the one in the Flink GitHub repo
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala),
and after reading
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` after declaring the
"env" to make it execute under BATCH mode. After running the code, the
printed results showed only the final counts instead of incremental
results, which is expected. *But I also noticed that all the words that
appear only once have NOT been printed out*. I have tried different things,
like wrapping the word in a case class etc., and read more details to see if
I have missed anything, but am still not able to figure it out (I have also
tried the default examples that come with the Flink package and got the same
results, and with the DataSet API I do not see this issue).

Is there anything extra the user needs to specify or notice when using BATCH
execution mode in the DataStream API with Flink 1.12, or is this a bug?
The Flink version I used is 1.12 with Scala 2.11 (I also tried Java 1.8 and
observed the same issue).

Please let me know if you need other info to help diagnose. Thank you very
much!

Bests,

Derek Sheng







Re: StreamingFileSink closed file exception

2020-12-24 Thread Yun Gao
Hi Billy,
StreamingFileSink does not expect the Encoder to close the stream passed into 
the encode method. However, ObjectMapper closes it at the end of its write 
method. Thus I think you could disable the close action for the ObjectMapper, or 
change the encode implementation to 

objectMapper.writeValue(new CloseShieldOutputStream(stream), element);

to avoid the stream actually getting closed.
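
A fuller sketch of that second option, assuming commons-io is on the classpath
for CloseShieldOutputStream (alternatively, the auto-close behaviour could be
turned off on the mapper itself, e.g. via Jackson's
JsonGenerator.Feature.AUTO_CLOSE_TARGET):

import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.flink.api.common.serialization.Encoder;

public class AndroidDataEncoder implements Encoder<AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        // Wrap the sink's stream so that ObjectMapper.writeValue() cannot close it
        // underneath the StreamingFileSink.
        objectMapper.writeValue(new CloseShieldOutputStream(stream), element);
    }
}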
 --Original Mail --
Sender:Billy Bain 
Send Date:Thu Dec 24 22:32:06 2020
Recipients:User 
Subject:StreamingFileSink closed file exception



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-24 Thread Yun Gao
   Hi all,

     I tested the previous PoC against the current tests and found some new 
issues that might cause divergence; sorry, there may also be some reversals of 
earlier conclusions:

     1. Which operators should wait for one more checkpoint before closing?

One motivation for this FLIP is to ensure the 2PC sink commits the last 
part of data before it is closed, which means the sink operator needs to wait for 
one more checkpoint, i.e. onEndOfInput() -> waitForCheckpoint() -> 
notifyCheckpointComplete() -> close(). This leads to the question of which 
operators should wait for a checkpoint. Possible options are:
     a. Make all operators (or UDFs) that implement the 
notifyCheckpointComplete method wait for one more checkpoint. One exception is 
that, since we can only snapshot one or all tasks of a legacy source operator 
to avoid data repetition [1], we cannot support legacy source operators and their 
chained operators waiting for checkpoints, because there would be a deadlock if 
part of the tasks are finished; this would finally be solved once the legacy 
sources are deprecated. The PoC uses this option for now.
     b. Make operators (or UDFs) implement a special marker 
interface in order to wait for one more checkpoint.


   2. Do we need to handle the case where tasks finish before the checkpoint is triggered?

   Previously I thought we could postpone it; however, during testing I found 
that it might cause some problems, since by default a checkpoint failure causes a 
job failover, and the job would also need to wait for another interval to 
trigger the next checkpoint. To pass the tests, I updated the PoC to include 
this part, and we may want to think again about whether to include it or use 
some other option.

    3. How to extend a new format for the checkpoint meta?

    Sorry, I previously gave a wrong estimation: after I extracted a sub-component 
for (de)serializing operator state, I found the problem just moves to the new 
OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have 
different fields, thus they use different processes when (de)serializing, which is 
a bit different from the case where we have fixed steps and each step has 
different logic. Thus we might either
     a. use base classes shared between every two versions, or
     b. have a unified framework containing all the possible fields across all 
versions, and use an empty field serializer to skip some fields in each version.

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks


--
From:Yun Gao 
Send Time:2020 Dec. 16 (Wed.) 11:07
To:Aljoscha Krettek ; dev ; user 

Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

 Hi Aljoscha,

Many thanks for the feedback! For the remaining issues:

  > 1. You mean we would insert "artificial" barriers for barrier 2 in case 
we receive  EndOfPartition while other inputs have already received barrier 2? 
I think that makes sense, yes.

   Yes, exactly. I would like to insert "artificial" barriers in case 
we receive EndOfPartition while other inputs have already received barrier 2, 
and also for the similar cases where some input channels receive EndOfPartition 
while checkpoint 2 is ongoing, and where the task directly receives the checkpoint 
trigger after all the precedent tasks are finished but before it has received their 
EndOfPartition.

 > 3. This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

   I re-checked the code and now I think composition would be better, to avoid a 
complex inheritance hierarchy, by exposing the changed part 
`(de)serializeOperatorState`. I'll update the PoC to change this part. 
Many thanks for the suggestion!

   > 4. Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. Are you 
saying we should fix that or would the new work introduce even 
more duplicate code?

   Currently, since we never trigger non-source tasks, the 
triggerCheckpoint logic is implemented in the base StreamTask class and 
only used by source tasks. However, after the change non-source 
tasks would also get triggered, with different behavior, so we might not be able 
to continue using this pattern.

Best,
Yun


--
From:Aljoscha Krettek 
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>  1. To include EndOfPartition into consideration for barrier alignment at 
> the TM side, we now tend to decouple t