Don’t use the JDBC driver to write to Hive. The performance of JDBC in general for large volumes is suboptimal. Write it to a file in HDFS in a format supported by HIve and point the table definition in Hive to it.
> On 11. Jun 2018, at 04:47, sagar loke <sagar...@gmail.com> wrote: > > I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using > following code snippet: > > But I am getting following error: > > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<GenericRecord> stream = readFromKafka(env); > > > private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{ > BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO > }; > > JDBCAppendTableSink sink = JDBCAppendTableSink.builder() > .setDrivername("org.apache.hive.jdbc.HiveDriver") > .setDBUrl("jdbc:hive2://hiveconnstring") > .setUsername("myuser") > .setPassword("mypass") > .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES > (?,?)") > .setBatchSize(1000) > .setParameterTypes(FIELD_TYPES) > .build(); > > DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 > -> { > Row row = new Row(2); // > row.setField(0, st1.get("SOME_ID")); > row.setField(1, st1.get("SOME_ADDRESS")); > return row; > }); > > sink.emitDataStream(rows); > env.execute("Flink101"); > > > Caused by: java.lang.RuntimeException: Execution of JDBC statement failed. > at > org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219) > at > org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356) > ... 12 more > > Caused by: java.sql.SQLException: Method not supported > at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381) > at > org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216) > ... 17 more > I checked hive-jdbc driver and it seems that the Method is not supported in > hive-jdbc driver. > > public class HiveStatement implements java.sql.Statement { > ... > > @Override > public int[] executeBatch() throws SQLException { > throw new SQLFeatureNotSupportedException("Method not supported"); > } > > .. > } > Is there any way we can achieve this using JDBC Driver ? > > Let me know, > > Thanks in advance.