[ https://issues.apache.org/jira/browse/FLINK-29555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liwei li updated FLINK-29555:
-----------------------------
    Description: 
env:

Flink 1.14.3

 
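{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Table API settings in streaming mode.
EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
settingsBuilder.inStreamingMode();

// MiniClusterResource is a helper from the test harness (not shown), not a Flink API class.
StreamExecutionEnvironment env = StreamExecutionEnvironment
    .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400); // trigger a checkpoint every 400 ms
env.setMaxParallelism(2);
env.setParallelism(2);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); // tEnv: field of the test class
{code}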
 

code:
{code:java}
import org.apache.flink.table.api.Expressions;

// getTableEnv(), sql(), TABLE_NAME and SimpleDataUtil come from the test harness (not shown).
// Register the rows as a temporary view.
getTableEnv().createTemporaryView("sourceTable",
    getTableEnv().fromValues(SimpleDataUtil.FLINK_SCHEMA.toRowDataType(),
        Expressions.row(1, "hello"),
        Expressions.row(2, "world"),
        Expressions.row(3, (String) null), // cast keeps null a single field, not the varargs array
        Expressions.row(null, "bar")
    )
);

// Copy the records from the source view into the destination table.
sql("INSERT INTO %s SELECT id, data FROM sourceTable", TABLE_NAME);
{code}
Actual:

The data is not inserted into the table.

 

I use a custom global committer that implements `GlobalCommitter`. While debugging, I noticed that the `close` method is called immediately after the `endInput` method, so the committable is never sent to my global committer.
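To make the observed sequence concrete, here is a toy model in plain Java. `CheckpointGatedHandler` is a hypothetical class, not Flink's `GlobalStreamingCommitterHandler`; it only assumes that committables are forwarded to the global committer when a checkpoint completes, and that `close()` drops anything still pending:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Flink's GlobalStreamingCommitterHandler.
// Assumption: committables reach the global committer only when a checkpoint completes.
class CheckpointGatedHandler {
    private final List<String> pending = new ArrayList<>(); // waiting for a checkpoint
    final List<String> committed = new ArrayList<>();       // received by the global committer

    void add(String committable) {
        pending.add(committable);
    }

    // Checkpoint-complete callback: forward everything pending to the committer.
    void notifyCheckpointCompleted(long checkpointId) {
        committed.addAll(pending);
        pending.clear();
    }

    // Observed behavior: end of input commits nothing on its own.
    void endOfInput() {
    }

    // close() discards whatever is still pending.
    void close() {
        pending.clear();
    }

    public static void main(String[] args) {
        CheckpointGatedHandler handler = new CheckpointGatedHandler();
        handler.add("committable-1");
        handler.endOfInput(); // bounded input is exhausted
        handler.close();      // called right after endOfInput, no checkpoint in between
        System.out.println(handler.committed); // prints []
    }
}
{code}

The committable only survives if `notifyCheckpointCompleted` runs between `add` and `close`, which is exactly the call that never happens in this job.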

 

Because this statement executes quickly and the task finishes right after execution, no checkpoint is ever triggered, even though checkpointing is enabled (every 400 ms).
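A simplified timing model illustrates why a short bounded job can finish without a single checkpoint. It assumes checkpoints fire at fixed multiples of the interval after job start and ignores the initial delay Flink applies before the first checkpoint; the 150 ms runtime is a hypothetical value, not a measurement from this job:

{code:java}
// Simplified model (assumption): checkpoints are triggered at interval, 2 * interval, ...
// after job start; Flink's initial scheduling delay is ignored.
class CheckpointTiming {
    // Number of checkpoint triggers that happen strictly before the job finishes.
    static long checkpointsBeforeFinish(long jobRuntimeMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (jobRuntimeMs <= 0) {
            return 0;
        }
        // k * intervalMs < jobRuntimeMs for k >= 1  <=>  k <= (jobRuntimeMs - 1) / intervalMs
        return (jobRuntimeMs - 1) / intervalMs;
    }

    public static void main(String[] args) {
        // enableCheckpointing(400) with a hypothetical 150 ms job runtime
        System.out.println(checkpointsBeforeFinish(150, 400));  // prints 0
        System.out.println(checkpointsBeforeFinish(1000, 400)); // prints 2
    }
}
{code}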

 

The `notifyCheckpointCompleted` method in the `GlobalStreamingCommitterHandler` class is never called after `endOfInput`. This does not seem to be the intended behavior.
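For comparison, here is a hypothetical handler that does not lose committables when the input ends before any checkpoint: it commits whatever is pending at end of input instead of waiting for a checkpoint that never comes. This is an illustrative sketch only, not Flink's implementation or a proposed patch:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the expected behavior, NOT Flink's implementation.
class FlushOnEndOfInputHandler {
    private final List<String> pending = new ArrayList<>(); // waiting to be committed
    final List<String> committed = new ArrayList<>();       // received by the global committer

    void add(String committable) {
        pending.add(committable);
    }

    void notifyCheckpointCompleted(long checkpointId) {
        commitPending();
    }

    // End of input performs a final commit, so nothing depends on a later checkpoint.
    void endOfInput() {
        commitPending();
    }

    // Fail loudly instead of silently dropping committables.
    void close() {
        if (!pending.isEmpty()) {
            throw new IllegalStateException("uncommitted committables at close: " + pending);
        }
    }

    private void commitPending() {
        committed.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        FlushOnEndOfInputHandler handler = new FlushOnEndOfInputHandler();
        handler.add("committable-1");
        handler.endOfInput();
        handler.close();
        System.out.println(handler.committed); // prints [committable-1]
    }
}
{code}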

This seems to be a bug, so I raised this issue.

[https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java#L80-L100]

  was:
env:

Flink 1.14.3

 
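{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Table API settings in streaming mode.
EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
settingsBuilder.inStreamingMode();

// MiniClusterResource is a helper from the test harness (not shown), not a Flink API class.
StreamExecutionEnvironment env = StreamExecutionEnvironment
    .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400); // trigger a checkpoint every 400 ms
env.setMaxParallelism(2);
env.setParallelism(2);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); // tEnv: field of the test class
{code}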
 

code:
{code:java}
import org.apache.flink.table.api.Expressions;

// getTableEnv(), sql(), TABLE_NAME and SimpleDataUtil come from the test harness (not shown).
// Register the rows as a temporary view.
getTableEnv().createTemporaryView("sourceTable",
    getTableEnv().fromValues(SimpleDataUtil.FLINK_SCHEMA.toRowDataType(),
        Expressions.row(1, "hello"),
        Expressions.row(2, "world"),
        Expressions.row(3, (String) null), // cast keeps null a single field, not the varargs array
        Expressions.row(null, "bar")
    )
);

// Copy the records from the source view into the destination table.
sql("INSERT INTO %s SELECT id, data FROM sourceTable", TABLE_NAME);
{code}
Actual:

The data is not inserted into the table.

 

I use a custom global committer that implements `GlobalCommitter`. While debugging, I noticed that the `close` method is called immediately after the `endInput` method, so the committable is never sent to my global committer.

The `notifyCheckpointCompleted` method in the `GlobalStreamingCommitterHandler` class is never called after `endOfInput`. This does not seem to be the intended behavior.

This seems to be a bug, so I raised this issue.

https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java#L80-L100


> GlobalStreamingCommitterHandler not call notifyCheckpointCompleted after 
> endOfInput
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-29555
>                 URL: https://issues.apache.org/jira/browse/FLINK-29555
>             Project: Flink
>          Issue Type: Bug
>            Reporter: liwei li
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
