I opened an issue for this: https://issues.apache.org/jira/browse/FLINK-7605

On Wed, Sep 6, 2017 at 4:24 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Maybe this should be well documented also...is there any dedicated page to
> Flink and JDBC connectors?
>
> On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Great!
>>
>> If you want to, you can open a PR that adds
>>
>> if (!conn.getAutoCommit()) {
>>   conn.setAutoCommit(true);
>> }
>>
>> to JdbcOutputFormat.open().
>>
>> Cheers, Fabian
>>
>>
>>
>> 2017-09-06 15:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks for the detailed answer. Obviously you are right :)
>>> As stated by https://phoenix.apache.org/tuning.html auto-commit is
>>> disabled by default in Phoenix, but it can be easily enabled just appending
>>> AutoCommit=true to the connection URL or, equivalently, setting the proper
>>> property in the conf object passed to the Phoenix
>>> QueryUtil.getConnectionUrl method that autogenerate the connection URL,
>>> i.e.:
>>>
>>> ----------------------
>>> Job job = Job.getInstance(HBaseConfiguration.create(),
>>> "phoenix-mr-job");
>>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>>> final Properties phoenixProps = PropertiesUtil.extractProperties(new
>>> Properties(), jobConf);
>>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>> ----------------------
>>>
>>> Now my job works also with the standard Flink JDBCOutputformat.
>>> Just to help other people willing to play with Phoenix and HBase I paste
>>> below my simple test job:
>>>
>>> @Test
>>>   public void testPhoenixOutputFormat() throws Exception {
>>>
>>>     final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
>>>     senv.enableCheckpointing(5000);
>>>     DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
>>> "2,bbb,YYY", "3,ccc,ZZZ");
>>>
>>>     // Set the target Phoenix table and the columns
>>>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>()
>>> {
>>>
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public Row map(String str) throws Exception {
>>>         String[] split = str.split(Pattern.quote(","));
>>>         Row ret = new Row(3);
>>>         ret.setField(0, split[0]);
>>>         ret.setField(1, split[1]);
>>>         ret.setField(2, split[2]);
>>>         return ret;
>>>       }
>>>     }).returns(new RowTypeInfo(BasicTypeInfo.STRI
>>> NG_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.
>>> STRING_TYPE_INFO));
>>>
>>>     Job job = Job.getInstance(HBaseConfiguration.create(),
>>> "phoenix-mr-job");
>>>     PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
>>> "FIELD_1,FIELD2,FIELD_3");
>>>     final org.apache.hadoop.conf.Configuration jobConf =
>>> job.getConfiguration();
>>>     jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>>>     final String upsertStatement = PhoenixConfigurationUtil.getUp
>>> sertStatement(jobConf);
>>>     final Properties phoenixProps = PropertiesUtil.extractProperties(new
>>> Properties(), jobConf);
>>>     String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>>
>>>     rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
>>>         .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.g
>>> etCanonicalName())
>>>         .setDBUrl(connUrl)
>>>         .setQuery(upsertStatement)
>>>         .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR,
>>> Types.VARCHAR})
>>>         .finish());
>>>
>>>     senv.execute();
>>>   }
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> According to the JavaDocs of java.sql.Connection, commit() will throw
>>>> an exception if the connection is in auto commit mode which should be the
>>>> default.
>>>> So adding this change to the JdbcOutputFormat seems a bit risky.
>>>>
>>>> Maybe the Phoenix JDBC connector does not enable auto commits by
>>>> default (or doesn't support it). Can you check that Flavio?
>>>> If the Phoenix connector supports but not activates auto commits by
>>>> default, we can enable it in JdbcOutputFormat.open().
>>>> If auto commits are not supported, we can add a check after execute()
>>>> and call commit() only if Connection.getAutoCommit() returns false.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Hi to all,
>>>>> I'm writing a job that uses Apache Phoenix.
>>>>>
>>>>> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but
>>>>> it's not well suited to work with Table API because it cannot handle
>>>>> generic objects like Rows (it need a DBWritable Object that should be
>>>>> already present at compile time). So I've looked into the code of the
>>>>> PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
>>>>> (basically).
>>>>>
>>>>> However, to make it work I had to slightly modify the Flink
>>>>> JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
>>>>> PreparedStatement. E.g:
>>>>>
>>>>>     upload.executeBatch();
>>>>>     dbConn.commit();
>>>>>
>>>>> For the moment I've just created a dedicated PhoenixJdbcOutpuFormat
>>>>> where I've added these 2 lines of code starting from the code of the
>>>>> JDBCOutputformat (it couldn't be extended in this case because all fields
>>>>> are private).
>>>>>
>>>>> What do you think about this? Should I open a ticket to add a
>>>>> connection commit after executeBatch (in order to be compatible with
>>>>> Phoenix) or something else (e.g. create a Phoenix connector that basically
>>>>> extend JDBCOutputConnector and ovewrite 2 methods, changing also the
>>>>> visibility of its fields to protected)?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

Reply via email to