[jira] [Created] (FLINK-20368) Supports custom operator name for Flink SQL

2020-11-26 Thread Danny Chen (Jira)
Danny Chen created FLINK-20368:
--

 Summary: Supports custom operator name for Flink SQL
 Key: FLINK-20368
 URL: https://issues.apache.org/jira/browse/FLINK-20368
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Danny Chen


A request from the USER mailing list, from Kevin Kwon:

For SQL, I know that operator ID assignment is not possible right now, since the 
query optimizer may not be backward compatible across releases.

But are DDLs also affected by this?

for example:

{code:sql}
CREATE TABLE mytable (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'kafka',
  ...
  'id' = 'mytable',
  'name' = 'mytable'
)
{code}

This way, we can keep all related checkpoint data.
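For context, the DataStream API already offers such a handle via {{uid()}}; a minimal Java sketch of the existing mechanism (names are illustrative):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // pins a stable operator ID so savepoint state can be re-mapped
            // even if the surrounding graph changes
            .uid("mytable-map")
            .print();
        env.execute("uid-example");
    }
}
{code}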






[jira] [Created] (FLINK-20369) Improve the digest of TableSourceScan and Sink node

2020-11-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-20369:
---

 Summary: Improve the digest of TableSourceScan and Sink node
 Key: FLINK-20369
 URL: https://issues.apache.org/jira/browse/FLINK-20369
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jark Wu


Currently:

1. The digest of {{TableSourceScan}} and {{Sink}} doesn't contain the connector 
information, which would be quite useful when debugging.
2. The table name is quite verbose; when a table is under the default catalog and 
database, it would be better to simplify the digest to just the table name.
3. It might also be nice to show the changelog mode of sources and sinks, because 
it is meta information exposed by {{DynamicTableSource/Sink#getChangelogMode}}.


{code}
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
+- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
      :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
      :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
      :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
      :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
      :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
      :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
               +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
                  +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
{code}
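For illustration, a hypothetical digest with the proposed additions (not the output of any current version):

{code}
TableSourceScan(table=[[source_customer]], connector=[upsert-kafka], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
Sink(table=[sink_kafka_count_city], connector=[upsert-kafka], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
{code}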






[jira] [Created] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2020-11-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-20370:
---

 Summary: Result is wrong when sink primary key is not the same 
with query
 Key: FLINK-20370
 URL: https://issues.apache.org/jira/browse/FLINK-20370
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jark Wu


Both sources are upsert-kafka tables which synchronize changes from MySQL tables 
(source_city, source_customer). The sink is another MySQL table in upsert mode 
with "city_name" as the primary key. The join key is "city_id". 

In this case, the result will be wrong when the {{source_city.city_name}} column 
is updated in MySQL, because the UPDATE_BEFORE is ignored and the old city_name 
row is retained in the sink table. 

{code}
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
+- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
      :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
      :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
      :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
      :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
      :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
      :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
               +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
                  +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
{code}






[jira] [Created] (FLINK-20372) flink sql kafka connector cannot config kerberos

2020-11-26 Thread Jira
谢波 created FLINK-20372:
--

 Summary: flink sql kafka connector cannot config kerberos 
 Key: FLINK-20372
 URL: https://issues.apache.org/jira/browse/FLINK-20372
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Affects Versions: 1.11.2
Reporter: 谢波
 Fix For: 1.12.0


I tried to configure Kerberos on the Flink SQL Kafka connector, but I found that 
the connector does not provide options for it, e.g. {{security.protocol}}, 
{{sasl.kerberos.service.name}}, {{sasl.mechanism}}.
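For reference, the Kafka SQL connector forwards any option prefixed with {{properties.}} to the Kafka client (FLINK-20377 below shows such a config reaching the client), so the Kerberos settings can be passed through without dedicated options; a sketch, with broker address and topic as placeholders:

{code:sql}
CREATE TABLE secured_topic (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'secured_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'format' = 'json'
);
{code}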





[jira] [Created] (FLINK-20371) Add docs for outer interval join

2020-11-26 Thread Timo Walther (Jira)
Timo Walther created FLINK-20371:


 Summary: Add docs for outer interval join
 Key: FLINK-20371
 URL: https://issues.apache.org/jira/browse/FLINK-20371
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Looking at the docs, it seems we only support inner interval joins, but according 
to the tests we also support outer interval joins.
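For illustration, a sketch of an outer interval join that the docs could cover (table and column names are made up):

{code:sql}
SELECT *
FROM Orders o
LEFT JOIN Shipments s
  ON o.id = s.order_id
  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
{code}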





[jira] [Created] (FLINK-20373) Flink table jsonArray access all

2020-11-26 Thread HideOnBush (Jira)
HideOnBush created FLINK-20373:
--

 Summary: Flink table jsonArray access all
 Key: FLINK-20373
 URL: https://issues.apache.org/jira/browse/FLINK-20373
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.11.2
Reporter: HideOnBush
 Fix For: 1.11.3


JSON arrays are supported, and array elements can be accessed as Row values by 
subscript. Should we also consider exposing the length of each JSON array? If 
every element has to be accessed by an explicit subscript, the code becomes long.





[jira] [Created] (FLINK-20374) Wrong result when shuffling changelog stream on non-primary-key columns

2020-11-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-20374:
---

 Summary: Wrong result when shuffling changelog stream on 
non-primary-key columns
 Key: FLINK-20374
 URL: https://issues.apache.org/jira/browse/FLINK-20374
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jark Wu


This is reported from user-zh ML: 
http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html

{code:sql}
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);

CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

-- output
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '10',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);

INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
{code}

Data in the MySQL tables:

{code}
test:
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.

status
0, status0
1, status1
2, status2
.
{code}

Operations: 
1. Start the job with parallelism=40; the result in the test_status sink is correct: 

{code}
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1
2, name2, 2020-07-06 00:00:00 , 1, status1
{code}

2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}.
3. The result is incorrect because the {{id=2}} record is missing from the result. 



The reason is that the changelog of {{test}} is shuffled on the {{status}} column, 
which is not the primary key. Therefore, the ordering of changelog events can't be 
guaranteed, and the result is wrong. 
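To make the failure mode concrete, a sketch of the event interleaving (conceptual, not actual operator output). Updating {{status}} from 1 to 2 for the {{id=2}} row emits an UPDATE_BEFORE and an UPDATE_AFTER that hash to different subtasks:

{code}
-U (2, name2, 2020-07-06 00:00:00, 1)   -- hashed on status=1 -> subtask A
+U (2, name2, 2020-07-06 00:00:00, 2)   -- hashed on status=2 -> subtask B
{code}

Because the two events travel through different channels, downstream operators may process the retraction after the new row, so the {{id=2}} record can end up deleted from the sink.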





[jira] [Created] (FLINK-20375) Add a

2020-11-26 Thread hailong wang (Jira)
hailong wang created FLINK-20375:


 Summary: Add a
 Key: FLINK-20375
 URL: https://issues.apache.org/jira/browse/FLINK-20375
 Project: Flink
  Issue Type: Improvement
Reporter: hailong wang








[jira] [Created] (FLINK-20376) Error in restoring savepoint when Flink is upgraded from 1.9 to 1.11.2

2020-11-26 Thread Partha Pradeep Mishra (Jira)
Partha Pradeep Mishra created FLINK-20376:
-

 Summary: Error in restoring savepoint when Flink is upgraded from 
1.9 to 1.11.2
 Key: FLINK-20376
 URL: https://issues.apache.org/jira/browse/FLINK-20376
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Partha Pradeep Mishra


We tried to take a savepoint of one of our Flink jobs (version 1.9) and then 
restore it in the newer Flink version (1.11.2). The restore operation failed with 
the error below. Please note that both jobs (i.e. the one running on 1.9 and the 
one on 1.11.2) are the same binary, with no code difference and no new operators. 
Still, we got the issue below.

_Cannot map checkpoint/savepoint state for operator 
fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is 
not available in the new program._
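For reference, if the goal is just to get the job running again, the restore can be told to skip state that cannot be mapped (that state is then dropped); the standard CLI flag, with placeholder paths:

{code}
./bin/flink run -s /path/to/savepoint --allowNonRestoredState myJob.jar
{code}

The usual root cause, though, is that operator IDs are auto-generated from the topology when {{uid()}} is not set, so they can change between Flink versions even for identical user code; assigning explicit {{uid()}}s to all stateful operators avoids the mismatch.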

*Complete Stack Trace:*

{code}
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
	... 10 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
	at com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)
	at com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.p
{code}

[jira] [Created] (FLINK-20377) flink-1.11.2 -kerberos config on kafka connector not working

2020-11-26 Thread Jira
谢波 created FLINK-20377:
--

 Summary: flink-1.11.2 -kerberos config on kafka connector not 
working
 Key: FLINK-20377
 URL: https://issues.apache.org/jira/browse/FLINK-20377
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Table SQL / Ecosystem
Affects Versions: 1.11.2
 Environment: flink on yarn

kafka with kerberos 

flink-1.11.2_2.11
Reporter: 谢波
 Fix For: 1.12.0


I followed the configuration from the official website for Kafka and 
flink-conf.yaml, but the configuration does not work.

My table config:

{code:sql}
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '',
  'topic' = 'kafka_hepecc_ekko_cut_json',
  'properties.group.id' = 'ekko.group',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.kerberos.service.name' = 'kafka',
  -- 'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);
{code}

flink-conf.yaml:

{code:yaml}
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab
security.kerberos.login.principal: xi...@yonghui.cn

# The configuration below defines which JAAS login contexts
security.kerberos.login.contexts: Client,KafkaClient
{code}

Keytab directory content:

{code}
[xiebo@ww021 keytab]$ pwd
/home/xiebo/module/flink/keytab
[xiebo@ww021 keytab]$ ll
total 12
-rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf
-rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf
-rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab
{code}

I get the following error:

{code}
2020-11-26 19:01:55
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaPro
{code}

[jira] [Created] (FLINK-20378) Watermark generation check TIMESTAMP_WITHOUT_TIME_ZONE

2020-11-26 Thread wxmimperio (Jira)
wxmimperio created FLINK-20378:
--

 Summary: Watermark generation check TIMESTAMP_WITHOUT_TIME_ZONE
 Key: FLINK-20378
 URL: https://issues.apache.org/jira/browse/FLINK-20378
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.11.1
Reporter: wxmimperio


 

 
{code:scala}
def generateWatermarkGenerator(
    config: TableConfig,
    inputType: RowType,
    watermarkExpr: RexNode): GeneratedWatermarkGenerator = {
  // validation
  val watermarkOutputType = FlinkTypeFactory.toLogicalType(watermarkExpr.getType)
  if (watermarkOutputType.getTypeRoot != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
    throw new CodeGenException(
      "WatermarkGenerator only accepts output data type of TIMESTAMP," +
        " but is " + watermarkOutputType)
  }
{code}
 

Why does watermark generation require the output type to be 
TIMESTAMP_WITHOUT_TIME_ZONE?

If I remove this check, what effect will it have on the watermark?


[jira] [Created] (FLINK-20379) New Kafka Connector does not support DeserializationSchema

2020-11-26 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20379:


 Summary: New Kafka Connector does not support DeserializationSchema
 Key: FLINK-20379
 URL: https://issues.apache.org/jira/browse/FLINK-20379
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Stephan Ewen
 Fix For: 1.12.0


The new Kafka connector defines its own deserialization schema and is 
incompatible with the existing library of deserializers.

That means users cannot use any of Flink's formats (Avro, JSON, CSV, 
Protobuf, Confluent Schema Registry, ...) with the new Kafka connector.

I think we should change the new Kafka connector to use the existing 
deserialization classes, so all formats can be used and users can reuse their 
deserializer implementations.

It would also be good to support the existing KafkaDeserializationSchema. Otherwise 
all users need to migrate their sources again.
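For reference, this is the pattern the ticket wants to preserve: the existing connector accepts any {{DeserializationSchema}} from the shared format library. A minimal sketch (topic and properties are placeholders):

{code:java}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OldConnectorExample {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");
        // any existing DeserializationSchema (JSON, Avro, CSV, ...) plugs in here
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}
{code}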






[jira] [Created] (FLINK-20380) Problems with creating iceberg Catalog (Catalog -type=hive) using flink-SQL shell

2020-11-26 Thread tianyu guo (Jira)
tianyu guo created FLINK-20380:
--

 Summary: Problems with creating iceberg Catalog (Catalog 
-type=hive) using flink-SQL shell
 Key: FLINK-20380
 URL: https://issues.apache.org/jira/browse/FLINK-20380
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.11.2
 Environment: cdh6.3.2 + flink1.11.2
Reporter: tianyu guo
 Attachments: image-2020-11-26-21-01-34-293.png

!image-2020-11-26-21-01-34-293.png!





[jira] [Created] (FLINK-20381) Incorrect use of yarn-session.sh -id vs -yid in log statements.

2020-11-26 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-20381:
--

 Summary: Incorrect use of yarn-session.sh -id vs -yid in log 
statements.
 Key: FLINK-20381
 URL: https://issues.apache.org/jira/browse/FLINK-20381
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.12.0
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.12.0


The YARN per-job mode logs a recommended command for shutting down the YARN 
application, but the command as logged mixes up the {{yarn-session.sh}} {{-id}} 
and {{-yid}} flags and therefore doesn't work.





[jira] [Created] (FLINK-20382) Exception thrown from JobMaster.startScheduling() may be ignored.

2020-11-26 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-20382:


 Summary: Exception thrown from JobMaster.startScheduling() may be 
ignored.
 Key: FLINK-20382
 URL: https://issues.apache.org/jira/browse/FLINK-20382
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.2
Reporter: Jiangjie Qin


Currently {{JobMaster.resetAndStartScheduler()}} invokes {{startScheduling()}} 
in a {{thenRun}} clause without an {{exceptionally}} or {{handle}} stage to handle 
exceptions. The job may hang if an exception is thrown when starting 
scheduling, e.g. when operator coordinators fail to be created. 
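A minimal standalone sketch of the hazard (names are illustrative, not the actual JobMaster code):

{code:java}
import java.util.concurrent.CompletableFuture;

public class SwallowedExceptionDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> reset = CompletableFuture.completedFuture(null);
        // an exception thrown inside thenRun() only lands in the returned
        // future; it is not rethrown anywhere by itself
        CompletableFuture<Void> scheduling = reset.thenRun(() -> {
            throw new IllegalStateException("failed to create operator coordinators");
        });
        // without a stage like this, the failure is silently swallowed
        scheduling.exceptionally(t -> {
            t.printStackTrace();
            return null;
        });
    }
}
{code}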





[jira] [Created] (FLINK-20383) DataSet allround end-to-end test dies with NullPointerException

2020-11-26 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-20383:
--

 Summary: DataSet allround end-to-end test dies with 
NullPointerException
 Key: FLINK-20383
 URL: https://issues.apache.org/jira/browse/FLINK-20383
 Project: Flink
  Issue Type: Bug
  Components: API / DataSet
Affects Versions: 1.12.0, 1.13.0
Reporter: Robert Metzger
 Fix For: 1.12.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=10204&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529
{code}
2020-11-26T19:45:51.5478354Z Nov 26 19:45:51 Running 'DataSet allround end-to-end test'
2020-11-26T19:45:51.5478763Z Nov 26 19:45:51 ==
2020-11-26T19:45:51.5490922Z Nov 26 19:45:51 TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-51548553152
2020-11-26T19:45:51.7135919Z Nov 26 19:45:51 Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT
2020-11-26T19:45:51.7232126Z Nov 26 19:45:51 Run DataSet-Allround-Test Program
2020-11-26T19:45:51.7266761Z Nov 26 19:45:51 Setting up SSL with: internal JDK dynamic
2020-11-26T19:45:51.7300820Z Nov 26 19:45:51 Using SAN dns:fv-az679-746.omrnvntobknexcvwb1hiviload.fx.internal.cloudapp.net,ip:10.1.0.4,ip:172.17.0.1,ip:172.18.0.1
2020-11-26T19:45:52.8564809Z Certificate was added to keystore
2020-11-26T19:45:54.9192018Z Certificate was added to keystore
2020-11-26T19:45:55.5263075Z Certificate reply was installed in keystore
2020-11-26T19:45:55.7151814Z Nov 26 19:45:55 Setting up SSL with: rest JDK dynamic
2020-11-26T19:45:55.7186828Z Nov 26 19:45:55 Using SAN dns:fv-az679-746.omrnvntobknexcvwb1hiviload.fx.internal.cloudapp.net,ip:10.1.0.4,ip:172.17.0.1,ip:172.18.0.1
2020-11-26T19:45:56.8610453Z Certificate was added to keystore
2020-11-26T19:45:59.2956135Z Certificate was added to keystore
2020-11-26T19:45:59.9115744Z Certificate reply was installed in keystore
2020-11-26T19:46:00.0800993Z Nov 26 19:46:00 Mutual ssl auth: false
2020-11-26T19:46:00.1688426Z Nov 26 19:46:00 Starting cluster.
2020-11-26T19:46:00.8775396Z Nov 26 19:46:00 Starting standalonesession daemon on host fv-az679-746.
2020-11-26T19:46:02.4016699Z Nov 26 19:46:02 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:02.4595285Z Nov 26 19:46:02 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:03.5161706Z Nov 26 19:46:03 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:04.6157725Z Nov 26 19:46:04 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:05.7153907Z Nov 26 19:46:05 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:07.4507631Z Nov 26 19:46:07 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:08.6436701Z Nov 26 19:46:08 Dispatcher REST endpoint is up.
2020-11-26T19:46:08.6437477Z Nov 26 19:46:08 Start 3 more task managers
2020-11-26T19:46:09.4234133Z Nov 26 19:46:09 [INFO] 1 instance(s) of taskexecutor are already running on fv-az679-746.
2020-11-26T19:46:09.4242548Z Nov 26 19:46:09 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:11.0906754Z Nov 26 19:46:11 [INFO] 2 instance(s) of taskexecutor are already running on fv-az679-746.
2020-11-26T19:46:11.0920876Z Nov 26 19:46:11 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:14.2637898Z Nov 26 19:46:14 [INFO] 3 instance(s) of taskexecutor are already running on fv-az679-746.
2020-11-26T19:46:14.2688442Z Nov 26 19:46:14 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:25.7685789Z Nov 26 19:46:25 Job has been submitted with JobID fa6775cf8a2ed83f2e4ba98930bc2cb2
2020-11-26T19:47:37.212Z
2020-11-26T19:47:37.4448196Z
2020-11-26T19:47:37.4448668Z  The program finished with the following exception:
2020-11-26T19:47:37.4448911Z
2020-11-26T19:47:37.4453074Z org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa6775cf8a2ed83f2e4ba98930bc2cb2)
2020-11-26T19:47:37.4458530Z	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
2020-11-26T19:47:37.4459217Z	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
2020-11-26T19:47:37.4459771Z	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
2020-11-26T19:47:37.4460243Z	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
2020-11-26T19:47:37.4460711Z	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
2020-11-26T19:47:37.4461161Z	at org.apache.flink.client.cli.CliFrontend.pars
{code}

Re: [DISCUSS] Releasing Apache Flink 1.11.3

2020-11-26 Thread Xintong Song
Hi devs,

Updates on the release progress.

A couple of new blockers have emerged since the last update, all of
which have been resolved in the past week.

The release 1.11.3 is still blocked on FLIP-27.
Stephan and Becket are trying to first get the feature stabilized on the
master branch for release 1.12. After that, the remaining changes will be
backported to 1.11, and the release 1.11.3 will be unblocked.

We are hoping to deliver the first RC by the end of next week. During this
time, if there are any other things that you believe should be included in
this release, please reach out to either me or Gordon.

Sorry for the delay.

Thank you~

Xintong Song



On Mon, Nov 23, 2020 at 2:03 PM Xintong Song  wrote:

> Thanks for the notice, Zhu.
> Let's have the discussion on the jira ticket.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 23, 2020 at 12:10 PM Zhu Zhu  wrote:
>
>> Hi Xintong,
>> We just found an issue FLINK-20285 and I think it is a blocker for 1.11.3.
>>
>> Thanks,
>> Zhu
>>
Gyula Fóra wrote on Wed, Nov 18, 2020, at 9:36 PM:

>>
>> > Hi All!
>> >
>> > I have found the following issue today which might be considered a blocker
>> > for this release as well:
>> >
>> > https://issues.apache.org/jira/browse/FLINK-20221
>> >
>> > Could someone please quickly provide a second set of eyes and confirm that
>> > this is indeed a big problem? :)
>> >
>> > Thank you!
>> > Gyula
>> >
>> > On Wed, Nov 18, 2020 at 5:12 AM Xintong Song wrote:
>> >
>> > > Hi devs,
>> > >
>> > > Updates on the progress of preparing the 1.11.3 release.
>> > >
>> > > We are approaching the creation of our first release candidate. Thanks all
>> > > for the efforts so far.
>> > >
>> > > # Ongoing threads
>> > >
>> > >    - *FLIP-27 backportings:* 74 out of 102 commits are already backported.
>> > >    Stephan and Becket are actively working on this.
>> > >    - *Blockers:* There are 2 remaining blockers, both have been fixed on
>> > >    the master branch and should be backported along with the FLIP-27 changes.
>> > >
>> > > As soon as the above threads are done, we will create our first release
>> > > candidate. If there are any other issues that you believe should be a release
>> > > blocker, please reach out to either me or Gordon.
>> > >
>> > > # JIRA version
>> > >
>> > > Version 1.11.4 has been created on JIRA. I've already tagged some of the
>> > > tickets with the new fix version. Please feel free to continue with the
>> > > efforts and merge once they are ready. I'll double check and update the
>> > > fix versions before the release.
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > >
>> > > On Thu, Nov 12, 2020 at 1:31 PM Xintong Song wrote:
>> > >
>> > > > Thanks for the notice and fix, Roman.
>> > > >
>> > > > Thank you~
>> > > >
>> > > > Xintong Song
>> > > >
>> > > >
>> > > >
>> > > > On Wed, Nov 11, 2020 at 5:53 PM Khachatryan Roman <
>> > > > khachatryan.ro...@gmail.com> wrote:
>> > > >
>> > > >> Hi,
>> > > >>
>> > > >> I'd like FLINK-20079 [1] to be merged into 1.11 and included in 1.11.3.
>> > > >>
>> > > >> [1] https://issues.apache.org/jira/browse/FLINK-20079
>> > > >>
>> > > >> Regards,
>> > > >> Roman
>> > > >>
>> > > >>
>> > > >> On Tue, Nov 10, 2020 at 8:21 AM Xintong Song <tonysong...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >> > Thanks for the notice, Dian.
>> > > >> >
>> > > >> > Thank you~
>> > > >> >
>> > > >> > Xintong Song
>> > > >> >
>> > > >> >
>> > > >> >
>> > > >> > On Tue, Nov 10, 2020 at 10:19 AM Dian Fu wrote:
>> > > >> >
>> > > >> > > Hi Xintong,
>> > > >> > >
>> > > >> > > I want to bring one more issue to your attention [1]. The test case
>> > > >> > > UnalignedCheckpointCompatibilityITCase.test failed several times in
>> > > >> > > the last nightly test of release-1.11. We need to figure out if it's
>> > > >> > > just an unstable test or caused by recent changes.
>> > > >> > >
>> > > >> > > [1] https://issues.apache.org/jira/browse/FLINK-20065
>> > > >> > >
>> > > >> > > > On Nov 10, 2020, at 9:24 AM, Xintong Song wrote:
>> > > >> > > >
>> > > >> > > > Thanks for the replies.
>> > > >> > > >
>> > > >> > > > Thank you~
>> > > >> > > >
>> > > >> > > > Xintong Song
>> > > >> > > >
>> > > >> > > >
>> > > >> > > >
>> > > >> > > > On Tue, Nov 10, 2020 at 1:09 AM Becket Qin <becket@gmail.com>
>> > > >> > wrote:
>> > > >> > > >
>> > > >> > > >> Hi Xintong,
>> > > >> > > >>
>> > > >> > > >> Thanks for driving the release. Just want to sync up on the FLIP-27
>> > > >> > > >> backporting. Stephan and I are still trying to backport a bunch of
>> > > >> > > >> patches of Source to 1.11.3. Including:
>> > > >> > > >>
>> > > >> > > >> [FLINK-19698][connector/common] Add a close() method to the SplitReader.
>> > > >> > > >> [FLINK-19717] SourceReaderBase.pollNext may return END_OF_INPUT if
>> > > >> > > >> Spli

[jira] [Created] (FLINK-20384) Broken Link in deployment/ha/kubernetes_ha.zh.md

2020-11-26 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-20384:


 Summary: Broken Link in deployment/ha/kubernetes_ha.zh.md
 Key: FLINK-20384
 URL: https://issues.apache.org/jira/browse/FLINK-20384
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes, Documentation
Affects Versions: 1.12.0, 1.13.0
Reporter: Huang Xingbo


When executing the build_docs.sh script, the following exception is thrown:
{code:java}
Liquid Exception: Could not find document 'deployment/resource-providers/standalone/kubernetes.md' in tag 'link'. Make sure the document exists and the path is correct. in deployment/ha/kubernetes_ha.zh.md
Could not find document 'deployment/resource-providers/standalone/kubernetes.md' in tag 'link'.
{code}





[jira] [Created] (FLINK-20385) Allow to read metadata for Canal-json format

2020-11-26 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-20385:
--

 Summary: Allow to read metadata for Canal-json format
 Key: FLINK-20385
 URL: https://issues.apache.org/jira/browse/FLINK-20385
 Project: Flink
  Issue Type: New Feature
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / API
Reporter: Leonard Xu


In FLIP-107, we added support for reading metadata from the Debezium CDC format.
Canal-json is another widely used CDC format, so we can support reading metadata
from it as well.

The requirement comes from the user-zh mailing list [1]: the user wants to read 
meta information (the database and table name) from canal-json.

[1] http://apache-flink.147419.n8.nabble.com/canal-json-tt8939.html
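A sketch of what this could look like, following the FLIP-107 metadata-column syntax (the exact metadata key names for canal-json are assumptions):

{code:sql}
CREATE TABLE orders (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  id BIGINT,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-binlog',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'canal-json'
);
{code}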





[jira] [Created] (FLINK-20386) ClassCastException when lookup join a JDBC table on INT UNSIGNED column

2020-11-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-20386:
---

 Summary: ClassCastException when lookup join a JDBC table on INT 
UNSIGNED column
 Key: FLINK-20386
 URL: https://issues.apache.org/jira/browse/FLINK-20386
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC, Table SQL / Ecosystem
Affects Versions: 1.11.2, 1.12.0
Reporter: Jark Wu


The primary key of the MySQL table is an INT UNSIGNED column, but it is declared 
as INT in Flink. 
I know the 
[docs|https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping]
 say it should be declared BIGINT in Flink; however, it would be better not to 
fail the job. 

{code}
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
	at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) ~[flink-table-blink_2.11-1.11-vvr-2.1.1-SNAPSHOT.jar:1.11-vvr-2.1.1-SNAPSHOT]
	at JoinTableFuncCollector$6460.collect(Unknown Source) ~[?:?]
	at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203) ~[flink-table-blink_2.11-1.11-vvr-2.1.1-SNAPSHOT.jar:1.11-vvr-2.1.1-SNAPSHOT]
	at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162) ~[?:?]
{code}
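For completeness, the documented workaround is to declare the column as BIGINT on the Flink side; a sketch (table and connection details are placeholders):

{code:sql}
CREATE TABLE dim_table (
  id BIGINT,  -- MySQL INT UNSIGNED maps to Flink BIGINT per the JDBC docs
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'dim_table'
);
{code}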






[jira] [Created] (FLINK-20387) Support column of TIMESTAMP WITH LOCAL TIME ZONE type as rowtime attribute

2020-11-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-20387:
---

 Summary: Support column of TIMESTAMP WITH LOCAL TIME ZONE type as 
rowtime attribute
 Key: FLINK-20387
 URL: https://issues.apache.org/jira/browse/FLINK-20387
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Reporter: Jark Wu


Currently, only the {{TIMESTAMP}} type can be used as a rowtime attribute. It 
would be better to support {{TIMESTAMP WITH LOCAL TIME ZONE}} as well.

As a workaround, users can cast the TIMESTAMP WITH LOCAL TIME ZONE column to 
TIMESTAMP: {{CAST(ts AS TIMESTAMP)}}.
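A sketch of the workaround as a computed column carrying the watermark (connector and field names are illustrative):

{code:sql}
CREATE TABLE events (
  ts_ltz TIMESTAMP(3) WITH LOCAL TIME ZONE,
  ts AS CAST(ts_ltz AS TIMESTAMP(3)),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);
{code}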





[jira] [Created] (FLINK-20388) Supports users setting operators' metrics name

2020-11-26 Thread hailong wang (Jira)
hailong wang created FLINK-20388:


 Summary: Supports users setting operators' metrics name
 Key: FLINK-20388
 URL: https://issues.apache.org/jira/browse/FLINK-20388
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Runtime / Metrics
Reporter: hailong wang


Currently, we only support users setting operator names.

We use those names in the topology to distinguish operators and, at the same 
time, as the operator metrics names.

If an operator name is longer than 80 characters, we simply truncate it.

I think we can allow users to set the operator metrics name separately, just like 
the operator name, as sketched below. If it is not set, fall back to the current behavior.
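A minimal Java sketch of today's behavior, with the proposed setter shown as a comment (the {{metricsName}} API is hypothetical):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorNameExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // today this single name is used in the topology and, truncated
            // to 80 characters, as the operator metrics name
            .name("a rather long descriptive operator name for the web UI")
            // hypothetical API proposed by this ticket:
            // .metricsName("upper_map")
            .print();
        env.execute("operator-name-example");
    }
}
{code}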





[jira] [Created] (FLINK-20389) UnalignedCheckpointITCase failure caused by NullPointerException

2020-11-26 Thread Matthias (Jira)
Matthias created FLINK-20389:


 Summary: UnalignedCheckpointITCase failure caused by 
NullPointerException
 Key: FLINK-20389
 URL: https://issues.apache.org/jira/browse/FLINK-20389
 Project: Flink
  Issue Type: Test
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Matthias


[Build|https://dev.azure.com/mapohl/flink/_build/results?buildId=118&view=results] failed due to a {{NullPointerException}} in {{UnalignedCheckpointITCase}}:
{code:java}
Test execute[Parallel cogroup, p = 10](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase) failed with:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
	at akka.dispatch.OnComplete.internal(Future.scala:264)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=100)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInter
{code}