Yes, BucketingSink is a better option. You can start by looking at the BucketingSink Javadocs.

Please also take a look at this: https://stackoverflow.com/questions/47669729/how-to-write-to-orc-files-using-bucketingsink-in-apache-flink
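Roughly, the setup could look like the untested sketch below. The classes live in org.apache.flink.streaming.connectors.fs.bucketing (the flink-connector-filesystem dependency). Note that Flink does not ship an ORC writer for BucketingSink, so "OrcRowWriter" here is only a hypothetical stand-in for a custom implementation of Flink's Writer<Row> interface; the Stack Overflow question above discusses how to build one:

// Untested sketch. "OrcRowWriter" is a hypothetical custom
// org.apache.flink.streaming.connectors.fs.Writer<Row> that writes ORC files;
// the base path and bucket format are just examples.
BucketingSink<Row> hdfsSink = new BucketingSink<>("hdfs:///warehouse/mydb/mytable");
hdfsSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH")); // one bucket directory per hour
hdfsSink.setWriter(new OrcRowWriter());                        // custom Writer<Row>, see the link above
hdfsSink.setBatchSize(1024L * 1024L * 128L);                   // roll part files at ~128 MB

rows.addSink(hdfsSink);

Once the files are on HDFS, you point an external Hive table definition at that directory, which is what Jörn suggests below.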
Alternatively, if you do not need to push a lot of data, you could write your own JDBC sink based on the JDBCAppendTableSink, adjusting it so that it works with Hive's JDBC client.
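For example, something like this untested sketch, which reuses the connection details from your snippet and issues one INSERT per record via executeUpdate(), precisely because the Hive driver does not support executeBatch(). Expect it to be slow:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Untested sketch -- per-record inserts over Hive JDBC, no batching.
// URL, credentials and table are taken from your snippet.
public class HiveJdbcSink extends RichSinkFunction<Row> {
    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        connection = DriverManager.getConnection(
                "jdbc:hive2://hiveconnstring", "myuser", "mypass");
        statement = connection.prepareStatement(
                "INSERT INTO testHiveDriverTable (key, value) VALUES (?, ?)");
    }

    @Override
    public void invoke(Row row) throws Exception {
        statement.setObject(1, row.getField(0));
        statement.setObject(2, row.getField(1));
        statement.executeUpdate(); // one statement per record; executeBatch() is unsupported
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}

You would then use rows.addSink(new HiveJdbcSink()); instead of the JDBCAppendTableSink. But as Jörn points out below, JDBC into Hive does not scale, so I would prefer the file-based route.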
Piotrek

> On 11 Jun 2018, at 08:12, sagar loke <sagar...@gmail.com> wrote:
>
> Thanks,
> We are getting data in Avro format from Kafka and are planning to write the
> data in ORC format to Hive tables.
>
> 1. Is BucketingSink a better option for this use case, or something else?
> 2. Is there a sample code example which we can refer to?
>
> Thanks in advance,
>
> On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> Don't use the JDBC driver to write to Hive. The performance of JDBC in
> general for large volumes is suboptimal.
> Write it to a file in HDFS in a format supported by Hive and point the
> table definition in Hive to it.
>
> On 11. Jun 2018, at 04:47, sagar loke <sagar...@gmail.com> wrote:
>
>> I am trying to sink data to Hive via Confluent Kafka -> Flink -> Hive
>> using the following code snippet:
>>
>> final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<GenericRecord> stream = readFromKafka(env);
>>
>> private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
>>     BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
>> };
>>
>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>>         .setDrivername("org.apache.hive.jdbc.HiveDriver")
>>         .setDBUrl("jdbc:hive2://hiveconnstring")
>>         .setUsername("myuser")
>>         .setPassword("mypass")
>>         .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
>>         .setBatchSize(1000)
>>         .setParameterTypes(FIELD_TYPES)
>>         .build();
>>
>> DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
>>     Row row = new Row(2);
>>     row.setField(0, st1.get("SOME_ID"));
>>     row.setField(1, st1.get("SOME_ADDRESS"));
>>     return row;
>> });
>>
>> sink.emitDataStream(rows);
>> env.execute("Flink101");
>>
>> But I am getting the following error:
>>
>> Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
>>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
>>     at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
>>     ... 12 more
>>
>> Caused by: java.sql.SQLException: Method not supported
>>     at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
>>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>>     ... 17 more
>>
>> I checked the hive-jdbc driver and it seems that the method is not
>> supported there:
>>
>> public class HiveStatement implements java.sql.Statement {
>>     ...
>>
>>     @Override
>>     public int[] executeBatch() throws SQLException {
>>         throw new SQLFeatureNotSupportedException("Method not supported");
>>     }
>>
>>     ...
>> }
>>
>> Is there any way we can achieve this using the JDBC driver?
>>
>> Let me know,
>>
>> Thanks in advance.
>>
>
> --
> Regards,
> SAGAR.