Hi Chris,

Looking at the code, seems like JDBCTypeUtil [1] is used for converting
Flink TypeInformation into JDBC Type (Java.sql.type), and SQL_TIMESTAMP and
SQL_TIME are both listed in the conversion mapping. However the JDBC types
are different.

Regarding the question whether your insert is correctly configured. It
directly relates to how your DB executes the JDBC insert command.
1. Regarding type settings: Looking at the JDBCOutputFormat [2], seems like
you can even execute your command without type array or type mapping cannot
be found, in this case the PrepareStatement will be written with plain
Object type. I tired it on MySQL and it actually works pretty well.
2. Another question is whether your underlying DB can handle "implicit type
cast": For example, inserting an INTEGER type into a BIGINT column. AFAIK
JDBCAppendableSink does not check compatibilities before writeRecord, so it
might be a good idea to include some sanity check beforehand.

Thanks,
Rong

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109

On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:

>
> Full Source except for mapper and timestamp assigner.
>
> Sample Input Stream record:
> 1530447316589,Mary,./home
>
>
> What are the correct parameters to pass for data types in the
> JDBCAppendTableSink?
> Am I doing this correctly?
>
>
>                 // Get Execution Environment
>                 StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>                 StreamTableEnvironment tableEnvironment =
> TableEnvironment.getTableEnvironment(env);
>
>                 // Get and Set execution parameters.
>                 ParameterTool parms = ParameterTool.fromArgs(args);
>                 env.getConfig().setGlobalJobParameters(parms);
>
>                 // Configure Checkpoint and Restart
>                 // configureCheckpoint(env);
>                 // configureRestart(env);
>
>                 // Get Our Data Stream
>                 DataStream<Tuple3&lt;Long,String,String>> eventStream = env
>                                 .socketTextStream(parms.get("host"),
> parms.getInt("port"))
>                                 .map(new TableStreamMapper())
>                                 .assignTimestampsAndWatermarks(new
> MyEventTimestampAssigner());
>
>
>                 // Register Table
>                 // Dynamic Table From Stream
>                 tableEnvironment.registerDataStream("pageViews",
> eventStream,
> "pageViewTime.rowtime, username, url");
>
>             // Continuous Query
>                 String continuousQuery =
>                                 "SELECT TUMBLE_START(pageViewTime,
> INTERVAL '1' MINUTE) as wstart, " +
>                                 "TUMBLE_END(pageViewTime, INTERVAL '1'
> MINUTE) as wend, " +
>                                 "username, COUNT(url) as viewcount FROM
> pageViews " +
>                                 "GROUP BY TUMBLE(pageViewTime, INTERVAL
> '1' MINUTE), username";
>
>                 // Dynamic Table from Continuous Query
>                 Table windowedTable =
> tableEnvironment.sqlQuery(continuousQuery);
>                 windowedTable.printSchema();
>
>                 // Convert Results to DataStream
>                 Table resultTable = windowedTable
>                         .select("wstart, wend, username,viewcount");
>
>
>                 TupleTypeInfo<Tuple4&lt;Timestamp,Timestamp,String,Long>>
> tupleTypeInfo =
> new TupleTypeInfo<>(
>                                 Types.SQL_TIMESTAMP,
>                                 Types.SQL_TIMESTAMP,
>                                 Types.STRING,
>                                 Types.LONG);
>                 DataStream<Tuple4&lt;Timestamp,Timestamp,String,Long>>
> resultDataStream =
>                 tableEnvironment.toAppendStream(resultTable,tupleTypeInfo);
>                 resultDataStream.print();
>
>
>                 // Write Result Table to Sink
>                 // Configure Sink
>                 JDBCAppendTableSink pageViewSink =
> JDBCAppendTableSink.builder()
>
> .setDrivername("org.apache.derby.jdbc.ClientDriver")
>                         .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>                         .setUsername("chris")
>                         .setPassword("xxxx")
>                         .setBatchSize(1)
>                         .setQuery("INSERT INTO chris.pageclicks
> (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>
>
> .setParameterTypes(Types.SQL_TIMESTAMP,Types.SQL_TIMESTAMP,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)
>                         .build();
>
>
>                 // Write Result Table to Sink
>                 resultTable.writeToSink(pageViewSink);
>                 System.out.println("WRITE TO SINK");
>
>
>                 // Execute
>                 env.execute("PageViewsTumble");
>         }
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Reply via email to