[ https://issues.apache.org/jira/browse/FLINK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-24528:
-----------------------------------
    Labels: auto-deprioritized-major pull-request-available stale-minor  (was: auto-deprioritized-major pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned, and neither it nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized.

> Flink HBase Asyc Lookup throw NPE if rowkey is null
> ---------------------------------------------------
>
>                 Key: FLINK-24528
>                 URL: https://issues.apache.org/jira/browse/FLINK-24528
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: 1.13.0
>            Reporter: zhisheng
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available, stale-minor
>
> When an HBase table is created with Flink SQL DDL and 'lookup.async' = 'true' is set, a lookup with a null rowkey may throw an NPE:
> {code:java}
> 2021-10-12 21:11:07,100 INFO  org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction [] - start close ...
> 2021-10-12 21:11:07,100 INFO  org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction [] - start close ...
> 2021-10-12 21:11:07,103 WARN  org.apache.flink.runtime.taskmanager.Task [] - LookupJoin(table=[default_catalog.default_database.dim_user_guid_relation], joinType=[LeftOuterJoin], async=[true], lookup=[rowkey=userGuid], select=[userGuid, last_time, rowkey, cf]) -> Calc(select=[userGuid AS user_guid, cf.user_new_id AS user_new_id, last_time AS usr_pwtx_ectx_driver_last_seek_order_time, _UTF-16LE'prfl.usr' AS metric]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[user_guid, user_new_id, usr_pwtx_ectx_driver_last_seek_order_time, metric]) (1/1)#0 (06bf3d7b0c341101e070796e20f7e571) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>     at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RawAsyncTableImpl.get(RawAsyncTableImpl.java:249) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncTableImpl.get(AsyncTableImpl.java:96) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.fetchResult(HBaseRowDataAsyncLookupFunction.java:187) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.eval(HBaseRowDataAsyncLookupFunction.java:174) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>     at LookupFunction$24.asyncInvoke(Unknown Source) ~[?:?]
>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.asyncInvoke(AsyncLookupJoinRunner.java:139) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.asyncInvoke(AsyncLookupJoinRunner.java:53) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:195) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
>     Suppressed: java.lang.Exception: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:723) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
>     Caused by: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>         at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:266) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at LookupFunction$24.close(Unknown Source) ~[?:?]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.close(AsyncLookupJoinRunner.java:154) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         ... 5 more
>     Caused by: java.lang.ClassNotFoundException: org.apache.flink.hbase.shaded.org.apache.commons.io.IOUtils
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_201]
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_201]
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_201]
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_201]
>         at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:266) ~[flink-sql-connector-hbase-2.2_2.11-1.12.0.jar:1.12.0]
>         at LookupFunction$24.close(Unknown Source) ~[?:?]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner.close(AsyncLookupJoinRunner.java:154) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         ... 5 more
> {code}
> The HBaseRowDataAsyncLookupFunction code in Flink 1.13 is:
> [https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java#L172]
>
> The HBaseRowDataLookupFunction code is:
> [https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java#L108]
>
> The lookup function calls serde.createGet(rowKey), and createGet returns null when the encoded rowkey has zero length:
> {code:java}
> // call site in the lookup function
> Get get = serde.createGet(rowKey);
>
> // definition of createGet
> public Get createGet(Object rowKey) {
>     checkArgument(keyEncoder != null, "row key is not set.");
>     rowWithRowKey.setField(0, rowKey);
>     byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
>     if (rowkey.length == 0) {
>         // drop dirty records, rowkey shouldn't be zero length
>         return null;
>     }
>     Get get = new Get(rowkey);
>     for (int f = 0; f < families.length; f++) {
>         byte[] family = families[f];
>         for (byte[] qualifier : qualifiers[f]) {
>             get.addColumn(family, qualifier);
>         }
>     }
>     return get;
> }
> {code}
> The async lookup function should check the returned Get for null before using it. We should add:
> {code:java}
> if (get != null) {
>     ...
> }{code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
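
The null check proposed in the issue description could be sketched roughly as follows. This is an illustration only, not the actual Flink patch: `FakeGet`, `fetch`, `lookup`, and the `String` row type are hypothetical stand-ins for the HBase `Get`, the async table read, and the lookup function's `eval`. Completing the future with an empty collection when `createGet` returns null is an assumption about the desired behavior (no match for a dirty row key) rather than something the issue states.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

class NullSafeLookupSketch {

    /** Hypothetical stand-in for the HBase Get request; not the real client class. */
    static final class FakeGet {
        final byte[] rowKey;

        FakeGet(byte[] rowKey) {
            this.rowKey = rowKey;
        }
    }

    /** Mirrors the quoted createGet: a zero-length encoded row key yields null. */
    static FakeGet createGet(byte[] encodedRowKey) {
        if (encodedRowKey.length == 0) {
            return null;
        }
        return new FakeGet(encodedRowKey);
    }

    /** Hypothetical stand-in for the asynchronous table read; returns one fake row. */
    static CompletableFuture<Collection<String>> fetch(FakeGet get) {
        return CompletableFuture.<Collection<String>>completedFuture(
                Collections.singletonList("row-for-key-of-length-" + get.rowKey.length));
    }

    /** Guarded lookup: never hands a null Get to the table client. */
    static CompletableFuture<Collection<String>> lookup(byte[] encodedRowKey) {
        FakeGet get = createGet(encodedRowKey);
        if (get == null) {
            // Assumption: a dirty row key is treated as "no match" instead of failing the task.
            return CompletableFuture.<Collection<String>>completedFuture(
                    Collections.<String>emptyList());
        }
        return fetch(get);
    }

    public static void main(String[] args) {
        System.out.println(lookup(new byte[0]).join().size());
        System.out.println(lookup(new byte[] {1, 2}).join().size());
    }
}
```

With a guard of this shape, a left outer lookup join would presumably emit the probe row with null dimension fields for a dirty key instead of failing with the NPE shown in the log.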