There is also the SQL:2003 MERGE statement that can be used to implement
UPSERT logic.
It is a bit verbose but supported by Derby [1].
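
For example, a minimal sketch of what that could look like with the
JDBCAppendTableSink used further down in this thread (table and column
names are made up, SYSIBM.SYSDUMMY1 serves as a one-row source table,
and depending on the Derby version the parameter markers may need
explicit CASTs; note also that the sink binds row fields to the ?
markers positionally, so the result row has to repeat the key fields):

    // Hypothetical MERGE-based upsert: each result row either updates
    // the existing row for its key or inserts a new one.
    JDBCAppendTableSink mergeSink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.derby.jdbc.ClientDriver")
            .setDBUrl("jdbc:derby://localhost:1527/mydb")
            .setQuery(
                "MERGE INTO pageclicks t " +
                "USING SYSIBM.SYSDUMMY1 " +
                "ON t.username = CAST(? AS VARCHAR(64)) " +
                "WHEN MATCHED THEN UPDATE SET t.viewcount = ? " +
                "WHEN NOT MATCHED THEN INSERT (username, viewcount) " +
                "VALUES (?, ?)")
            .setParameterTypes(Types.STRING, Types.LONG,
                Types.STRING, Types.LONG)
            .build();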

Best, Fabian

[1] https://issues.apache.org/jira/browse/DERBY-3155

2018-07-04 10:10 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Chris,
>
> MySQL (and maybe other DBMS as well) offers special syntax for upserts.
>
> The answers to this SO question [1] recommend "INSERT INTO ... ON
> DUPLICATE KEY UPDATE ..." or "REPLACE INTO ...".
> However, AFAIK this syntax is not standardized and might vary from DBMS to
> DBMS.
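>
> For illustration, the MySQL variant of the sink query could look like
> this (hypothetical table and column names; it assumes a PRIMARY KEY or
> UNIQUE constraint on username, which is what triggers the update path):
>
>     // Hypothetical upsert query for a MySQL-backed JDBCAppendTableSink.
>     String upsertQuery =
>         "INSERT INTO pageclicks (username, viewcount) VALUES (?, ?) " +
>         "ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount)";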
>
> Best, Fabian
>
> [1] https://stackoverflow.com/questions/4205181/insert-into-a-mysql-table-or-update-if-exists
>
> 2018-07-03 12:14 GMT+02:00 Chris Ruegger <chris.rueg...@gmail.com>:
>
>> Fabian, Rong:
>> Thanks for the help, greatly appreciated.
>>
>> I am currently using a Derby database for the append-only JDBC sink.
>> So far I don't see a way to use a JDBC/relational database sink for a
>> retract/upsert use case.
>> Is it possible to set up a JDBC sink with Derby or MySQL so that it
>> goes back and updates or deletes/re-inserts previous rows as well as
>> inserting new ones?
>> I have not been able to find example source code that does that.
>> Thanks again,
>> Chris
>>
>>
>> On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In addition to what Rong said:
>>>
>>> - The types look OK.
>>> - You can also use Types.STRING and Types.LONG instead of
>>> BasicTypeInfo.xxx.
>>> - Beware that in case of a failure, you might end up with multiple
>>> entries in the database table. Some databases support an upsert syntax
>>> which (together with key or uniqueness constraints) ensures that each
>>> result is added just once, even if the query recovers from a failure
>>> (see the sketch below).
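>>>
>>> As a sketch (assumed schema, aligned with the INSERT further down in
>>> this thread), the constraint could be set up once like this, with
>>> stmt being a plain java.sql.Statement:
>>>
>>>     // Hypothetical DDL: the primary key covers the window start and
>>>     // the grouping key, so a result that is replayed after a recovery
>>>     // hits the same key instead of silently adding a second copy.
>>>     stmt.executeUpdate(
>>>         "CREATE TABLE pageclicks (" +
>>>         "  window_start TIMESTAMP NOT NULL," +
>>>         "  window_end   TIMESTAMP," +
>>>         "  username     VARCHAR(64) NOT NULL," +
>>>         "  viewcount    BIGINT," +
>>>         "  PRIMARY KEY (window_start, username))");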
>>>
>>> Best, Fabian
>>>
>>> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>
>>>> Hi Chris,
>>>>
>>>> Looking at the code, it seems JDBCTypeUtil [1] is used to convert
>>>> Flink TypeInformation into JDBC types (java.sql.Types), and
>>>> SQL_TIMESTAMP and SQL_TIME are both listed in the conversion mapping.
>>>> However, they map to different JDBC types.
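>>>>
>>>> For reference, as I read that mapping (an assumption on my side, so
>>>> double-check against [1]), the two types resolve to different
>>>> java.sql.Types constants:
>>>>
>>>>     // Flink Types.SQL_TIMESTAMP -> java.sql.Types.TIMESTAMP
>>>>     // Flink Types.SQL_TIME      -> java.sql.Types.TIME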
>>>>
>>>> Regarding the question of whether your insert is correctly
>>>> configured: that depends directly on how your DB executes the JDBC
>>>> insert command.
>>>> 1. Regarding type settings: looking at the JDBCOutputFormat [2], it
>>>> seems you can execute your command even without a type array, or when
>>>> a type mapping cannot be found; in that case the PreparedStatement is
>>>> populated with plain Object values (see the sketch after this list).
>>>> I tried it on MySQL and it actually works pretty well.
>>>> 2. Another question is whether your underlying DB can handle an
>>>> "implicit type cast": for example, inserting an INTEGER value into a
>>>> BIGINT column. AFAIK the JDBCAppendTableSink does not check
>>>> compatibility before writeRecord, so it might be a good idea to
>>>> include a sanity check beforehand.
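>>>>
>>>> A minimal sketch of the fallback from point 1 (my paraphrase, not the
>>>> connector's actual code; "statement" and "row" are assumed names for
>>>> the PreparedStatement and the Flink Row being written):
>>>>
>>>>     // Without a type array, every field is bound via the generic
>>>>     // setObject call; the type mapping is left to the JDBC driver.
>>>>     for (int i = 0; i < row.getArity(); i++) {
>>>>         statement.setObject(i + 1, row.getField(i)); // JDBC is 1-based
>>>>     }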
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
>>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>>>>
>>>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:
>>>>
>>>>>
>>>>> Full Source except for mapper and timestamp assigner.
>>>>>
>>>>> Sample Input Stream record:
>>>>> 1530447316589,Mary,./home
>>>>>
>>>>>
>>>>> What are the correct parameters to pass for data types in the
>>>>> JDBCAppendTableSink?
>>>>> Am I doing this correctly?
>>>>>
>>>>>
>>>>>         // Get execution environment
>>>>>         StreamExecutionEnvironment env =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>                 TableEnvironment.getTableEnvironment(env);
>>>>>
>>>>>         // Get and set execution parameters
>>>>>         ParameterTool parms = ParameterTool.fromArgs(args);
>>>>>         env.getConfig().setGlobalJobParameters(parms);
>>>>>
>>>>>         // Configure checkpoint and restart
>>>>>         // configureCheckpoint(env);
>>>>>         // configureRestart(env);
>>>>>
>>>>>         // Get our data stream
>>>>>         DataStream<Tuple3<Long, String, String>> eventStream = env
>>>>>                 .socketTextStream(parms.get("host"), parms.getInt("port"))
>>>>>                 .map(new TableStreamMapper())
>>>>>                 .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>>>>
>>>>>         // Register dynamic table from the stream
>>>>>         tableEnvironment.registerDataStream("pageViews", eventStream,
>>>>>                 "pageViewTime.rowtime, username, url");
>>>>>
>>>>>         // Continuous query
>>>>>         String continuousQuery =
>>>>>                 "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>>>>                 "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>>>>                 "username, COUNT(url) as viewcount FROM pageViews " +
>>>>>                 "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>>>>
>>>>>         // Dynamic table from the continuous query
>>>>>         Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>>>>         windowedTable.printSchema();
>>>>>
>>>>>         // Convert results to a DataStream
>>>>>         Table resultTable = windowedTable
>>>>>                 .select("wstart, wend, username, viewcount");
>>>>>
>>>>>         TupleTypeInfo<Tuple4<Timestamp, Timestamp, String, Long>> tupleTypeInfo =
>>>>>                 new TupleTypeInfo<>(
>>>>>                         Types.SQL_TIMESTAMP,
>>>>>                         Types.SQL_TIMESTAMP,
>>>>>                         Types.STRING,
>>>>>                         Types.LONG);
>>>>>         DataStream<Tuple4<Timestamp, Timestamp, String, Long>> resultDataStream =
>>>>>                 tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>>>>         resultDataStream.print();
>>>>>
>>>>>         // Configure the JDBC sink
>>>>>         JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>>>>                 .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>>>>                 .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>>>>                 .setUsername("chris")
>>>>>                 .setPassword("xxxx")
>>>>>                 .setBatchSize(1)
>>>>>                 .setQuery("INSERT INTO chris.pageclicks " +
>>>>>                         "(window_start,window_end,username,viewcount) " +
>>>>>                         "VALUES (?,?,?,?)")
>>>>>                 .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>>>                         BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>>>>                 .build();
>>>>>
>>>>>         // Write result table to the sink
>>>>>         resultTable.writeToSink(pageViewSink);
>>>>>         System.out.println("WRITE TO SINK");
>>>>>
>>>>>         // Execute
>>>>>         env.execute("PageViewsTumble");
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>
>>>
>>
>>
>> --
>> ----------------------------------------------------------------------
>> Simplicity is the ultimate sophistication
>> --Leonardo DaVinci
>>
>> www.rueggerconsultingllc.com
>>
>>
>
>
