Hmm, that's interesting.
HOP and TUMBLE window aggregations are directly translated into their
corresponding DataStream counterparts (sliding and tumbling windows).
There should be no filtering of records.
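
For illustration, the HOP in your query corresponds to a sliding
processing-time window on the DataStream API, roughly like this (a sketch,
assuming a DataStream<Tuple2<String, Long>> of (userId, timestamp) pairs;
not the exact program the planner generates):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND)
// corresponds to a 10-second window that slides every 1 second
DataStream<Tuple2<String, Long>> result = events
    .keyBy(0)  // the GROUP BY key (userId)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
    .sum(1);   // stand-in for the actual aggregation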

I assume you tried a simple query like "SELECT * FROM MyEventTable" and
received all expected data?
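
That is, something along these lines (a minimal sketch, reusing the names
from your program):

final Table all = tableEnvironment.sqlQuery("SELECT * FROM MyEventTable");
tableEnvironment.toAppendStream(all, Row.class).print();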

Fabian

2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com <elliotst...@gmail.com>:

> Yes, I am certain events are being ignored or dropped during the first
> five seconds.  Further investigation on my part reveals that the "ignore"
> period is exactly the first five seconds of the stream - regardless of the
> size of the window.
>
> Situation
>
> I have a script which pushes an event into Kafka once every second,
> structured as:
>
> {"userId": "use...@email.com", "timestamp": <timestamp from the producer>}
>
> My stream uses this Kafka queue as its source.  JSON schema and table
> schema are as follows:
>
> final Json jsonFormat = new Json()
>         .failOnMissingField(false)
>         .jsonSchema("{"
>             + "  \"type\": \"object\","
>             + "  \"properties\": {"
>             + "    \"userId\": { \"type\": \"string\" },"
>             + "    \"timestamp\": { \"type\": \"integer\" }"
>             + "  }"
>             + "}");
>
> final Schema tableSchema = new Schema()
>         .field("userId", Types.STRING())
>         .field("timestamp", TypeInformation.of(BigDecimal.class))
>         .field("proctime", Types.SQL_TIMESTAMP())
>         .proctime();
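>
> For completeness, the table source is registered via the descriptor API,
> roughly as follows (the Kafka version, topic name, and broker address
> below are placeholders, not the real values):
>
> tableEnvironment
>     .connect(new Kafka()
>         .version("0.11")
>         .topic("my-events")
>         .property("bootstrap.servers", "localhost:9092"))
>     .withFormat(jsonFormat)
>     .withSchema(tableSchema)
>     .inAppendMode()
>     .registerTableSource("MyEventTable");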
>
> The StreamTableEnvironment is configured in append mode, and the table
> source is named "MyEventTable".  The stream uses the following SQL
> query:
>
> final String sql =
>     " SELECT userId, `timestamp` "
>         + " FROM MyEventTable "
>         + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND),"
>         + "   userId, `timestamp` ";
> final Table resultTable = tableEnvironment.sqlQuery(sql);
>
> Code which I'm using to verify that events are being dropped:
>
> tableEnvironment.toAppendStream(resultTable, Row.class)
>     .map((MapFunction<Row, String>) row -> {
>       final String userId = row.getField(0).toString();
>       final BigDecimal timestamp = (BigDecimal) row.getField(1);
>
>       return String.format("(%s, %s)", userId, timestamp);
>     })
>     .print();
>
>
> No events produced during the first five seconds after a cold start of
> Flink are ever printed to the console.  Every event produced after those
> first five seconds is always printed.  All processes are running on the
> same system.
>
> This issue does not occur when using raw Flink (windowed or not), nor
> when using Flink CEP.  Again, I have not tried the Table API.
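>
> For comparison, the raw DataStream job that does see every event is
> essentially the following (a sketch; the consumer version, topic, and
> window function here are illustrative, not my exact code):
>
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> import org.apache.flink.util.Collector;
>
> final Properties kafkaProps = new Properties();
> kafkaProps.put("bootstrap.servers", "localhost:9092");
> kafkaProps.put("group.id", "debug-consumer");
>
> // env is the StreamExecutionEnvironment
> env.addSource(new FlinkKafkaConsumer011<>(
>         "my-events", new SimpleStringSchema(), kafkaProps))
>     .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
>     .apply(new AllWindowFunction<String, String, TimeWindow>() {
>       @Override
>       public void apply(TimeWindow window, Iterable<String> values,
>           Collector<String> out) {
>         for (String value : values) {
>           out.collect(value);  // emit every raw event observed in the window
>         }
>       }
>     })
>     .print();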
>
>
