zhanglu153 created FLINK-30075:
----------------------------------

             Summary: Failed to load data to the cache after the hive lookup join task is restarted
                 Key: FLINK-30075
                 URL: https://issues.apache.org/jira/browse/FLINK-30075
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.12.2
            Reporter: zhanglu153
While testing a Kafka and Hive lookup join, I disconnected only the connection between the TaskManager and ZooKeeper after the data had been successfully loaded into the cache. Because a restart strategy is configured for the Flink job, the task is then restarted (note that the task process does not change at this point; the task restarts in the same container). The job code is as follows:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE kafka_table_1 (\n" +
        " a string,\n" +
        " b int,\n" +
        " c string,\n" +
        " log_ts TIMESTAMP(0),\n" +
        " proctime AS PROCTIME()\n" +   // processing-time attribute used by the lookup join below
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'test',\n" +
        " 'scan.startup.mode' = 'latest-offset',\n" +
        " 'properties.bootstrap.servers' = '172.16.144.208:9092',\n" +
        " 'format' = 'csv'\n" +
        ")");
String name = "hive_catalog";
String hiveConfDir = "/cloud/service/flink/conf";
HiveCatalog hive = new HiveCatalog(name, null, hiveConfDir);
hive.open();
tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("create table if not exists ttt(a string,b int,c string)");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("select * from default_catalog.default_database.kafka_table_1 as u join ttt for system_time as of u.proctime as o on u.a = o.a").print();
{code}
After the task has restarted successfully, data is produced to Kafka. As soon as some records arrive, an error is logged indicating that loading data into the cache fails:
{code:java}
2022-11-17 14:09:16,041 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Populating lookup join cache
2022-11-17 14:09:16,051 INFO org.apache.hadoop.conf.Configuration [] - getProps 1230612033
2022-11-17 14:09:16,053 INFO org.apache.hadoop.io.retry.RetryInvocationHandler [] - java.io.IOException: org.apache.flink.shaded.hadoop2.com.google.protobuf.ServiceException: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'., while invoking ClientNamenodeProtocolTranslatorPB.getFileInfo over hdp-hadoop-hdp-namenode-1.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.6.85:9000. Trying to failover immediately.
2022-11-17 14:09:16,053 DEBUG org.apache.hadoop.ipc.Client [] - The ping interval is 60000 ms.
2022-11-17 14:09:16,053 DEBUG org.apache.hadoop.ipc.Client [] - Connecting to hdp-hadoop-hdp-namenode-0.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.0.8:9000
2022-11-17 14:09:16,055 DEBUG org.apache.hadoop.security.UserGroupInformation [] - PrivilegedAction as:hadoop/hdp-flinkhistory-hdp-flink-history-6754db4f9c-8b8nd.hdp-flinkhistory-hdp-flink-history.hdp-dev-env-3.svc.cluster.lo...@dahua.com (auth:KERBEROS) from:org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825)
2022-11-17 14:09:16,056 DEBUG org.apache.hadoop.security.SaslRpcClient [] - Sending sasl message state: NEGOTIATE
2022-11-17 14:09:16,057 DEBUG org.apache.hadoop.security.SaslRpcClient [] - Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector)
2022-11-17 14:09:16,057 INFO org.apache.hadoop.security.SaslRpcClient [] - getServerPrincipal 1230612033
2022-11-17 14:09:16,057 DEBUG org.apache.hadoop.security.SaslRpcClient [] - Get kerberos info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.KerberosInfo(clientPrincipal=, serverPrincipal=dfs.namenode.kerberos.principal)
2022-11-17 14:09:16,059 DEBUG org.apache.hadoop.security.SaslRpcClient [] - getting serverKey: dfs.namenode.kerberos.principal conf value: null principal: null
2022-11-17 14:09:16,059 DEBUG org.apache.hadoop.ipc.Client [] - closing ipc connection to hdp-hadoop-hdp-namenode-0.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.0.8:9000: Couldn't set up IO streams: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
java.io.IOException: Couldn't set up IO streams: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:891) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client$Connection.access$3700(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1615) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client.call(Client.java:1446) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client.call(Client.java:1399) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at com.sun.proxy.$Proxy27.getFileInfo(Unknown Source) ~[?:?]
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:800) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at com.sun.proxy.$Proxy28.getFileInfo(Unknown Source) ~[?:?]
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1673) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1524) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1521) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1536) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1632) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:334) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:318) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
    at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:83) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.table.filesystem.security.HdpFileSystemLookupFunction.lambda$eval$1(HdpFileSystemLookupFunction.java:59) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:100) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.table.filesystem.security.HdpFileSystemLookupFunction.eval(HdpFileSystemLookupFunction.java:58) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at LookupFunction$10.flatMap(Unknown Source) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:?]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at StreamExecCalc$7.processElement(Unknown Source) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
Caused by: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
    at org.apache.hadoop.security.SaslRpcClient.getServerPrincipal(SaslRpcClient.java:326) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.security.SaslRpcClient.createSaslClient(SaslRpcClient.java:231) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:159) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:391) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:617) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client$Connection.access$2200(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:829) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:825) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_342]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_342]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2012) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
    ... 56 more
{code}
The task did not fail, but the Hive lookup join failed to load data into the cache.
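From the exception chain, it looks as if the lookup function keeps using a Hadoop Configuration after the in-place restart whose resources are still resolved through the previous, now closed, user code classloader: Configuration#getProps hits the "Trying to access closed classloader" IllegalStateException, dfs.namenode.kerberos.principal comes back null ("conf value: null ... principal: null" in the DEBUG output), and the SASL handshake then fails with "Failed to specify server's Kerberos principal name". Below is only a minimal, self-contained sketch of that suspected mechanism; ClosableUserClassLoader and StaleClassLoaderConfDemo are made up for the illustration and are not Flink classes.
{code:java}
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class StaleClassLoaderConfDemo {

    /** Toy stand-in for a leak-checking user code classloader: any resource lookup after close() fails. */
    static final class ClosableUserClassLoader extends URLClassLoader {
        private volatile boolean closed;

        ClosableUserClassLoader(ClassLoader parent) {
            super(new URL[0], parent);
        }

        @Override
        public URL getResource(String name) {
            if (closed) {
                throw new IllegalStateException("Trying to access closed classloader.");
            }
            return super.getResource(name);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) throws Exception {
        ClosableUserClassLoader userLoader =
                new ClosableUserClassLoader(StaleClassLoaderConfDemo.class.getClassLoader());

        // The Configuration remembers the classloader it should use to load *-site.xml resources.
        Configuration hadoopConf = new Configuration();
        hadoopConf.setClassLoader(userLoader);

        // Simulates the in-place task restart: the old user classloader is closed,
        // while the lookup function still holds the same Configuration instance.
        userLoader.close();

        // The first property access afterwards triggers resource loading through the closed
        // classloader and fails like the RetryInvocationHandler line in the log above.
        try {
            hadoopConf.get("dfs.namenode.kerberos.principal");
        } catch (IllegalStateException e) {
            System.out.println("Config lookup failed: " + e.getMessage());
        }
    }
}
{code}
Note that the 'classloader.check-leaked-classloader' option mentioned in the log message only suppresses this leak check at the cluster level; it may not address the underlying problem that the cached Configuration can no longer load the Kerberos-related settings.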