Fabian, Rong: Thanks for the help, greatly appreciated. I am currently using a Derby database for the append-only JDBC sink. So far I don't see a way to use a JDBC/relational database for a retract/upsert use case. Is it possible to set up the JDBC sink with Derby or MySQL so that it goes back and updates (or deletes and re-inserts) previous rows as well as inserting new ones? I have not been able to find example source code that does that.
Thanks again,
Chris
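A rough sketch of what I have in mind is below, assuming MySQL's INSERT ... ON DUPLICATE KEY UPDATE syntax and a UNIQUE key on (window_start, window_end, username); the driver class, URL, and table/column names are placeholders and I have not verified this (Derby would apparently need a MERGE statement instead):

    // Sketch only: the append sink just executes whatever SQL it is given,
    // so pointing it at an upsert statement makes a re-emitted window result
    // update the existing row instead of adding a duplicate.
    JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
        .setDrivername("com.mysql.jdbc.Driver")               // placeholder driver
        .setDBUrl("jdbc:mysql://localhost:3306/mydb")         // placeholder URL
        .setUsername("user")
        .setPassword("xxxx")
        .setBatchSize(1)
        .setQuery("INSERT INTO pageclicks (window_start,window_end,username,viewcount) "
                + "VALUES (?,?,?,?) "
                + "ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
                Types.STRING, Types.LONG)
        .build();
    resultTable.writeToSink(upsertSink);

Does that look like a reasonable approach, or is there a proper retract/upsert table sink I should be using instead?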
On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> In addition to what Rong said:
>
> - The types look OK.
> - You can also use Types.STRING and Types.LONG instead of BasicTypeInfo.xxx
> - Beware that in the failure case, you might have multiple entries in the
> database table. Some databases support an upsert syntax which (together
> with key or uniqueness constraints) can ensure that each result is added
> just once, even if the query recovers from a failure.
>
> Best, Fabian
>
> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>
>> Hi Chris,
>>
>> Looking at the code, it seems that JDBCTypeUtil [1] is used for converting
>> Flink TypeInformation into JDBC types (java.sql.Types), and SQL_TIMESTAMP
>> and SQL_TIME are both listed in the conversion mapping. However, the JDBC
>> types are different.
>>
>> Regarding the question whether your insert is correctly configured: it
>> depends on how your DB executes the JDBC insert command.
>> 1. Regarding type settings: looking at the JDBCOutputFormat [2], it seems
>> you can execute your command even without a type array, or when a type
>> mapping cannot be found; in that case the PreparedStatement is written
>> with the plain Object type. I tried it on MySQL and it actually works
>> pretty well.
>> 2. Another question is whether your underlying DB can handle an "implicit
>> type cast": for example, inserting an INTEGER type into a BIGINT column.
>> AFAIK the JDBCAppendTableSink does not check compatibility before
>> writeRecord, so it might be a good idea to include some sanity check
>> beforehand.
>>
>> Thanks,
>> Rong
>>
>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>>
>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:
>>
>>> Full source except for mapper and timestamp assigner.
>>>
>>> Sample input stream record:
>>> 1530447316589,Mary,./home
>>>
>>> What are the correct parameters to pass for data types in the
>>> JDBCAppendTableSink?
>>> Am I doing this correctly?
>>>
>>> // Get Execution Environment
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> StreamTableEnvironment tableEnvironment =
>>>     TableEnvironment.getTableEnvironment(env);
>>>
>>> // Get and Set execution parameters.
>>> ParameterTool parms = ParameterTool.fromArgs(args);
>>> env.getConfig().setGlobalJobParameters(parms);
>>>
>>> // Configure Checkpoint and Restart
>>> // configureCheckpoint(env);
>>> // configureRestart(env);
>>>
>>> // Get Our Data Stream
>>> DataStream<Tuple3<Long,String,String>> eventStream = env
>>>     .socketTextStream(parms.get("host"), parms.getInt("port"))
>>>     .map(new TableStreamMapper())
>>>     .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>>
>>> // Register Table
>>> // Dynamic Table From Stream
>>> tableEnvironment.registerDataStream("pageViews", eventStream,
>>>     "pageViewTime.rowtime, username, url");
>>>
>>> // Continuous Query
>>> String continuousQuery =
>>>     "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>>     "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>>     "username, COUNT(url) as viewcount FROM pageViews " +
>>>     "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>>
>>> // Dynamic Table from Continuous Query
>>> Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>> windowedTable.printSchema();
>>>
>>> // Convert Results to DataStream
>>> Table resultTable = windowedTable
>>>     .select("wstart, wend, username, viewcount");
>>>
>>> TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo =
>>>     new TupleTypeInfo<>(
>>>         Types.SQL_TIMESTAMP,
>>>         Types.SQL_TIMESTAMP,
>>>         Types.STRING,
>>>         Types.LONG);
>>> DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
>>>     tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>> resultDataStream.print();
>>>
>>> // Write Result Table to Sink
>>> // Configure Sink
>>> JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>>     .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>>     .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>>     .setUsername("chris")
>>>     .setPassword("xxxx")
>>>     .setBatchSize(1)
>>>     .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>>     .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>         BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>>     .build();
>>>
>>> // Write Result Table to Sink
>>> resultTable.writeToSink(pageViewSink);
>>> System.out.println("WRITE TO SINK");
>>>
>>> // Execute
>>> env.execute("PageViewsTumble");
>>> }
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

--
----------------------------------------------------------------------------------------------------------------------------------------
Simplicity is the ultimate sophistication   --Leonardo DaVinci
www.rueggerconsultingllc.com