There is also the SQL:2003 MERGE statement that can be used to implement UPSERT logic. It is a bit verbose but supported by Derby [1].
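For example, against the chris.pageclicks table used further down in this thread, a Derby MERGE could look roughly like this (a sketch only: MERGE needs a source table, so the one-row sysibm.sysdummy1 is used here; Derby may also require explicit CASTs on the untyped ? markers, and note that this form takes eight parameters rather than four, so it does not drop straight into the four-field sink below without adapting):

    MERGE INTO chris.pageclicks t
    USING sysibm.sysdummy1
    ON t.window_start = ? AND t.window_end = ? AND t.username = ?
    WHEN MATCHED THEN
      UPDATE SET t.viewcount = ?
    WHEN NOT MATCHED THEN
      INSERT (window_start, window_end, username, viewcount)
      VALUES (?, ?, ?, ?)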
Best, Fabian

[1] https://issues.apache.org/jira/browse/DERBY-3155

2018-07-04 10:10 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Chris,
>
> MySQL (and maybe other DBMSs as well) offers special syntax for upserts.
>
> The answers to this SO question [1] recommend "INSERT INTO ... ON
> DUPLICATE KEY UPDATE ..." or "REPLACE INTO ...".
> However, AFAIK this syntax is not standardized and might vary from DBMS
> to DBMS.
>
> Best, Fabian
>
> [1] https://stackoverflow.com/questions/4205181/insert-into-a-mysql-table-or-update-if-exists
>
> 2018-07-03 12:14 GMT+02:00 Chris Ruegger <chris.rueg...@gmail.com>:
>
>> Fabian, Rong:
>> Thanks for the help, greatly appreciated.
>>
>> I am currently using a Derby database for the append-only JDBC sink.
>> So far I don't see a way to use a JDBC/relational database for a
>> retract/upsert use case.
>> Is it possible to set up a JDBC sink with Derby or MySQL so that it goes
>> back and updates or deletes/inserts previous rows and inserts new ones?
>> I have not been able to find example source code that does that.
>> Thanks again,
>> Chris
>>
>> On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In addition to what Rong said:
>>>
>>> - The types look OK.
>>> - You can also use Types.STRING and Types.LONG instead of
>>>   BasicTypeInfo.xxx.
>>> - Beware that in the failure case, you might have multiple entries in
>>>   the database table. Some databases support an upsert syntax which
>>>   (together with key or uniqueness constraints) can ensure that each
>>>   result is added just once, even if the query recovers from a failure.
>>>
>>> Best, Fabian
>>>
>>> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>
>>>> Hi Chris,
>>>>
>>>> Looking at the code, it seems JDBCTypeUtil [1] is used for converting
>>>> Flink TypeInformation into JDBC types (java.sql.Types), and
>>>> SQL_TIMESTAMP and SQL_TIME are both listed in the conversion mapping.
>>>> However, the JDBC types are different.
>>>>
>>>> As to whether your insert is correctly configured: that depends
>>>> directly on how your DB executes the JDBC insert command.
>>>> 1. Regarding type settings: looking at JDBCOutputFormat [2], it seems
>>>> you can execute your command even when no type array is given or a
>>>> type mapping cannot be found; in that case the PreparedStatement is
>>>> filled with plain Object values. I tried it on MySQL and it actually
>>>> works pretty well.
>>>> 2. Another question is whether your underlying DB can handle implicit
>>>> type casts: for example, inserting an INTEGER value into a BIGINT
>>>> column. AFAIK JDBCAppendTableSink does not check compatibility before
>>>> writeRecord, so it might be a good idea to include some sanity check
>>>> beforehand.
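>>>> For illustration, such a check might look like the following (a
>>>> plain-JDBC sketch; the helper name and the expected-type array are
>>>> made up for the example and are not part of Flink):
>>>>
>>>>     import java.sql.Connection;
>>>>     import java.sql.ResultSet;
>>>>     import java.sql.SQLException;
>>>>     import java.sql.Types;
>>>>
>>>>     // Compare the table's declared column types (from DatabaseMetaData)
>>>>     // with the java.sql.Types codes the sink is going to bind.
>>>>     static void checkColumnTypes(Connection conn, String schema,
>>>>             String table, int[] expectedSqlTypes) throws SQLException {
>>>>         // getColumns() returns one row per column, ordered by ORDINAL_POSITION
>>>>         try (ResultSet cols = conn.getMetaData()
>>>>                 .getColumns(null, schema, table, null)) {
>>>>             int i = 0;
>>>>             while (cols.next() && i < expectedSqlTypes.length) {
>>>>                 int actual = cols.getInt("DATA_TYPE"); // java.sql.Types code
>>>>                 if (actual != expectedSqlTypes[i]) {
>>>>                     System.err.printf("column %s: expected %d, found %d%n",
>>>>                             cols.getString("COLUMN_NAME"),
>>>>                             expectedSqlTypes[i], actual);
>>>>                 }
>>>>                 i++;
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>     // e.g. for the table below (Derby upper-cases unquoted identifiers):
>>>>     // checkColumnTypes(conn, "CHRIS", "PAGECLICKS", new int[] {
>>>>     //     Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.BIGINT});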
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
>>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>>>>
>>>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:
>>>>
>>>>> Full Source except for mapper and timestamp assigner.
>>>>>
>>>>> Sample Input Stream record:
>>>>> 1530447316589,Mary,./home
>>>>>
>>>>> What are the correct parameters to pass for data types in the
>>>>> JDBCAppendTableSink? Am I doing this correctly?
>>>>>
>>>>> // Get Execution Environment
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> StreamTableEnvironment tableEnvironment =
>>>>>     TableEnvironment.getTableEnvironment(env);
>>>>>
>>>>> // Get and Set execution parameters.
>>>>> ParameterTool parms = ParameterTool.fromArgs(args);
>>>>> env.getConfig().setGlobalJobParameters(parms);
>>>>>
>>>>> // Configure Checkpoint and Restart
>>>>> // configureCheckpoint(env);
>>>>> // configureRestart(env);
>>>>>
>>>>> // Get Our Data Stream
>>>>> DataStream<Tuple3<Long,String,String>> eventStream = env
>>>>>     .socketTextStream(parms.get("host"), parms.getInt("port"))
>>>>>     .map(new TableStreamMapper())
>>>>>     .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>>>>
>>>>> // Register Table
>>>>> // Dynamic Table From Stream
>>>>> tableEnvironment.registerDataStream("pageViews", eventStream,
>>>>>     "pageViewTime.rowtime, username, url");
>>>>>
>>>>> // Continuous Query
>>>>> String continuousQuery =
>>>>>     "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>>>>     "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>>>>     "username, COUNT(url) as viewcount FROM pageViews " +
>>>>>     "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>>>>
>>>>> // Dynamic Table from Continuous Query
>>>>> Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>>>> windowedTable.printSchema();
>>>>>
>>>>> // Convert Results to DataStream
>>>>> Table resultTable = windowedTable
>>>>>     .select("wstart, wend, username, viewcount");
>>>>>
>>>>> TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo =
>>>>>     new TupleTypeInfo<>(
>>>>>         Types.SQL_TIMESTAMP,
>>>>>         Types.SQL_TIMESTAMP,
>>>>>         Types.STRING,
>>>>>         Types.LONG);
>>>>> DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
>>>>>     tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>>>> resultDataStream.print();
>>>>>
>>>>> // Configure Sink
>>>>> JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>>>>     .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>>>>     .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>>>>     .setUsername("chris")
>>>>>     .setPassword("xxxx")
>>>>>     .setBatchSize(1)
>>>>>     .setQuery("INSERT INTO chris.pageclicks " +
>>>>>         "(window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>>>>     .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>>>         BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>>>>     .build();
>>>>>
>>>>> // Write Result Table to Sink
>>>>> resultTable.writeToSink(pageViewSink);
>>>>> System.out.println("WRITE TO SINK");
>>>>>
>>>>> // Execute
>>>>> env.execute("PageViewsTumble");
>>>>> }
>>>>>
>>>>> --
>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>> --
>> ----------------------------------------------------------------------
>> Simplicity is the ultimate sophistication
>>     --Leonardo DaVinci
>>
>> www.rueggerconsultingllc.com
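Putting Fabian's suggestion together with the sink configuration above: on MySQL, with a unique key over (window_start, window_end, username), the same append sink can be made idempotent by swapping the plain INSERT for an upsert statement. A sketch only (the driver class, JDBC URL, and credentials are placeholders; this is still the append sink, not Flink's retract/upsert support):

    JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://localhost:3306/rueggerllc")
        .setUsername("chris")
        .setPassword("xxxx")
        .setBatchSize(1)
        // Same four positional parameters as before; a duplicate key
        // updates the existing row instead of appending a new one.
        .setQuery("INSERT INTO pageclicks "
                + "(window_start, window_end, username, viewcount) "
                + "VALUES (?, ?, ?, ?) "
                + "ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
                Types.STRING, Types.LONG)
        .build();

    resultTable.writeToSink(upsertSink);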