[jira] [Created] (FLINK-20368) Supports custom operator name for Flink SQL
Danny Chen created FLINK-20368:
-------------------------------

Summary: Supports custom operator name for Flink SQL
Key: FLINK-20368
URL: https://issues.apache.org/jira/browse/FLINK-20368
Project: Flink
Issue Type: Improvement
Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Danny Chen

A request from the USER mailing list, from Kevin Kwon:

For SQL, I know that operator ID assignment is not possible now, since the query optimizer may not be backward compatible in each release. But are DDLs also affected by this? For example:

{code}
CREATE TABLE mytable (
  id BIGINT,
  data STRING
) with (
  connector = 'kafka'
  ...
  id = 'mytable'
  name = 'mytable'
)
{code}

so that we can save all related checkpoint data.
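In the DataStream API, the stable-ID mechanism the request alludes to already exists via {{uid()}}; a minimal sketch of that existing mechanism for comparison (the pipeline itself is illustrative):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .map((MapFunction<String, String>) String::toUpperCase)
                // uid() pins the operator ID used to map savepoint state, so the
                // surrounding job graph can change without breaking restores.
                .uid("normalize-map")
                // name() only affects display and metrics, not state mapping.
                .name("normalize")
                .print();

        env.execute("uid-sketch");
    }
}
{code}

This is exactly the handle that is missing for SQL operators, because the planner generates the operators and their IDs.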
[jira] [Created] (FLINK-20369) Improve the digest of TableSourceScan and Sink node
Jark Wu created FLINK-20369:
----------------------------

Summary: Improve the digest of TableSourceScan and Sink node
Key: FLINK-20369
URL: https://issues.apache.org/jira/browse/FLINK-20369
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Jark Wu

Currently:

1. The digest of {{TableSourceScan}} and {{Sink}} doesn't contain the connector information, which would be quite useful when debugging.
2. The table name is quite verbose under the default catalog and database; it would be better to simplify it to just the table name in that case.
3. It may also be nicer to include the changelog mode of source and sink, because it is meta information from {{DynamicTableSource/Sink#getChangelogMode}}.

{code}
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
+- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
      :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
      :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
      :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
      :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
      :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
      :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
               +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
                  +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
{code}
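The digests above can be inspected with {{TableEnvironment#explainSql}}, which prints the plan this ticket proposes to enrich. A minimal runnable sketch (the table definition is illustrative):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainDigestSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE orders (id BIGINT, amount DOUBLE) "
                        + "WITH ('connector' = 'datagen')");

        // Prints the AST and the optimized plan, including the
        // TableSourceScan/Sink digests discussed above.
        System.out.println(tEnv.explainSql(
                "SELECT id, SUM(amount) FROM orders GROUP BY id"));
    }
}
{code}

Later SQL sketches in this digest assume a {{TableEnvironment}} named {{tEnv}} created this way.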
[jira] [Created] (FLINK-20370) Result is wrong when sink primary key is not the same with query
Jark Wu created FLINK-20370:
----------------------------

Summary: Result is wrong when sink primary key is not the same with query
Key: FLINK-20370
URL: https://issues.apache.org/jira/browse/FLINK-20370
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Jark Wu

Both sources are upsert-kafka, which synchronize the changes from MySQL tables (source_city, source_customer). The sink is another MySQL table, which is in upsert mode with "city_name" as primary key. The join key is "city_id".

In this case, the result will be wrong when updating the {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored and the old city_name is retained in the sink table.

{code}
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
+- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
      :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
      :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
      :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
      :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
      :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
      :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
               +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
                  +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
{code}
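The DDL behind this plan is not in the report; as a sketch of the shape of the mismatch, with hypothetical sink options and assuming {{tEnv}} from the FLINK-20369 sketch:

{code:java}
// Hypothetical sink DDL: the sink is keyed by city_name, while the query's
// upsert key is the join/group key city_id. An update that changes city_name
// needs the UPDATE_BEFORE (keyed by the OLD name) to delete the stale row;
// in upsert mode that retraction is dropped, so the old row survives.
tEnv.executeSql(
    "CREATE TABLE sink_count_city ("
        + "  city_name STRING,"
        + "  count_customer BIGINT,"
        + "  sum_gender BIGINT,"
        + "  PRIMARY KEY (city_name) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://localhost:3306/mydb',"
        + " 'table-name' = 'sink_count_city'"
        + ")");
{code}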
[jira] [Created] (FLINK-20372) flink sql kafka connector cannot config kerberos
谢波 created FLINK-20372:
-----------------------

Summary: flink sql kafka connector cannot config kerberos
Key: FLINK-20372
URL: https://issues.apache.org/jira/browse/FLINK-20372
Project: Flink
Issue Type: Bug
Components: Table SQL / Ecosystem
Affects Versions: 1.11.2
Reporter: 谢波
Fix For: 1.12.0

I tried to configure Kerberos on the Flink SQL Kafka connector, but I found that the connector does not provide the required options, e.g. {{security.protocol}}, {{sasl.kerberos.service.name}}, {{sasl.mechanism}}.
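The 1.12 Kafka table connector forwards arbitrary client options through the {{properties.*}} prefix, which covers this request; a sketch of the Kerberos options passed that way, assuming {{tEnv}} and placeholder broker/topic names:

{code:java}
tEnv.executeSql(
    "CREATE TABLE secured_source (msg STRING) WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'my-topic',"
        + " 'properties.bootstrap.servers' = 'broker:9092',"
        // Kafka client security options, forwarded verbatim via properties.*:
        + " 'properties.security.protocol' = 'SASL_PLAINTEXT',"
        + " 'properties.sasl.kerberos.service.name' = 'kafka',"
        + " 'properties.sasl.mechanism' = 'GSSAPI',"
        + " 'format' = 'json'"
        + ")");
{code}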
[jira] [Created] (FLINK-20371) Add docs for outer interval join
Timo Walther created FLINK-20371:
---------------------------------

Summary: Add docs for outer interval join
Key: FLINK-20371
URL: https://issues.apache.org/jira/browse/FLINK-20371
Project: Flink
Issue Type: Improvement
Components: Documentation, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther

Looking at the docs, it appears that we only support inner interval joins, but according to the tests we also support outer joins.
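For reference, an outer interval join is the inner variant with LEFT/RIGHT/FULL OUTER, keeping unmatched rows from the outer side; a sketch with illustrative tables, assuming {{tEnv}}:

{code:java}
// Orders without a shipment inside the 4-hour interval are still emitted,
// with NULLs for the shipment columns.
tEnv.executeSql(
    "SELECT o.id, o.order_time, s.ship_time "
        + "FROM Orders o LEFT OUTER JOIN Shipments s "
        + "ON o.id = s.order_id "
        + "AND s.ship_time BETWEEN o.order_time "
        + "AND o.order_time + INTERVAL '4' HOUR");
{code}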
[jira] [Created] (FLINK-20373) Flink table jsonArray access all
HideOnBush created FLINK-20373:
-------------------------------

Summary: Flink table jsonArray access all
Key: FLINK-20373
URL: https://issues.apache.org/jira/browse/FLINK-20373
Project: Flink
Issue Type: Bug
Components: Table SQL / Client
Affects Versions: 1.11.2
Reporter: HideOnBush
Fix For: 1.11.3

The official JSON array support allows accessing Row elements by subscript. Should we also consider exposing the length of each JSON array? If every element has to be addressed by an explicit subscript, the code becomes long.
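Until something built-in exists, the missing length can be supplied by a small scalar UDF; a sketch using Jackson (the function name and registration are illustrative):

{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical helper: returns the element count of a JSON array string,
// or -1 if the input is not a JSON array.
public class JsonArrayLength extends ScalarFunction {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public int eval(String json) {
        try {
            JsonNode node = MAPPER.readTree(json);
            return node.isArray() ? node.size() : -1;
        } catch (Exception e) {
            return -1;
        }
    }
}
{code}

Registered with {{tEnv.createTemporarySystemFunction("JSON_ARRAY_LENGTH", JsonArrayLength.class)}}, it can back a query such as {{SELECT JSON_ARRAY_LENGTH(payload) FROM my_table}}.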
[jira] [Created] (FLINK-20374) Wrong result when shuffling changelog stream on non-primary-key columns
Jark Wu created FLINK-20374:
----------------------------

Summary: Wrong result when shuffling changelog stream on non-primary-key columns
Key: FLINK-20374
URL: https://issues.apache.org/jira/browse/FLINK-20374
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Jark Wu

This was reported from the user-zh ML: http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html

{code:sql}
CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);

CREATE TABLE status (
  `id` INT,
  `name` VARCHAR(255),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

-- output
CREATE TABLE test_status (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  `status_name` VARCHAR(255),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '10',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);

INSERT INTO test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
{code}

Data in the MySQL tables:

{code}
test:
0, name0, 2020-07-06 00:00:00, 0
1, name1, 2020-07-06 00:00:00, 1
2, name2, 2020-07-06 00:00:00, 1
...

status:
0, status0
1, status1
2, status2
...
{code}

Operations:

1. Start the job with parallelism=40; the result in the test_status sink is correct:

{code}
0, name0, 2020-07-06 00:00:00, 0, status0
1, name1, 2020-07-06 00:00:00, 1, status1
2, name2, 2020-07-06 00:00:00, 1, status1
{code}

2. Update {{status}} of the {{id=2}} record in table {{test}} from {{1}} to {{2}}.
3. The result is not correct, because the {{id=2}} record is missing from the result.

The reason is that the changelog of {{test}} is shuffled on the {{status}} column, which is not the primary key. Therefore, the ordering can't be guaranteed, and the result is wrong.
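To see why the ordering breaks, here is a worked illustration (the hash scheme is illustrative, not Flink's actual partitioner):

{code:java}
public class ShuffleOrderSketch {
    public static void main(String[] args) {
        // The UPDATE of id=2 from status=1 to status=2 becomes two changelog
        // records: UPDATE_BEFORE(id=2, status=1) and UPDATE_AFTER(id=2, status=2).
        // Shuffled on the non-primary-key column `status`, they are routed to
        // different subtasks:
        int parallelism = 40;
        int channelBefore = Math.abs(Integer.hashCode(1)) % parallelism; // -U, status=1
        int channelAfter = Math.abs(Integer.hashCode(2)) % parallelism;  // +U, status=2
        // channelBefore != channelAfter: there is no ordering guarantee between
        // the retraction and the new value, so the join may apply them out of
        // order and the id=2 row goes missing from the sink.
        System.out.println(channelBefore + " vs " + channelAfter);
    }
}
{code}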
[jira] [Created] (FLINK-20375) Add a
hailong wang created FLINK-20375:
---------------------------------

Summary: Add a
Key: FLINK-20375
URL: https://issues.apache.org/jira/browse/FLINK-20375
Project: Flink
Issue Type: Improvement
Reporter: hailong wang
[jira] [Created] (FLINK-20376) Error in restoring savepoint when Flink is upgraded from 1.9 to 1.11.2
Partha Pradeep Mishra created FLINK-20376:
------------------------------------------

Summary: Error in restoring savepoint when Flink is upgraded from 1.9 to 1.11.2
Key: FLINK-20376
URL: https://issues.apache.org/jira/browse/FLINK-20376
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Reporter: Partha Pradeep Mishra

We tried to save checkpoints for one of the Flink jobs (version 1.9) and then import/restore the checkpoints in the newer Flink version (1.11.2). The import/resume operation failed with the error below. Please note that both jobs (i.e. the one running on 1.9 and the other on 1.11.2) are the same binary, with no code difference or introduction of new operators. Still we got the issue below.

_Cannot map checkpoint/savepoint state for operator fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is not available in the new program._

*Complete stack trace:*

{code}
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
	... 10 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
	at com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)
	at com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.p
{code}
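If the unmatched operator was removed on purpose, the restore can be told to skip its state; otherwise the usual fix is to pin operator IDs with {{uid()}} so auto-generated IDs stop shifting between versions. A sketch of the skip switch (the savepoint path is a placeholder):

{code:java}
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class RestoreSketch {
    public static void main(String[] args) {
        // Programmatic equivalent of `flink run -s <path> --allowNonRestoredState`:
        // savepoint state that maps to no operator in the new job is skipped
        // instead of failing the restore with the error quoted above.
        SavepointRestoreSettings settings =
                SavepointRestoreSettings.forPath("hdfs:///savepoints/savepoint-xyz", true);
        System.out.println(settings);
    }
}
{code}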
[jira] [Created] (FLINK-20377) flink-1.11.2 -kerberos config on kafka connector not working
谢波 created FLINK-20377:
-----------------------

Summary: flink-1.11.2 -kerberos config on kafka connector not working
Key: FLINK-20377
URL: https://issues.apache.org/jira/browse/FLINK-20377
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka, Table SQL / Ecosystem
Affects Versions: 1.11.2
Environment: flink on yarn, kafka with kerberos, flink-1.11.2_2.11
Reporter: 谢波
Fix For: 1.12.0

I followed the configuration on the official website to configure Kafka and flink-conf.yaml, but the configuration does not work.

My table config:

{code}
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '',
  'topic' = 'kafka_hepecc_ekko_cut_json',
  'properties.group.id' = 'ekko.group',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.kerberos.service.name' = 'kafka',
  -- 'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);
{code}

flink-conf.yaml:

{code}
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab
security.kerberos.login.principal: xi...@yonghui.cn

# The configuration below defines which JAAS login contexts
security.kerberos.login.contexts: Client,KafkaClient
{code}

Directory content:

{code}
[xiebo@ww021 keytab]$ pwd
/home/xiebo/module/flink/keytab
[xiebo@ww021 keytab]$ ll
total 12
-rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf
-rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf
-rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab
{code}

I get an error:

{code}
2020-11-26 19:01:55
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaPro
{code}
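"Unable to obtain password from user" typically means the Kafka client never saw a usable JAAS login. One workaround is to hand the client an explicit JAAS entry per table via the Kafka client property {{sasl.jaas.config}}; a sketch, assuming {{tEnv}} and placeholder keytab/principal values:

{code:java}
// Sketch: forward an explicit JAAS login module to the Kafka client through
// the connector's properties.* passthrough. Keytab path and principal are
// placeholders and must match the cluster's Kerberos setup.
tEnv.executeSql(
    "CREATE TABLE kerberized_sink (msg STRING) WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'secured-topic',"
        + " 'properties.bootstrap.servers' = 'broker:9092',"
        + " 'properties.security.protocol' = 'SASL_PLAINTEXT',"
        + " 'properties.sasl.kerberos.service.name' = 'kafka',"
        + " 'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required"
        + " useKeyTab=true storeKey=true keyTab=\"/path/to/user.keytab\""
        + " principal=\"user@EXAMPLE.COM\";',"
        + " 'format' = 'json'"
        + ")");
{code}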
[jira] [Created] (FLINK-20378) Watermark generation check TIMESTAMP_WITHOUT_TIME_ZONE
wxmimperio created FLINK-20378:
-------------------------------

Summary: Watermark generation check TIMESTAMP_WITHOUT_TIME_ZONE
Key: FLINK-20378
URL: https://issues.apache.org/jira/browse/FLINK-20378
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.11.1
Reporter: wxmimperio

{code:scala}
def generateWatermarkGenerator(
    config: TableConfig,
    inputType: RowType,
    watermarkExpr: RexNode): GeneratedWatermarkGenerator = {
  // validation
  val watermarkOutputType = FlinkTypeFactory.toLogicalType(watermarkExpr.getType)
  if (watermarkOutputType.getTypeRoot != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
    throw new CodeGenException(
      "WatermarkGenerator only accepts output data type of TIMESTAMP," +
        " but is " + watermarkOutputType)
  }
{code}

Why does watermark generation require the output type to be TIMESTAMP_WITHOUT_TIME_ZONE? If I remove this check, what effect will it have on the watermark?
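The practical consequence of the check is that the watermark expression must evaluate to a plain TIMESTAMP(3); e.g. an epoch-millis column has to be converted first. A sketch with illustrative columns, assuming {{tEnv}}:

{code:java}
// Derive a TIMESTAMP(3) column from epoch milliseconds so the watermark
// expression passes the TIMESTAMP_WITHOUT_TIME_ZONE check quoted above.
tEnv.executeSql(
    "CREATE TABLE events ("
        + "  ts_millis BIGINT,"
        + "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(ts_millis / 1000)),"
        + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
        + ") WITH ('connector' = 'datagen')");
{code}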
[jira] [Created] (FLINK-20379) New Kafka Connector does not support DeserializationSchema
Stephan Ewen created FLINK-20379:
---------------------------------

Summary: New Kafka Connector does not support DeserializationSchema
Key: FLINK-20379
URL: https://issues.apache.org/jira/browse/FLINK-20379
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: Stephan Ewen
Fix For: 1.12.0

The new Kafka Connector defines its own deserialization schema and is incompatible with the existing library of deserializers. That means that users cannot use all of Flink's formats (Avro, JSON, Csv, Protobuf, Confluent Schema Registry, ...) with the new Kafka Connector.

I think we should change the new Kafka Connector to use the existing deserialization classes, so all formats can be used, and users can reuse their deserializer implementations.

It would also be good to use the existing KafkaDeserializationSchema. Otherwise all users need to migrate their sources again.
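For contrast, the existing consumer accepts any {{DeserializationSchema}}, which is what makes the format library reusable; a minimal sketch (broker, group, and topic are placeholders):

{code:java}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LegacyKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "demo");

        // Any DeserializationSchema plugs in here: SimpleStringSchema,
        // Avro/JSON/Csv row schemas, Confluent registry schemas, ...
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("legacy-kafka-source");
    }
}
{code}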
[jira] [Created] (FLINK-20380) Problems with creating iceberg Catalog (catalog-type=hive) using flink-SQL shell
tianyu guo created FLINK-20380:
-------------------------------

Summary: Problems with creating iceberg Catalog (catalog-type=hive) using flink-SQL shell
Key: FLINK-20380
URL: https://issues.apache.org/jira/browse/FLINK-20380
Project: Flink
Issue Type: Bug
Components: Table SQL / Client
Affects Versions: 1.11.2
Environment: cdh6.3.2 + flink1.11.2
Reporter: tianyu guo
Attachments: image-2020-11-26-21-01-34-293.png

!image-2020-11-26-21-01-34-293.png!
[jira] [Created] (FLINK-20381) Incorrect use of yarn-session.sh -id vs -yid in log statements.
Robert Metzger created FLINK-20381:
-----------------------------------

Summary: Incorrect use of yarn-session.sh -id vs -yid in log statements.
Key: FLINK-20381
URL: https://issues.apache.org/jira/browse/FLINK-20381
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Affects Versions: 1.12.0
Reporter: Robert Metzger
Assignee: Robert Metzger
Fix For: 1.12.0

The YARN per-job mode logs a recommended command for shutting down the YARN application, but the suggested command doesn't work.
[jira] [Created] (FLINK-20382) Exception thrown from JobMaster.startScheduling() may be ignored.
Jiangjie Qin created FLINK-20382:
---------------------------------

Summary: Exception thrown from JobMaster.startScheduling() may be ignored.
Key: FLINK-20382
URL: https://issues.apache.org/jira/browse/FLINK-20382
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.11.2
Reporter: Jiangjie Qin

Currently {{JobMaster.resetAndStartScheduler()}} invokes {{startScheduling()}} in a {{thenRun}} clause without an {{exceptionally}} or {{handle}} stage to handle exceptions. The job may hang if an exception is thrown when starting scheduling, e.g. when creating the operator coordinators fails.
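The bug shape, as a generic sketch (not the actual JobMaster code): an exception thrown inside {{thenRun}} only poisons the returned future, and if nobody handles that future, the failure is silently dropped.

{code:java}
import java.util.concurrent.CompletableFuture;

public class SwallowedExceptionSketch {
    public static void main(String[] args) {
        // Bug shape: the exception only fails the returned future; nobody
        // observes it, so the failure vanishes and the "job" appears to hang.
        CompletableFuture.completedFuture(null)
                .thenRun(() -> { throw new IllegalStateException("start scheduling failed"); });

        // Fix shape: attach a handler so the failure is observed and acted on.
        CompletableFuture.completedFuture(null)
                .thenRun(() -> { throw new IllegalStateException("start scheduling failed"); })
                .exceptionally(t -> {
                    System.err.println("scheduling failed: " + t);
                    return null; // e.g. fail the job / trigger failover here
                });
    }
}
{code}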
[jira] [Created] (FLINK-20383) DataSet allround end-to-end test dies with NullPointerException
Robert Metzger created FLINK-20383:
-----------------------------------

Summary: DataSet allround end-to-end test dies with NullPointerException
Key: FLINK-20383
URL: https://issues.apache.org/jira/browse/FLINK-20383
Project: Flink
Issue Type: Bug
Components: API / DataSet
Affects Versions: 1.12.0, 1.13.0
Reporter: Robert Metzger
Fix For: 1.12.0

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=10204&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529

{code}
2020-11-26T19:45:51.5478354Z Nov 26 19:45:51 Running 'DataSet allround end-to-end test'
2020-11-26T19:45:51.5478763Z Nov 26 19:45:51 ==
2020-11-26T19:45:51.5490922Z Nov 26 19:45:51 TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-51548553152
2020-11-26T19:45:51.7135919Z Nov 26 19:45:51 Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT
2020-11-26T19:45:51.7232126Z Nov 26 19:45:51 Run DataSet-Allround-Test Program
2020-11-26T19:45:51.7266761Z Nov 26 19:45:51 Setting up SSL with: internal JDK dynamic
2020-11-26T19:45:51.7300820Z Nov 26 19:45:51 Using SAN dns:fv-az679-746.omrnvntobknexcvwb1hiviload.fx.internal.cloudapp.net,ip:10.1.0.4,ip:172.17.0.1,ip:172.18.0.1
2020-11-26T19:45:52.8564809Z Certificate was added to keystore
2020-11-26T19:45:54.9192018Z Certificate was added to keystore
2020-11-26T19:45:55.5263075Z Certificate reply was installed in keystore
2020-11-26T19:45:55.7151814Z Nov 26 19:45:55 Setting up SSL with: rest JDK dynamic
2020-11-26T19:45:55.7186828Z Nov 26 19:45:55 Using SAN dns:fv-az679-746.omrnvntobknexcvwb1hiviload.fx.internal.cloudapp.net,ip:10.1.0.4,ip:172.17.0.1,ip:172.18.0.1
2020-11-26T19:45:56.8610453Z Certificate was added to keystore
2020-11-26T19:45:59.2956135Z Certificate was added to keystore
2020-11-26T19:45:59.9115744Z Certificate reply was installed in keystore
2020-11-26T19:46:00.0800993Z Nov 26 19:46:00 Mutual ssl auth: false
2020-11-26T19:46:00.1688426Z Nov 26 19:46:00 Starting cluster.
2020-11-26T19:46:00.8775396Z Nov 26 19:46:00 Starting standalonesession daemon on host fv-az679-746.
2020-11-26T19:46:02.4016699Z Nov 26 19:46:02 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:02.4595285Z Nov 26 19:46:02 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:03.5161706Z Nov 26 19:46:03 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:04.6157725Z Nov 26 19:46:04 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:05.7153907Z Nov 26 19:46:05 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:07.4507631Z Nov 26 19:46:07 Waiting for Dispatcher REST endpoint to come up...
2020-11-26T19:46:08.6436701Z Nov 26 19:46:08 Dispatcher REST endpoint is up.
2020-11-26T19:46:08.6437477Z Nov 26 19:46:08 Start 3 more task managers
2020-11-26T19:46:09.4234133Z Nov 26 19:46:09 [INFO] 1 instance(s) of taskexecutor are already running on fv-az679-746.
2020-11-26T19:46:09.4242548Z Nov 26 19:46:09 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:11.0906754Z Nov 26 19:46:11 [INFO] 2 instance(s) of taskexecutor are already running on fv-az679-746.
2020-11-26T19:46:11.0920876Z Nov 26 19:46:11 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:14.2637898Z Nov 26 19:46:14 [INFO] 3 instance(s) of taskexecutor are already running on fv-az679-746.
2020-11-26T19:46:14.2688442Z Nov 26 19:46:14 Starting taskexecutor daemon on host fv-az679-746.
2020-11-26T19:46:25.7685789Z Nov 26 19:46:25 Job has been submitted with JobID fa6775cf8a2ed83f2e4ba98930bc2cb2
2020-11-26T19:47:37.212Z
2020-11-26T19:47:37.4448196Z
2020-11-26T19:47:37.4448668Z The program finished with the following exception:
2020-11-26T19:47:37.4448911Z
2020-11-26T19:47:37.4453074Z org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa6775cf8a2ed83f2e4ba98930bc2cb2)
2020-11-26T19:47:37.4458530Z	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
2020-11-26T19:47:37.4459217Z	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
2020-11-26T19:47:37.4459771Z	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
2020-11-26T19:47:37.4460243Z	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
2020-11-26T19:47:37.4460711Z	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
2020-11-26T19:47:37.4461161Z	at org.apache.flink.client.cli.CliFrontend.pars
{code}
Re: [DISCUSS] Releasing Apache Flink 1.11.3
Hi devs,

Updates on the release progress. A couple of new blockers have emerged since the last update, all of which have been resolved in the past week.

The release 1.11.3 is still blocked on FLIP-27. Stephan and Becket are trying to first get the feature stabilized on the master branch for release 1.12. After that, the remaining changes will be backported to 1.11, and the release 1.11.3 will be unblocked. We are hoping to deliver the first RC by the end of next week.

During this time, if there's anything else that you believe should be included in this release, please reach out to either me or Gordon.

Sorry for the delay.

Thank you~

Xintong Song

On Mon, Nov 23, 2020 at 2:03 PM Xintong Song wrote:

> Thanks for the notice, Zhu.
> Let's have the discussion on the jira ticket.
>
> Thank you~
>
> Xintong Song
>
> On Mon, Nov 23, 2020 at 12:10 PM Zhu Zhu wrote:
>
> > Hi Xintong,
> > We just found an issue FLINK-20285 and I think it is a blocker for 1.11.3.
> >
> > Thanks,
> > Zhu
> >
> > On Wed, Nov 18, 2020 at 9:36 PM, Gyula Fóra wrote:
> >
> > > Hi All!
> > >
> > > I have found the following issue today which might be considered a blocker for this release as well:
> > >
> > > https://issues.apache.org/jira/browse/FLINK-20221
> > >
> > > Could someone please quickly provide a second set of eyes and confirm that this is indeed a big problem? :)
> > >
> > > Thank you!
> > > Gyula
> > >
> > > On Wed, Nov 18, 2020 at 5:12 AM Xintong Song wrote:
> > >
> > > > Hi devs,
> > > >
> > > > Updates on the progress of preparing the 1.11.3 release.
> > > >
> > > > We are approaching the creation of our first release candidate. Thanks all for the efforts so far.
> > > >
> > > > # Ongoing threads
> > > >
> > > > - *FLIP-27 backportings:* 74 out of 102 commits are already backported. Stephan and Becket are actively working on this.
> > > > - *Blockers:* There are 2 remaining blockers, both have been fixed on the master branch and should be backported along with the FLIP-27 changes.
> > > >
> > > > As soon as the above threads are done, we will create our first release candidate. If there's any other issues that you believe should be a release blocker, please reach out to either me or Gordon.
> > > >
> > > > # JIRA version
> > > >
> > > > Version 1.11.4 has been created on JIRA. I've already tagged some of the tickets with the new fix version. Please feel free to continue with the efforts and merge once they are ready. I'll double check and update the fix versions before the release.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > > On Thu, Nov 12, 2020 at 1:31 PM Xintong Song wrote:
> > > >
> > > > > Thanks for the notice and fix, Roman.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > > On Wed, Nov 11, 2020 at 5:53 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'd like FLINK-20079 [1] to be merged into 1.11 and included in 1.11.3.
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-20079
> > > > > >
> > > > > > Regards,
> > > > > > Roman
> > > > > >
> > > > > > On Tue, Nov 10, 2020 at 8:21 AM Xintong Song <tonysong...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks for the notice, Dian.
> > > > > > >
> > > > > > > Thank you~
> > > > > > >
> > > > > > > Xintong Song
> > > > > > >
> > > > > > > On Tue, Nov 10, 2020 at 10:19 AM Dian Fu wrote:
> > > > > > >
> > > > > > > > Hi Xintong,
> > > > > > > >
> > > > > > > > I want to bring one more issue to your attention [1]. The test case UnalignedCheckpointCompatibilityITCase.test failed several times in the last nightly test of release-1.11. We need to figure out if it's just an instable test or caused by recent changes.
> > > > > > > >
> > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-20065
> > > > > > > >
> > > > > > > > > On Nov 10, 2020, at 9:24 AM, Xintong Song wrote:
> > > > > > > > >
> > > > > > > > > Thanks for the replies.
> > > > > > > > >
> > > > > > > > > Thank you~
> > > > > > > > >
> > > > > > > > > Xintong Song
> > > > > > > > >
> > > > > > > > > On Tue, Nov 10, 2020 at 1:09 AM Becket Qin <becket@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Xintong,
> > > > > > > > > >
> > > > > > > > > > Thanks for driving the release. Just want to sync up on the FLIP-27 backporting. Stephan and I are still trying to backport a bunch of patches of Source to 1.11.3. Including:
> > > > > > > > > >
> > > > > > > > > > [FLINK-19698][connector/common] Add a close() method to the SplitReader.
> > > > > > > > > > [FLINK-19717] SourceReaderBase.pollNext may return END_OF_INPUT if Spli
[jira] [Created] (FLINK-20384) Broken Link in deployment/ha/kubernetes_ha.zh.md
Huang Xingbo created FLINK-20384:
---------------------------------

Summary: Broken Link in deployment/ha/kubernetes_ha.zh.md
Key: FLINK-20384
URL: https://issues.apache.org/jira/browse/FLINK-20384
Project: Flink
Issue Type: Bug
Components: Deployment / Kubernetes, Documentation
Affects Versions: 1.12.0, 1.13.0
Reporter: Huang Xingbo

When executing the script build_docs.sh, it will throw the following exception:

{code}
Liquid Exception: Could not find document 'deployment/resource-providers/standalone/kubernetes.md' in tag 'link'. Make sure the document exists and the path is correct. in deployment/ha/kubernetes_ha.zh.md
Could not find document 'deployment/resource-providers/standalone/kubernetes.md' in tag 'link'.
{code}
[jira] [Created] (FLINK-20385) Allow to read metadata for Canal-json format
Leonard Xu created FLINK-20385:
-------------------------------

Summary: Allow to read metadata for Canal-json format
Key: FLINK-20385
URL: https://issues.apache.org/jira/browse/FLINK-20385
Project: Flink
Issue Type: New Feature
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API
Reporter: Leonard Xu

In FLIP-107, we support reading metadata from the CDC format Debezium. Canal-json is another widely used CDC format, so we can support reading its metadata as well.

The requirement comes from the user-zh mailing list: the user wants to read meta information (database and table name) from canal-json. [1]

[1] http://apache-flink.147419.n8.nabble.com/canal-json-tt8939.html
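As a sketch of what this could look like with FLIP-107's METADATA columns, assuming {{tEnv}}; note that the canal-json metadata keys below ({{value.database}}, {{value.table}}) are hypothetical, since this issue is precisely the request to introduce them:

{code:java}
// Sketch only: the metadata keys are hypothetical for canal-json at the time
// of this ticket; connection options are placeholders.
tEnv.executeSql(
    "CREATE TABLE canal_source ("
        + "  origin_database STRING METADATA FROM 'value.database' VIRTUAL,"
        + "  origin_table    STRING METADATA FROM 'value.table' VIRTUAL,"
        + "  id BIGINT,"
        + "  name STRING"
        + ") WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'mysql-binlog',"
        + " 'properties.bootstrap.servers' = 'broker:9092',"
        + " 'format' = 'canal-json'"
        + ")");
{code}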
[jira] [Created] (FLINK-20386) ClassCastException when lookup join a JDBC table on INT UNSIGNED column
Jark Wu created FLINK-20386:
----------------------------

Summary: ClassCastException when lookup join a JDBC table on INT UNSIGNED column
Key: FLINK-20386
URL: https://issues.apache.org/jira/browse/FLINK-20386
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC, Table SQL / Ecosystem
Affects Versions: 1.11.2, 1.12.0
Reporter: Jark Wu

The primary key of the MySQL table is an INT UNSIGNED column, but it is declared INT in Flink. I know the [docs|https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping] say it should be declared BIGINT in Flink; however, it would be better not to fail the job.

{code}
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
	at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) ~[flink-table-blink_2.11-1.11-vvr-2.1.1-SNAPSHOT.jar:1.11-vvr-2.1.1-SNAPSHOT]
	at JoinTableFuncCollector$6460.collect(Unknown Source) ~[?:?]
	at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203) ~[flink-table-blink_2.11-1.11-vvr-2.1.1-SNAPSHOT.jar:1.11-vvr-2.1.1-SNAPSHOT]
	at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162) ~[?:?]
{code}
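Per the data type mapping in the docs, the current remedy is declaring the MySQL INT UNSIGNED column as BIGINT on the Flink side; a sketch with illustrative names, assuming {{tEnv}}:

{code:java}
// MySQL INT UNSIGNED can exceed Integer.MAX_VALUE, so the JDBC connector
// surfaces it as Long; declaring BIGINT avoids the ClassCastException above.
tEnv.executeSql(
    "CREATE TABLE dim_table ("
        + "  id BIGINT,"  // MySQL side: INT UNSIGNED
        + "  name STRING,"
        + "  PRIMARY KEY (id) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://localhost:3306/mydb',"
        + " 'table-name' = 'dim_table'"
        + ")");
{code}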
[jira] [Created] (FLINK-20387) Support column of TIMESTAMP WITH LOCAL TIME ZONE type as rowtime attribute
Jark Wu created FLINK-20387:
----------------------------

Summary: Support column of TIMESTAMP WITH LOCAL TIME ZONE type as rowtime attribute
Key: FLINK-20387
URL: https://issues.apache.org/jira/browse/FLINK-20387
Project: Flink
Issue Type: New Feature
Components: Table SQL / API, Table SQL / Planner
Reporter: Jark Wu

Currently, only the {{TIMESTAMP}} type can be used as a rowtime attribute. It would be better to support {{TIMESTAMP WITH LOCAL TIME ZONE}} as well. As a workaround, users can cast the TIMESTAMP WITH LOCAL TIME ZONE into TIMESTAMP: {{CAST(ts AS TIMESTAMP)}}.
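A sketch of that workaround: derive a plain TIMESTAMP(3) computed column via CAST and declare the watermark on it (column names and connector are illustrative, assuming {{tEnv}}):

{code:java}
// ts_ltz carries the local-time-zone semantics; the computed column rt is a
// plain TIMESTAMP(3), which is currently accepted as a rowtime attribute.
tEnv.executeSql(
    "CREATE TABLE events ("
        + "  ts_ltz TIMESTAMP(3) WITH LOCAL TIME ZONE,"
        + "  rt AS CAST(ts_ltz AS TIMESTAMP(3)),"
        + "  WATERMARK FOR rt AS rt - INTERVAL '5' SECOND"
        + ") WITH ('connector' = 'datagen')");
{code}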
[jira] [Created] (FLINK-20388) Supports users setting operators' metrics name
hailong wang created FLINK-20388:
---------------------------------

Summary: Supports users setting operators' metrics name
Key: FLINK-20388
URL: https://issues.apache.org/jira/browse/FLINK-20388
Project: Flink
Issue Type: Improvement
Components: API / DataStream, Runtime / Metrics
Reporter: hailong wang

Currently, we only support users setting operator names, and we use those names in the topology to distinguish operators and, at the same time, as the operator metrics name. If an operator name is longer than 80 characters, we simply truncate it.

I think we could allow users to set an operator metrics name separately, just like the operator name. If it is not set, fall back to the current behavior.
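Today a single {{name()}} call serves both purposes; a sketch of the current API for context (the separate metrics name this ticket proposes does not exist yet):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorNameSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // name() feeds both the web UI topology and the metric identifier;
        // for metrics, names longer than 80 characters are truncated. A
        // dedicated metrics-name setter, as proposed here, is hypothetical.
        env.fromElements(1, 2, 3)
                .map((MapFunction<Integer, Integer>) i -> i * 2)
                .name("double-values")
                .print();

        env.execute("operator-name-sketch");
    }
}
{code}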
[jira] [Created] (FLINK-20389) UnalignedCheckpointITCase failure caused by NullPointerException
Matthias created FLINK-20389:
-----------------------------

Summary: UnalignedCheckpointITCase failure caused by NullPointerException
Key: FLINK-20389
URL: https://issues.apache.org/jira/browse/FLINK-20389
Project: Flink
Issue Type: Test
Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Matthias

[Build|https://dev.azure.com/mapohl/flink/_build/results?buildId=118&view=results] failed due to {{UnalignedCheckpointITCase}} caused by a {{NullPointerException}}:

{code:java}
Test execute[Parallel cogroup, p = 10](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase) failed with:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
	at akka.dispatch.OnComplete.internal(Future.scala:264)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=100)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInter
{code}