Fabian, Rong:
Thanks for the help, greatly appreciated.

I am currently using a Derby database for the append-only JDBC sink.
So far I don't see a way to use a JDBC/relational database solution for a
retract/upsert use case?
Is it possible to set up JDBC sink with Derby or MySQL so that it goes back
and updates or deletes/inserts previous rows and inserts new ones?
I have not been able to find example source code that does that.
Thanks again,
Chris


On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> In addition to what Rong said:
>
> - The types look OK.
> - You can also use Types.STRING, and Types.LONG instead of
> BasicTypeInfo.xxx
> - Beware that in the failure case, you might have multiple entries in the
> database table. Some databases support an upsert syntax which (together
> with key or uniqueness constraints) can ensure that each result is added
> just once, even if the query recovers from a failure.
>
> Best, Fabian
>
> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>
>> Hi Chris,
>>
>> Looking at the code, seems like JDBCTypeUtil [1] is used for converting
>> Flink TypeInformation into JDBC Type (Java.sql.type), and SQL_TIMESTAMP and
>> SQL_TIME are both listed in the conversion mapping. However the JDBC types
>> are different.
>>
>> Regarding the question whether your insert is correctly configured. It
>> directly relates to how your DB executes the JDBC insert command.
>> 1. Regarding type settings: Looking at the JDBCOutputFormat [2], seems
>> like you can even execute your command without type array or type mapping
>> cannot be found, in this case the PrepareStatement will be written with
>> plain Object type. I tired it on MySQL and it actually works pretty well.
>> 2. Another question is whether your underlying DB can handle "implicit
>> type cast": For example, inserting an INTEGER type into a BIGINT column.
>> AFAIK JDBCAppendableSink does not check compatibilities before writeRecord,
>> so it might be a good idea to include some sanity check beforehand.
>>
>> Thanks,
>> Rong
>>
>> [1] https://github.com/apache/flink/blob/master/flink-connec
>> tors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/
>> jdbc/JDBCTypeUtil.java
>> [2] https://github.com/apache/flink/blob/master/flink-connec
>> tors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/
>> jdbc/JDBCOutputFormat.java#L109
>>
>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:
>>
>>>
>>> Full Source except for mapper and timestamp assigner.
>>>
>>> Sample Input Stream record:
>>> 1530447316589,Mary,./home
>>>
>>>
>>> What are the correct parameters to pass for data types in the
>>> JDBCAppendTableSink?
>>> Am I doing this correctly?
>>>
>>>
>>>                 // Get Execution Environment
>>>                 StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>                 env.setStreamTimeCharacteristic(TimeCharacteristic.
>>> EventTime);
>>>                 StreamTableEnvironment tableEnvironment =
>>> TableEnvironment.getTableEnvironment(env);
>>>
>>>                 // Get and Set execution parameters.
>>>                 ParameterTool parms = ParameterTool.fromArgs(args);
>>>                 env.getConfig().setGlobalJobParameters(parms);
>>>
>>>                 // Configure Checkpoint and Restart
>>>                 // configureCheckpoint(env);
>>>                 // configureRestart(env);
>>>
>>>                 // Get Our Data Stream
>>>                 DataStream<Tuple3&lt;Long,String,String>> eventStream =
>>> env
>>>                                 .socketTextStream(parms.get("host"),
>>> parms.getInt("port"))
>>>                                 .map(new TableStreamMapper())
>>>                                 .assignTimestampsAndWatermarks(new
>>> MyEventTimestampAssigner());
>>>
>>>
>>>                 // Register Table
>>>                 // Dynamic Table From Stream
>>>                 tableEnvironment.registerDataStream("pageViews",
>>> eventStream,
>>> "pageViewTime.rowtime, username, url");
>>>
>>>             // Continuous Query
>>>                 String continuousQuery =
>>>                                 "SELECT TUMBLE_START(pageViewTime,
>>> INTERVAL '1' MINUTE) as wstart, " +
>>>                                 "TUMBLE_END(pageViewTime, INTERVAL '1'
>>> MINUTE) as wend, " +
>>>                                 "username, COUNT(url) as viewcount FROM
>>> pageViews " +
>>>                                 "GROUP BY TUMBLE(pageViewTime, INTERVAL
>>> '1' MINUTE), username";
>>>
>>>                 // Dynamic Table from Continuous Query
>>>                 Table windowedTable = tableEnvironment.sqlQuery(cont
>>> inuousQuery);
>>>                 windowedTable.printSchema();
>>>
>>>                 // Convert Results to DataStream
>>>                 Table resultTable = windowedTable
>>>                         .select("wstart, wend, username,viewcount");
>>>
>>>
>>>                 TupleTypeInfo<Tuple4&lt;Timestamp,Timestamp,String,Long>>
>>> tupleTypeInfo =
>>> new TupleTypeInfo<>(
>>>                                 Types.SQL_TIMESTAMP,
>>>                                 Types.SQL_TIMESTAMP,
>>>                                 Types.STRING,
>>>                                 Types.LONG);
>>>                 DataStream<Tuple4&lt;Timestamp,Timestamp,String,Long>>
>>> resultDataStream =
>>>                 tableEnvironment.toAppendStrea
>>> m(resultTable,tupleTypeInfo);
>>>                 resultDataStream.print();
>>>
>>>
>>>                 // Write Result Table to Sink
>>>                 // Configure Sink
>>>                 JDBCAppendTableSink pageViewSink =
>>> JDBCAppendTableSink.builder()
>>>                         .setDrivername("org.apache.der
>>> by.jdbc.ClientDriver")
>>>                         .setDBUrl("jdbc:derby://captai
>>> n:1527/rueggerllc")
>>>                         .setUsername("chris")
>>>                         .setPassword("xxxx")
>>>                         .setBatchSize(1)
>>>                         .setQuery("INSERT INTO chris.pageclicks
>>> (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>>
>>> .setParameterTypes(Types.SQL_TIMESTAMP,Types.SQL_TIMESTAMP,B
>>> asicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)
>>>                         .build();
>>>
>>>
>>>                 // Write Result Table to Sink
>>>                 resultTable.writeToSink(pageViewSink);
>>>                 System.out.println("WRITE TO SINK");
>>>
>>>
>>>                 // Execute
>>>                 env.execute("PageViewsTumble");
>>>         }
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>>> nabble.com/
>>>
>>
>


-- 
----------------------------------------------------------------------------------------------------------------------------------------
Simplicity is the ultimate sophistication
--Leonardo DaVinci

www.rueggerconsultingllc.com

Reply via email to