This code runs and returns the correct result on the initial query, but fails 
to trigger as data continues to stream in on Kafka.  Is there anything obvious 
I’m missing?

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
tableEnv = StreamTableEnvironment.create(env);

// Consume RedactionResults from Kafka into DataStream
DataStream<RedactionResult> rrStream =
    env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
 Table rawTable = tableEnv.fromDataStream(rrStream, "lastTry, pid, tid, status, 
UserActionTime.proctime");
rawTable.printSchema();

// This works on initial query, but fails to trigger afterwards.
String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + 
rawTable +
    " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);

Reply via email to