Hmm, that's interesting. HOP and TUMBLE window aggregations are directly translated into their corresponding DataStream counterparts (Sliding, Tumble). There should be no filtering of records.
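For reference, a HOP like yours (slide 1s, size 10s) roughly corresponds to the
following DataStream program. This is a minimal sketch, not the planner's
generated code; the socket source and the Tuple2 record layout are placeholders
for illustration only:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class HopAsDataStream {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            env.socketTextStream("localhost", 9999)
                // parse each line as a userId and pair it with a count of 1
                .map((MapFunction<String, Tuple2<String, Long>>) userId ->
                    Tuple2.of(userId, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(0)
                // HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND)
                // corresponds to a sliding processing-time window of size 10s
                // that slides every 1s
                .window(SlidingProcessingTimeWindows.of(
                    Time.seconds(10), Time.seconds(1)))
                .sum(1)
                .print();

            env.execute("HOP window as DataStream sliding window");
        }
    }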
I assume you tried a simple query like "SELECT * FROM MyEventTable" and
received all expected data?

Fabian

2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com <elliotst...@gmail.com>:

> Yes, I am certain events are being ignored or dropped during the first
> five seconds. Further investigation on my part reveals that the "ignore"
> period is exactly the first five seconds of the stream, regardless of the
> size of the window.
>
> Situation
>
> I have a script which pushes an event into Kafka once per second,
> structured as:
>
>     {"userId": "use...@email.com", "timestamp": <timestamp from the producer>}
>
> My stream uses this Kafka queue as its source. The JSON schema and table
> schema are as follows:
>
>     final Json jsonFormat = new Json()
>         .failOnMissingField(false)
>         .jsonSchema("{"
>             + "  type: 'object',"
>             + "  properties: {"
>             + "    userId: { type: 'string' },"
>             + "    timestamp: { type: 'integer' }"
>             + "  }"
>             + "}");
>
>     final Schema tableSchema = new Schema()
>         .field("userId", Types.STRING())
>         .field("timestamp", TypeInformation.of(BigDecimal.class))
>         .field("proctime", Types.SQL_TIMESTAMP())
>         .proctime();
>
> The StreamTableEnvironment is configured in append mode, and the table
> source is registered as "MyEventTable". The stream uses the following SQL
> query:
>
>     final String sql =
>         " SELECT userId, `timestamp` "
>             + " FROM MyEventTable "
>             + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), "
>             + " userId, `timestamp` ";
>     final Table resultTable = streamTableEnvironment.sqlQuery(sql);
>
> Code which I'm using to verify that events are being dropped:
>
>     streamTableEnvironment.toAppendStream(resultTable, Row.class)
>         .map((MapFunction<Row, String>) row -> {
>             final String userId = row.getField(0).toString();
>             final BigDecimal timestamp = (BigDecimal) row.getField(1);
>
>             return String.format("(%s, %s)", userId, timestamp.toString());
>         })
>         .print();
>
> No events produced during the first five seconds following a cold start of
> Flink are ever printed to the console. Any and all events produced after
> the first five seconds following a cold start are always printed to the
> console. All processes are running on the same system.
>
> This issue does not occur when using raw Flink (windowed or not), nor when
> using Flink CEP. Again, I have not tried the Table API.
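For completeness, the sanity check suggested above could look like the
following. This is a minimal sketch assuming the Flink 1.6-era Table API; it
presumes "MyEventTable" is registered exactly as in the quoted code, which is
omitted here:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SelectAllCheck {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            final StreamTableEnvironment tEnv =
                TableEnvironment.getTableEnvironment(env);

            // "MyEventTable" is assumed to be registered as in the quoted
            // code. No windowing here: if every event (including those from
            // the first five seconds) is printed, the source connector is
            // fine and the records go missing in the HOP aggregation.
            final Table allRows = tEnv.sqlQuery("SELECT * FROM MyEventTable");

            tEnv.toAppendStream(allRows, Row.class).print();

            env.execute("SELECT * sanity check");
        }
    }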