Don’t use the JDBC driver to write to Hive. JDBC generally performs poorly 
for large volumes of data.
Instead, write the data to files in HDFS in a format supported by Hive and 
point the Hive table definition at that location.
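
A minimal sketch of that approach, using only the Java standard library. It is an illustration under stated assumptions, not a drop-in Flink sink: the class HiveFileSinkSketch, its helper methods, the file name part-0.tsv and the location hdfs:///data/testHiveDriverTable are all hypothetical, and a local temporary directory stands in for the HDFS directory. In a real job a Flink filesystem sink would write the files; the point here is only the file layout and the matching table definition. The table and column names are taken from the query in your snippet.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: writes (key, value) rows as tab-delimited text
// and builds the Hive DDL that points an external table at the directory.
final class HiveFileSinkSketch {

    // Formats one row as a tab-delimited line. Tabs and line breaks inside
    // the value are replaced by spaces so they cannot break the row layout;
    // a null value is written as \N, Hive's default NULL marker for text files.
    static String toDelimitedLine(int key, String value) {
        String safe = (value == null)
                ? "\\N"
                : value.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ');
        return key + "\t" + safe;
    }

    // Writes all rows into dir/fileName, one row per line, and returns the file path.
    // Assumption: 'dir' stands in for the HDFS directory backing the table.
    static Path writeRows(Path dir, String fileName,
                          List<Map.Entry<Integer, String>> rows) throws IOException {
        Files.createDirectories(dir);
        Path file = dir.resolve(fileName);
        try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (Map.Entry<Integer, String> row : rows) {
                out.write(toDelimitedLine(row.getKey(), row.getValue()));
                out.write('\n');
            }
        }
        return file;
    }

    // Builds the DDL for an external text table over the given location.
    // The delimiter must match the one used in toDelimitedLine.
    static String externalTableDdl(String table, String location) {
        return "CREATE EXTERNAL TABLE " + table + " (`key` INT, `value` STRING)\n"
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'\n"
                + "STORED AS TEXTFILE\n"
                + "LOCATION '" + location + "'";
    }

    public static void main(String[] args) throws IOException {
        // Local temp directory as a stand-in for the HDFS table directory.
        Path dir = Files.createTempDirectory("testHiveDriverTable");
        Path file = writeRows(dir, "part-0.tsv",
                List.of(Map.entry(1, "first address"), Map.entry(2, "second address")));
        System.out.println("Wrote " + file);
        // Hypothetical HDFS location; run the printed statement once in Hive.
        System.out.println(externalTableDdl("testHiveDriverTable",
                "hdfs:///data/testHiveDriverTable"));
    }
}
```

Once the table exists, new files written into that directory become visible to Hive queries without any INSERT statement going through the JDBC driver, so the unsupported executeBatch() call is never needed. A columnar format such as ORC or Parquet would follow the same pattern with a different STORED AS clause and writer.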

> On 11. Jun 2018, at 04:47, sagar loke <sagar...@gmail.com> wrote:
> 
> I am trying to sink data to Hive via Confluent Kafka -> Flink -> Hive using 
> the following code snippet, but I am getting the error shown below it:
> 
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<GenericRecord> stream = readFromKafka(env);
> 
> 
> private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
>         BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
> };
> 
>  JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>             .setDrivername("org.apache.hive.jdbc.HiveDriver")
>             .setDBUrl("jdbc:hive2://hiveconnstring")
>             .setUsername("myuser")
>             .setPassword("mypass")
>             .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
>             .setBatchSize(1000)
>             .setParameterTypes(FIELD_TYPES)
>             .build();
> 
>     DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
>                 Row row = new Row(2);
>                 row.setField(0, st1.get("SOME_ID")); 
>                 row.setField(1, st1.get("SOME_ADDRESS"));
>                 return row;
>             });
> 
>     sink.emitDataStream(rows);
>     env.execute("Flink101");
> 
> 
> Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
> at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
> ... 12 more
> 
> Caused by: java.sql.SQLException: Method not supported
> at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
> ... 17 more
> 
> I checked the hive-jdbc driver, and it seems that executeBatch() is not 
> supported there:
> 
> public class HiveStatement implements java.sql.Statement {
> ...
> 
>   @Override  
>   public int[] executeBatch() throws SQLException {
>         throw new SQLFeatureNotSupportedException("Method not supported");
>   }
> 
> ..
> }
> 
> Is there any way we can achieve this using the JDBC driver?
> 
> Let me know,
> 
> Thanks in advance.
