Hi Chris,

MySQL (and maybe other DBMSs as well) offers special syntax for upserts. The answers to this SO question [1] recommend "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." or "REPLACE INTO ...".
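For example, with the table from your job, a MySQL-flavored upsert could look like the sketch below (this assumes a primary key or unique constraint on (window_start, window_end, username); your code does not show the table definition, so the constraint is an assumption):

  -- MySQL syntax; requires a unique key so duplicates collapse into updates
  INSERT INTO chris.pageclicks (window_start, window_end, username, viewcount)
  VALUES (?, ?, ?, ?)
  ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount);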
However, AFAIK this syntax is not standardized and may vary from DBMS to DBMS.

Best, Fabian

[1] https://stackoverflow.com/questions/4205181/insert-into-a-mysql-table-or-update-if-exists

2018-07-03 12:14 GMT+02:00 Chris Ruegger <chris.rueg...@gmail.com>:

> Fabian, Rong:
> Thanks for the help, greatly appreciated.
>
> I am currently using a Derby database for the append-only JDBC sink.
> So far I don't see a way to use a JDBC/relational database for a
> retract/upsert use case. Is it possible to set up a JDBC sink with Derby
> or MySQL so that it goes back and updates or deletes/inserts previous
> rows and inserts new ones? I have not been able to find example source
> code that does that.
> Thanks again,
> Chris
>
> On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> In addition to what Rong said:
>>
>> - The types look OK.
>> - You can also use Types.STRING and Types.LONG instead of
>>   BasicTypeInfo.xxx.
>> - Beware that in the failure case, you might end up with multiple
>>   entries in the database table. Some databases support an upsert
>>   syntax which (together with key or uniqueness constraints) can
>>   ensure that each result is added just once, even if the query
>>   recovers from a failure.
>>
>> Best, Fabian
>>
>> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>
>>> Hi Chris,
>>>
>>> Looking at the code, JDBCTypeUtil [1] is used to convert Flink
>>> TypeInformation into JDBC types (java.sql.Types). SQL_TIMESTAMP and
>>> SQL_TIME are both listed in the conversion mapping, but they map to
>>> different JDBC types.
>>>
>>> Whether your insert is correctly configured depends on how your DB
>>> executes the JDBC insert command:
>>> 1. Regarding type settings: looking at JDBCOutputFormat [2], you can
>>> execute your command even without a type array, or when a type
>>> mapping cannot be found; in that case the PreparedStatement is filled
>>> with plain Object values. I tried it on MySQL and it actually works
>>> pretty well.
>>> 2. Another question is whether your underlying DB can handle an
>>> "implicit type cast", for example inserting an INTEGER value into a
>>> BIGINT column. AFAIK the JDBCAppendTableSink does not check
>>> compatibility before writeRecord, so it might be a good idea to
>>> include some sanity check beforehand, e.g. along the lines of the
>>> sketch below.
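>>>
>>> (Just a rough sketch of what such a check could look like; the helper
>>> class is hypothetical, and the expected field types are an example
>>> matching the Tuple4 in your job.)
>>>
>>> import java.sql.Timestamp;
>>>
>>> // Hypothetical helper: fail fast if a record's field types do not
>>> // match the target columns, instead of relying on the DB's implicit
>>> // casts at insert time.
>>> public class FieldTypeCheck {
>>>     private static final Class<?>[] EXPECTED = {
>>>             Timestamp.class, Timestamp.class, String.class, Long.class };
>>>
>>>     public static void check(Object... fields) {
>>>         if (fields.length != EXPECTED.length) {
>>>             throw new IllegalArgumentException("expected "
>>>                     + EXPECTED.length + " fields, got " + fields.length);
>>>         }
>>>         for (int i = 0; i < fields.length; i++) {
>>>             if (fields[i] != null && !EXPECTED[i].isInstance(fields[i])) {
>>>                 throw new IllegalArgumentException("field " + i + " is "
>>>                         + fields[i].getClass().getName() + ", expected "
>>>                         + EXPECTED[i].getName());
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> You could call it in a map() right before the sink, e.g.
>>> FieldTypeCheck.check(t.f0, t.f1, t.f2, t.f3).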
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>>>
>>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:
>>>
>>>> Full source, except for the mapper and timestamp assigner.
>>>>
>>>> Sample input stream record:
>>>> 1530447316589,Mary,./home
>>>>
>>>> What are the correct parameters to pass for data types in the
>>>> JDBCAppendTableSink? Am I doing this correctly?
>>>>
>>>> // Get execution environment
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> StreamTableEnvironment tableEnvironment =
>>>>     TableEnvironment.getTableEnvironment(env);
>>>>
>>>> // Get and set execution parameters
>>>> ParameterTool parms = ParameterTool.fromArgs(args);
>>>> env.getConfig().setGlobalJobParameters(parms);
>>>>
>>>> // Configure checkpoint and restart
>>>> // configureCheckpoint(env);
>>>> // configureRestart(env);
>>>>
>>>> // Get our data stream
>>>> DataStream<Tuple3<Long,String,String>> eventStream = env
>>>>     .socketTextStream(parms.get("host"), parms.getInt("port"))
>>>>     .map(new TableStreamMapper())
>>>>     .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>>>
>>>> // Register dynamic table from stream
>>>> tableEnvironment.registerDataStream("pageViews", eventStream,
>>>>     "pageViewTime.rowtime, username, url");
>>>>
>>>> // Continuous query
>>>> String continuousQuery =
>>>>     "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>>>     "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>>>     "username, COUNT(url) as viewcount FROM pageViews " +
>>>>     "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>>>
>>>> // Dynamic table from continuous query
>>>> Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>>> windowedTable.printSchema();
>>>>
>>>> // Convert results to DataStream
>>>> Table resultTable = windowedTable
>>>>     .select("wstart, wend, username, viewcount");
>>>>
>>>> TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo =
>>>>     new TupleTypeInfo<>(
>>>>         Types.SQL_TIMESTAMP,
>>>>         Types.SQL_TIMESTAMP,
>>>>         Types.STRING,
>>>>         Types.LONG);
>>>> DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
>>>>     tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>>> resultDataStream.print();
>>>>
>>>> // Configure JDBC sink
>>>> JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>>>     .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>>>     .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>>>     .setUsername("chris")
>>>>     .setPassword("xxxx")
>>>>     .setBatchSize(1)
>>>>     .setQuery("INSERT INTO chris.pageclicks " +
>>>>         "(window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>>>     .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>>         BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>>>     .build();
>>>>
>>>> // Write result table to sink
>>>> resultTable.writeToSink(pageViewSink);
>>>> System.out.println("WRITE TO SINK");
>>>>
>>>> // Execute
>>>> env.execute("PageViewsTumble");
>>>> }
>>>>
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
> --
> Simplicity is the ultimate sophistication
>   --Leonardo DaVinci
>
> www.rueggerconsultingllc.com