Hi Chris,

MySQL (and possibly other DBMSs as well) offers a special syntax for upserts.

The answers to this SO question [1] recommend "INSERT INTO ... ON DUPLICATE
KEY UPDATE ..." or "REPLACE INTO ...".
However, AFAIK this syntax is not standardized and might vary from DBMS to
DBMS.
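
As a rough illustration of the first variant, the following Java sketch
assembles a MySQL-style upsert statement for the pageclicks table from the
job quoted below. The class and method names (UpsertSql, mysqlUpsert) are
made up for this example, and it assumes that the table has a unique key over
(window_start, window_end, username), which this thread does not confirm.
The syntax is MySQL-specific; other DBMSs, including Derby, would need a
different statement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink or of any JDBC driver.
class UpsertSql {

    // Builds "INSERT ... ON DUPLICATE KEY UPDATE ..." (MySQL syntax).
    // keyColumns are assumed to be covered by a unique or primary key;
    // valueColumns are overwritten when a row with the same key exists.
    public static String mysqlUpsert(String table,
                                     List<String> keyColumns,
                                     List<String> valueColumns) {
        List<String> all = new ArrayList<>(keyColumns);
        all.addAll(valueColumns);

        String columnList = String.join(",", all);
        // One "?" placeholder per column, for use with a PreparedStatement.
        String placeholders = all.stream()
                .map(c -> "?")
                .collect(Collectors.joining(","));
        // VALUES(col) refers to the value the INSERT tried to write.
        String updates = valueColumns.stream()
                .map(c -> c + "=VALUES(" + c + ")")
                .collect(Collectors.joining(","));

        return "INSERT INTO " + table + " (" + columnList + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        System.out.println(mysqlUpsert(
                "chris.pageclicks",
                Arrays.asList("window_start", "window_end", "username"),
                Arrays.asList("viewcount")));
    }
}
```

The resulting string has the same four placeholders as the plain INSERT, so
it could be passed to setQuery(...) with the same parameter types. Whether
the sink then really behaves like an upsert depends on the unique key being
defined in the database.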

Best, Fabian

[1]
https://stackoverflow.com/questions/4205181/insert-into-a-mysql-table-or-update-if-exists

2018-07-03 12:14 GMT+02:00 Chris Ruegger <chris.rueg...@gmail.com>:

> Fabian, Rong:
> Thanks for the help, greatly appreciated.
>
> I am currently using a Derby database for the append-only JDBC sink.
> So far I don't see a way to use a JDBC/relational database solution for a
> retract/upsert use case?
> Is it possible to set up JDBC sink with Derby or MySQL so that it goes
> back and updates or deletes/inserts previous rows and inserts new ones?
> I have not been able to find example source code that does that.
> Thanks again,
> Chris
>
>
> On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> In addition to what Rong said:
>>
>> - The types look OK.
>> - You can also use Types.STRING, and Types.LONG instead of
>> BasicTypeInfo.xxx
>> - Beware that in the failure case, you might have multiple entries in the
>> database table. Some databases support an upsert syntax which (together
>> with key or uniqueness constraints) can ensure that each result is added
>> just once, even if the query recovers from a failure.
>>
>> Best, Fabian
>>
>> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>
>>> Hi Chris,
>>>
>>> Looking at the code, it seems that JDBCTypeUtil [1] is used to convert
>>> Flink TypeInformation into JDBC types (java.sql.Types), and SQL_TIMESTAMP
>>> and SQL_TIME are both listed in the conversion mapping. However, the JDBC
>>> types they map to are different.
>>>
>>> Regarding the question whether your insert is correctly configured: it
>>> directly relates to how your DB executes the JDBC insert command.
>>> 1. Regarding type settings: Looking at JDBCOutputFormat [2], it seems
>>> you can even execute your command without a type array, or when a type
>>> mapping cannot be found; in this case the PreparedStatement parameters
>>> are written as plain Objects. I tried it on MySQL and it actually works
>>> pretty well.
>>> 2. Another question is whether your underlying DB can handle an "implicit
>>> type cast", for example inserting an INTEGER value into a BIGINT column.
>>> AFAIK JDBCAppendTableSink does not check compatibility before writeRecord,
>>> so it might be a good idea to include some sanity checks beforehand.
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>>>
>>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Full Source except for mapper and timestamp assigner.
>>>>
>>>> Sample Input Stream record:
>>>> 1530447316589,Mary,./home
>>>>
>>>>
>>>> What are the correct parameters to pass for data types in the
>>>> JDBCAppendTableSink?
>>>> Am I doing this correctly?
>>>>
>>>>
>>>>                 // Get Execution Environment
>>>>                 StreamExecutionEnvironment env =
>>>>                         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>                 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>                 StreamTableEnvironment tableEnvironment =
>>>>                         TableEnvironment.getTableEnvironment(env);
>>>>
>>>>                 // Get and Set execution parameters.
>>>>                 ParameterTool parms = ParameterTool.fromArgs(args);
>>>>                 env.getConfig().setGlobalJobParameters(parms);
>>>>
>>>>                 // Configure Checkpoint and Restart
>>>>                 // configureCheckpoint(env);
>>>>                 // configureRestart(env);
>>>>
>>>>                 // Get Our Data Stream
>>>>                 DataStream<Tuple3<Long,String,String>> eventStream = env
>>>>                                 .socketTextStream(parms.get("host"), parms.getInt("port"))
>>>>                                 .map(new TableStreamMapper())
>>>>                                 .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>>>
>>>>
>>>>                 // Register Table
>>>>                 // Dynamic Table From Stream
>>>>                 tableEnvironment.registerDataStream("pageViews", eventStream,
>>>>                         "pageViewTime.rowtime, username, url");
>>>>
>>>>                 // Continuous Query
>>>>                 String continuousQuery =
>>>>                                 "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>>>                                 "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>>>                                 "username, COUNT(url) as viewcount FROM pageViews " +
>>>>                                 "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>>>
>>>>                 // Dynamic Table from Continuous Query
>>>>                 Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>>>                 windowedTable.printSchema();
>>>>
>>>>                 // Convert Results to DataStream
>>>>                 Table resultTable = windowedTable
>>>>                         .select("wstart, wend, username,viewcount");
>>>>
>>>>
>>>>                 TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo =
>>>>                         new TupleTypeInfo<>(
>>>>                                 Types.SQL_TIMESTAMP,
>>>>                                 Types.SQL_TIMESTAMP,
>>>>                                 Types.STRING,
>>>>                                 Types.LONG);
>>>>                 DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
>>>>                         tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>>>                 resultDataStream.print();
>>>>
>>>>
>>>>                 // Write Result Table to Sink
>>>>                 // Configure Sink
>>>>                 JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>>>                         .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>>>                         .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>>>                         .setUsername("chris")
>>>>                         .setPassword("xxxx")
>>>>                         .setBatchSize(1)
>>>>                         .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>>>                         .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>>                                 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>>>                         .build();
>>>>
>>>>
>>>>                 // Write Result Table to Sink
>>>>                 resultTable.writeToSink(pageViewSink);
>>>>                 System.out.println("WRITE TO SINK");
>>>>
>>>>
>>>>                 // Execute
>>>>                 env.execute("PageViewsTumble");
>>>>         }
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
>>
>
>
> --
> ------------------------------------------------------------
> ------------------------------------------------------------
> ----------------
> Simplicity is the ultimate sophistication
> --Leonardo DaVinci
>
> www.rueggerconsultingllc.com
>
>
