[ https://issues.apache.org/jira/browse/FLINK-20330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336012#comment-17336012 ]
Flink Jira Bot commented on FLINK-20330:
----------------------------------------

This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Flink connector has error in support hive external tables (hbase or es)
> -----------------------------------------------------------------------
>
> Key: FLINK-20330
> URL: https://issues.apache.org/jira/browse/FLINK-20330
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.11.0, 1.11.1, 1.11.2
> Environment: TEST CODE LIKE THIS:
> CREATE EXTERNAL TABLE hive_to_es (
>   key string,
>   value string
> )
> STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
> TBLPROPERTIES(
>   'es.resource' = 'hive_to_es/_doc',
>   'es.index.auto.create' = 'TRUE',
>   'es.nodes'='192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
> );
> insert into hive_to_es (key, value) values ('name','tom');
> insert into hive_to_es (key, value) values ('yes','aaa');
> select * from hive_to_es;
>
> !image-2020-11-25-09-51-00-100.png|width=807,height=134!
> Reporter: liu
> Priority: Major
> Labels: stale-major
> Attachments: image-2020-11-25-09-42-13-102.png, image-2020-11-25-09-51-00-100.png
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
> !image-2020-11-25-09-42-13-102.png|width=384,height=288!
>
>
> We added a patch like this:
>
> flink-connector-hive_2.12-1.11.2.jar
> org/apache/flink/connectors/hive/HiveTableSink.java +134
>
> ADD PATCH:
>
> {code:java}
> // If the storage descriptor has no output format, choose one based on the
> // table's SerDe (HBase or Elasticsearch).
> if (sd.getOutputFormat() == null
>         && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
> }
> if (sd.getOutputFormat() == null
>         && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
> }
> {code}
> org/apache/flink/connectors/hive/read/HiveTableInputFormat.java +305
> ADD PATCH:
> {code:java}
> // If the storage descriptor has no input format, choose one based on the
> // table's SerDe and copy the table properties that format reads into jobConf.
> if (sd.getInputFormat() == null
>         && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat");
>     jobConf.set("hbase.table.name",
>             partition.getTableProps().getProperty("hbase.table.name"));
>     jobConf.set("hbase.columns.mapping",
>             partition.getTableProps().getProperty("hbase.columns.mapping"));
> }
> if (sd.getInputFormat() == null
>         && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat");
>     jobConf.set("location", sd.getLocation());
>     // Forward every "es."-prefixed table property to the job configuration.
>     for (Enumeration<?> en = partition.getTableProps().keys(); en.hasMoreElements();) {
>         String key = en.nextElement().toString();
>         if (key.startsWith("es.")) {
>             jobConf.set(key, partition.getTableProps().getProperty(key));
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
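
For illustration only: the "es."-prefixed property loop in the HiveTableInputFormat patch quoted above can be tried without Flink or Hadoop on the classpath. The sketch below is a minimal stand-alone version; the class name PrefixedProps and the method copyWithPrefix are hypothetical and are not part of Flink or of the reporter's patch. It uses Properties.stringPropertyNames() in place of the raw Enumeration, which also returns keys from a Properties object's defaults, so it is close to, but not identical with, the keys() loop in the patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of Flink or the quoted patch.
final class PrefixedProps {
    private PrefixedProps() {}

    /** Returns the entries of props whose key starts with prefix, in sorted key order. */
    static Map<String, String> copyWithPrefix(Properties props, String prefix) {
        Map<String, String> out = new LinkedHashMap<>();
        // stringPropertyNames() yields only String keys mapped to String values,
        // so no raw Enumeration or toString() calls are needed.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (key.startsWith(prefix)) {
                out.put(key, props.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties tableProps = new Properties();
        tableProps.setProperty("es.resource", "hive_to_es/_doc");
        tableProps.setProperty("es.index.auto.create", "TRUE");
        tableProps.setProperty("hbase.table.name", "not_copied");
        // In the patch, each selected entry would be passed to jobConf.set(key, value).
        copyWithPrefix(tableProps, "es.").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Returning a map instead of writing into jobConf directly keeps the filtering step testable on its own; the caller would still apply each entry to the job configuration.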