Yes, I am certain events are being ignored or dropped during the first five 
seconds.  Further investigation on my part reveals that the "ignore" period is 
exactly the first five seconds of the stream - regardless of the size of the 
window.

Situation

I have a script which pushes one event into Kafka every second, structured 
as:

{"userId": "use...@email.com", "timestamp": <timestamp from the producer>}

My stream uses this Kafka queue as its source.  JSON schema and table schema 
are as follows:

final Json jsonFormat = new Json()
        .failOnMissingField(false)
        .jsonSchema("{"
            + "  type: 'object',"
            + "  properties: {"
            + "    userId: { type: 'string' },"
            + "    timestamp: { type: 'integer' }"
            + "  }"
            + "}");

final Schema tableSchema = new Schema()
        .field("userId", Types.STRING())
        .field("timestamp", TypeInformation.of(BigDecimal.class))
        .field("proctime", Types.SQL_TIMESTAMP())
        .proctime();

The StreamTableEnvironment is configured in append mode, and the table source 
is named "MyEventTable".  The stream is using the following SQL query:

final String sql =
    " SELECT userId, `timestamp` "
        + " FROM MyEventTable "
        + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), "
        + " userId, `timestamp` ";
final Table resultTable = tableEnvironment.sqlQuery(sql);
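
For reference when reasoning about the HOP clause above (slide of 1 second, 
size of 10 seconds), here is a minimal sketch of which hopping windows cover a 
given processing-time instant.  It assumes windows are aligned to multiples of 
the slide, which is my understanding of how sliding windows are assigned by 
default; HopWindowSketch and windowStarts are illustrative names, not Flink 
API, and the instant used in main is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the start times of the
// hopping (sliding) windows that contain a given processing-time instant.
final class HopWindowSketch {

    // Windows are [start, start + sizeMs), aligned to multiples of slideMs
    // (assumption: no window offset is configured).
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start that is <= timestampMs.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk back one slide at a time while the window still covers the instant.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND):
        // slide = 1000 ms, size = 10000 ms.  12345 is an arbitrary instant.
        List<Long> starts = windowStarts(12_345L, 10_000L, 1_000L);
        System.out.println(starts.size() + " windows, newest start " + starts.get(0)
                + ", oldest start " + starts.get(starts.size() - 1));
    }
}
```

Under that assumption every instant is covered by size / slide = 10 windows, 
so the same (userId, timestamp) group can appear in up to ten window results.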

Code which I'm using to verify that events are being dropped:

tableEnvironment.toAppendStream(resultTable, Row.class)
    .map((MapFunction<Row, String>) row -> {
      final String userId = row.getField(0).toString();
      final BigDecimal timestamp = (BigDecimal) row.getField(1);

      return String.format(
          "(%s, %s)",
          userId, timestamp.toString()
      );
    })
    .print();


No events produced during the first five seconds following a cold start of 
Flink are ever printed to the console.  Any and all events produced after the 
first five seconds following a cold start of Flink are always printed to the 
console.  All processes are running on the same system.
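
One way to pin down the size of the gap is to diff the timestamps the producer 
sent against the timestamps that reached the console.  The sketch below is 
only that, a sketch: DroppedEventCheck and its methods are hypothetical names, 
the timestamps are assumed to be epoch-style milliseconds, and the values in 
main are made up for illustration rather than taken from my runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical check, not part of Flink: compares the timestamps the producer
// sent with the timestamps that were printed by the job.
final class DroppedEventCheck {

    // Produced timestamps that never appeared in the output, in ascending order.
    static TreeSet<Long> missing(List<Long> produced, List<Long> printed) {
        TreeSet<Long> result = new TreeSet<>(produced);
        result.removeAll(printed);
        return result;
    }

    // Milliseconds from the first produced event to the last missing one,
    // or -1 if nothing is missing.
    static long dropSpanMs(List<Long> produced, List<Long> printed) {
        TreeSet<Long> gone = missing(produced, printed);
        if (gone.isEmpty()) {
            return -1L;
        }
        long firstProduced = new TreeSet<>(produced).first();
        return gone.last() - firstProduced;
    }

    public static void main(String[] args) {
        // Made-up data: one event per second, the first five never printed.
        List<Long> produced =
                Arrays.asList(0L, 1000L, 2000L, 3000L, 4000L, 5000L, 6000L, 7000L);
        List<Long> printed = Arrays.asList(5000L, 6000L, 7000L);
        System.out.println("missing: " + missing(produced, printed));
        System.out.println("span ms: " + dropSpanMs(produced, printed));
    }
}
```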

This issue does not occur when using raw Flink (windowed or not), nor when 
using Flink CEP.  Again, I have not tried the Table API.
