Hi John,

I suspect this was caused by the GROUP BY field "timestamp". You were effectively grouping on two time fields simultaneously: the processing time (via the HOP window) and the timestamp coming from your producer. As @Rong suggested, try removing the additional GROUP BY field "timestamp" and check the result again, e.g. as in the sketch below.
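Something like the following should window on processing time only (just a sketch based on your snippets, not tested; here MAX is used to keep the producer timestamp in the output, but any aggregate over it would do):

    final String sql =
        " SELECT userId, MAX(`timestamp`) AS producerTs "
            + " FROM MyEventTable "
            + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId ";
    final Table resultTable = tableEnvironment.sqlQuery(sql);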
Best,
Xingcan

> On Sep 18, 2018, at 6:50 AM, Rong Rong <walter...@gmail.com> wrote:
>
> This is in fact a very strange behavior.
>
> To add to the discussion, when you mentioned "raw Flink (windowed or not)
> nor when using Flink CEP", how were those comparisons done?
> Also, were you able to get correct results without the additional GROUP BY
> term of "foo" or "userId"?
>
> --
> Rong
>
> On Mon, Sep 17, 2018 at 12:30 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hmm, that's interesting.
> HOP and TUMBLE window aggregations are directly translated into their
> corresponding DataStream counterparts (Sliding, Tumble).
> There should be no filtering of records.
>
> I assume you tried a simple query like "SELECT * FROM MyEventTable" and
> received all expected data?
>
> Fabian
>
> 2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com <elliotst...@gmail.com>:
> Yes, I am certain events are being ignored or dropped during the first
> five seconds. Further investigation on my part reveals that the "ignore"
> period is exactly the first five seconds of the stream, regardless of the
> size of the window.
>
> Situation
>
> I have a script which pushes an event into Kafka once every second,
> structured as:
>
>     {"userId": "use...@email.com", "timestamp": <timestamp from the producer>}
>
> My stream uses this Kafka queue as its source. The JSON schema and table
> schema are as follows:
>
>     final Json jsonFormat = new Json()
>         .failOnMissingField(false)
>         .jsonSchema("{"
>             + "  type: 'object',"
>             + "  properties: {"
>             + "    userId: { type: 'string' },"
>             + "    timestamp: { type: 'integer' }"
>             + "  }"
>             + "}");
>
>     final Schema tableSchema = new Schema()
>         .field("userId", Types.STRING())
>         .field("timestamp", TypeInformation.of(BigDecimal.class))
>         .field("proctime", Types.SQL_TIMESTAMP())
>         .proctime();
>
> The StreamTableEnvironment is configured to be in append mode, and the
> table source is named "MyEventTable". The stream uses the following SQL
> query:
>
>     final String sql =
>         " SELECT userId, `timestamp` "
>             + " FROM MyEventTable "
>             + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId, `timestamp` ";
>     final Table resultTable = tableEnvironment.sqlQuery(sql);
>
> Code which I'm using to verify that events are being dropped:
>
>     tableEnvironment.toAppendStream(resultTable, Row.class)
>         .map((MapFunction<Row, String>) row -> {
>             final String userId = row.getField(0).toString();
>             final BigDecimal timestamp = (BigDecimal) row.getField(1);
>
>             return String.format(
>                 "(%s, %s)",
>                 userId, timestamp.toString()
>             );
>         })
>         .print();
>
> No events produced during the first five seconds following a cold start of
> Flink are ever printed to the console. Any and all events produced after
> the first five seconds following a cold start of Flink are always printed
> to the console. All processes are running on the same system.
>
> This issue does not occur when using raw Flink (windowed or not), nor when
> using Flink CEP. Again, I have not tried the Table API.