Yes, BucketingSink is the better option. A good starting point is the 
BucketingSink Javadocs.

Please also take a look at this: 

https://stackoverflow.com/questions/47669729/how-to-write-to-orc-files-using-bucketingsink-in-apache-flink
 
Alternatively, if you do not need to push a lot of data, you could write your 
own JDBC sink based on JDBCAppendTableSink, adjusted so that it works with 
Hive's JDBC client.
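
As a rough sketch of that alternative (this is not the JDBCAppendTableSink 
code, and the class and method names are hypothetical): the stack trace below 
shows JDBCOutputFormat.flush() calling executeBatch(), which HiveStatement 
rejects. A custom sink could execute one statement per row instead. The sketch 
uses only plain java.sql and assumes the Hive driver accepts setObject() and 
executeUpdate() on a prepared statement.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
 * Hypothetical helper: writes rows one statement at a time and never calls
 * addBatch()/executeBatch(), which the Hive JDBC driver does not support.
 */
public class PerRowJdbcWriter implements AutoCloseable {

    private final PreparedStatement statement;

    public PerRowJdbcWriter(Connection connection, String insertQuery) throws SQLException {
        // e.g. "INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)"
        this.statement = connection.prepareStatement(insertQuery);
    }

    /** Binds each field to its 1-based placeholder and executes immediately. */
    public int write(Object[] fields) throws SQLException {
        for (int i = 0; i < fields.length; i++) {
            statement.setObject(i + 1, fields[i]);
        }
        return statement.executeUpdate();
    }

    /** Writes all rows sequentially and returns the summed update counts. */
    public int writeAll(List<Object[]> rows) throws SQLException {
        int total = 0;
        for (Object[] row : rows) {
            total += write(row);
        }
        return total;
    }

    @Override
    public void close() throws SQLException {
        statement.close();
    }
}
```

Inside Flink, something like this could be wrapped in a RichSinkFunction that 
opens the connection in open() and calls write() from invoke(). Every row is a 
separate round trip to Hive, so this is only reasonable for small volumes.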

Piotrek

> On 11 Jun 2018, at 08:12, sagar loke <sagar...@gmail.com> wrote:
> 
> Thanks, 
> We are getting data in Avro format from Kafka and are planning to write data 
> in ORC format to Hive tables. 
> 
> 1. Is BucketingSink a better option for this use case, or should we use something else?
> 2. Is there sample code we can refer to?
> 
> Thanks in advance,
> 
> On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> Don’t use the JDBC driver to write to Hive. The performance of JDBC in 
> general for large volumes is suboptimal.
> Write it to a file in HDFS in a format supported by Hive and point the table 
> definition in Hive to it.
> 
> On 11. Jun 2018, at 04:47, sagar loke <sagar...@gmail.com> wrote:
> 
>> I am trying to sink data to Hive via Confluent Kafka -> Flink -> Hive using 
>> the following code snippet, but I am getting the error shown below it:
>> 
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<GenericRecord> stream = readFromKafka(env);
>> 
>> 
>> private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
>>         BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
>> };
>> 
>>  JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>>             .setDrivername("org.apache.hive.jdbc.HiveDriver")
>>             .setDBUrl("jdbc:hive2://hiveconnstring")
>>             .setUsername("myuser")
>>             .setPassword("mypass")
>>             .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
>>             .setBatchSize(1000)
>>             .setParameterTypes(FIELD_TYPES)
>>             .build();
>> 
>>     DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
>>                 Row row = new Row(2);
>>                 row.setField(0, st1.get("SOME_ID")); 
>>                 row.setField(1, st1.get("SOME_ADDRESS"));
>>                 return row;
>>             });
>> 
>>     sink.emitDataStream(rows);
>>     env.execute("Flink101");
>> 
>> 
>> Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
>> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
>> at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
>> ... 12 more
>> 
>> Caused by: java.sql.SQLException: Method not supported
>> at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
>> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>> ... 17 more
>> I checked hive-jdbc driver and it seems that the Method is not supported in 
>> hive-jdbc driver.
>> 
>> public class HiveStatement implements java.sql.Statement {
>> ...
>> 
>>   @Override  
>>   public int[] executeBatch() throws SQLException {
>>         throw new SQLFeatureNotSupportedException("Method not supported");
>>   }
>> 
>> ..
>> }
>> Is there any way we can achieve this using JDBC Driver ?
>> 
>> Let me know,
>> 
>> Thanks in advance.
>> 
> 
> 
> 
> -- 
> Regards,
> SAGAR.
