Full Source except for mapper and timestamp assigner.
Sample Input Stream record:
1530447316589,Mary,./home
What are the correct type arguments to pass to setParameterTypes() on the
JDBCAppendTableSink builder so that they match the columns of my result table?
Am I doing this correctly in the code below?
// Builds a streaming job that reads page-view records from a socket, counts
// views per user in one-minute tumbling windows with a SQL query, prints the
// windowed rows, and appends them to a Derby table through JDBC.

// Get Execution Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Event time is selected here; the timestamps come from the assigner attached to the stream below.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

// Get and Set execution parameters.
ParameterTool parms = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parms);

// Configure Checkpoint and Restart
// configureCheckpoint(env);
// configureRestart(env);

// Get Our Data Stream
// "host" and "port" are read from the command-line parameters. TableStreamMapper and
// MyEventTimestampAssigner are defined elsewhere; presumably the mapper turns a text line
// such as "1530447316589,Mary,./home" into (timestamp, username, url) -- verify there.
DataStream<Tuple3<Long, String, String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

// Register Table
// Dynamic Table From Stream
// The ".rowtime" suffix declares pageViewTime as the event-time attribute used by TUMBLE below.
tableEnvironment.registerDataStream(
        "pageViews", eventStream, "pageViewTime.rowtime, username, url");

// Continuous Query
// Each literal must stay on a single physical line: Java does not allow a line break
// inside a string literal, so long SQL is split with '+' instead.
String continuousQuery =
        "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
        "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

// Dynamic Table from Continuous Query
Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
windowedTable.printSchema();

// Convert Results to DataStream
Table resultTable = windowedTable.select("wstart, wend, username,viewcount");

// Field types listed in the same order as the selected columns:
// wstart, wend, username, viewcount.
TupleTypeInfo<Tuple4<Timestamp, Timestamp, String, Long>> tupleTypeInfo =
        new TupleTypeInfo<>(
                Types.SQL_TIMESTAMP,
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.LONG);

DataStream<Tuple4<Timestamp, Timestamp, String, Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
resultDataStream.print();

// Configure Sink
// The parameter types are given in the order of the '?' placeholders in the INSERT
// statement, and use the same Types constants as tupleTypeInfo above so the sink
// declaration and the result-table columns stay visibly in agreement.
JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
        .setUsername("chris")
        .setPassword("xxxx")
        // A batch size of 1 is requested, i.e. no accumulation of rows before writing.
        .setBatchSize(1)
        .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
        .setParameterTypes(
                Types.SQL_TIMESTAMP,
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.LONG)
        .build();

// Write Result Table to Sink
resultTable.writeToSink(pageViewSink);
// NOTE: this message is printed while the job is still being assembled, before
// env.execute() is called below, so it does not indicate that any row was written.
System.out.println("WRITE TO SINK");

// Execute
env.execute("PageViewsTumble");
}
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/