This is indeed very strange behavior. To add to the discussion: when you mentioned that the issue does not occur with "raw Flink (windowed or not) nor when using Flink CEP", how were the comparisons done? Also, were you able to get correct results without the additional GROUP BY terms "foo" or "userId"?
--
Rong

On Mon, Sep 17, 2018 at 12:30 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hmm, that's interesting.
> HOP and TUMBLE window aggregations are directly translated into their
> corresponding DataStream counterparts (Sliding, Tumble).
> There should be no filtering of records.
>
> I assume you tried a simple query like "SELECT * FROM MyEventTable" and
> received all expected data?
>
> Fabian
>
> 2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com <elliotst...@gmail.com>:
>
>> Yes, I am certain events are being ignored or dropped during the first
>> five seconds. Further investigation on my part reveals that the "ignore"
>> period is exactly the first five seconds of the stream - regardless of the
>> size of the window.
>>
>> Situation
>>
>> I have a script which pushes an event into Kafka once every second
>> structured as:
>>
>>     {"userId": "use...@email.com", "timestamp": <timestamp from the producer>}
>>
>> My stream uses this Kafka queue as its source. JSON schema and table
>> schema are as follows:
>>
>>     final Json jsonFormat = new Json()
>>         .failOnMissingField(false)
>>         .jsonSchema("{"
>>             + " type: 'object',"
>>             + " properties: {"
>>             + " userId: { type: 'string' },"
>>             + " timestamp: { type: 'integer' }"
>>             + " }"
>>             + "}");
>>
>>     final Schema tableSchema = new Schema()
>>         .field("userId", Types.STRING())
>>         .field("timestamp", TypeInformation.of(BigDecimal.class))
>>         .field("proctime", Types.SQL_TIMESTAMP())
>>         .proctime();
>>
>> StreamTableEnvironment is configured to be in append mode, and table
>> source is named "MyEventTable". The stream is using the following SQL
>> query:
>>
>>     final String sql =
>>         " SELECT userId, `timestamp` "
>>         + " FROM MyEventTable "
>>         + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId, `timestamp` ";
>>     final Table resultTable = tableEnvironment.sqlQuery(sql);
>>
>> Code which I'm using to verify that events are being dropped:
>>
>>     streamTableEnvironment.toAppendStream(sqlResultTable, Row.class)
>>         .map((MapFunction<Row, String>) row -> {
>>             final String userId = row.getField(0).toString();
>>             final BigDecimal timestamp = (BigDecimal) row.getField(1);
>>
>>             return String.format(
>>                 "(%s, %s)",
>>                 userId, timestamp.toString()
>>             );
>>         })
>>         .print();
>>
>> No events produced during the first five seconds following a cold start
>> of Flink are ever printed to the console. Any and all events produced
>> after the first five seconds following a cold start of Flink are always
>> printed to the console. All processes are running on the same system.
>>
>> This issue does not occur when using raw Flink (windowed or not) nor when
>> using Flink CEP. Again, have not tried Table API.
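
As a side note on the quoted query, here is a minimal sketch in plain Java (no Flink dependency) of how a sliding window assigner would place a single event into windows. It assumes a zero window offset and millisecond timestamps. The class HopWindowSketch and the method assignWindows are hypothetical names for illustration only, not Flink APIs. With a slide of 1 second and a size of 10 seconds, as in HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), each event should fall into size / slide = 10 overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink's API.
final class HopWindowSketch {

    // Returns the [start, end) bounds, in milliseconds, of every sliding
    // window that contains timestampMs. Assumes a window offset of 0.
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Slide 1 s, size 10 s, matching the HOP arguments in the quoted query.
        for (long[] w : assignWindows(12_345L, 10_000L, 1_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

This may help when counting expected output rows: because the query groups by userId and `timestamp` within each window, a single input event would be expected to show up in the results of up to ten windows, assuming none of them are dropped.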