Hi John,

I suppose that was caused by the groupBy field “timestamp”. You were actually 
grouping on two time fields simultaneously, the processing time and the time 
from your producer. As @Rong suggested, try removing the additional groupBy 
field “timestamp” and check the result again.

Best,
Xingcan

> On Sep 18, 2018, at 6:50 AM, Rong Rong <walter...@gmail.com> wrote:
> 
> This is in fact a very strange behavior. 
> 
> To add to the discussion, when you mentioned: "raw Flink (windowed or not) 
> nor when using Flink CEP", how were the comparisons being done? 
> Also, were you able to get the results correct without the additional GROUP 
> BY term of "foo" or "userId"?
> 
> --
> Rong
> 
> On Mon, Sep 17, 2018 at 12:30 PM Fabian Hueske <fhue...@gmail.com 
> <mailto:fhue...@gmail.com>> wrote:
> Hmm, that's interesting. 
> HOP and TUMBLE window aggregations are directly translated into their 
> corresponding DataStream counterparts (Sliding, Tumble).
> There should be no filtering of records.
> 
> I assume you tried a simple query like "SELECT * FROM MyEventTable" and 
> received all expected data?
> 
> Fabian
> 
> 2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com 
> <mailto:elliotst...@gmail.com> <elliotst...@gmail.com 
> <mailto:elliotst...@gmail.com>>:
> Yes, I am certain events are being ignored or dropped during the first five 
> seconds.  Further investigation on my part reveals that the "ignore" period 
> is exactly the first five seconds of the stream - regardless of the size of 
> the window.
> 
> Situation
> 
> I have a script which pushes an event into Kafka once every second structured 
> as:
> 
> {"userId": "use...@email.com <mailto:use...@email.com>", "timestamp": 
> <timestamp from the producer>}
> 
> My stream uses this Kafka queue as its source.  JSON schema and table schema 
> are as follows:
> 
> final Json jsonFormat = new Json()
>         .failOnMissingField(false)
>         .jsonSchema("{"
>             + "  type: 'object',"
>             + "  properties: {"
>             + "    userId: { type: 'string' },"
>             + "    timestamp: { type: 'integer' }"
>             + "  }"
>             + "}");
> 
> final Schema tableSchema = new Schema()
>         .field("userId", Types.STRING())
>         .field("timestamp", TypeInformation.of(BigDecimal.class))
>         .field("proctime", Types.SQL_TIMESTAMP())
>         .proctime();
> 
> StreamTableEnvironment is configured to be in append mode, and table source 
> is named "MyEventTable".  The stream is using the following SQL query:
> 
> final String sql =
>     " SELECT userId, `timestamp` "
>         + " FROM MyEventTable "
>         + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' 
> SECOND), userId, `timestamp` ";
> final Table resultTable = tableEnvironment.sqlQuery(sql);
> 
> Code which I'm using to verify that events are being dropped:
> 
> streamTableEnvironment.toAppendStream(sqlResultTable, Row.class)
>     .map((MapFunction<Row, String>) row -> {
>       final String userId = row.getField(0).toString();
>       final BigDecimal timestamp = (BigDecimal) row.getField(1);
> 
>       return String.format(
>           "(%s, %s)",
>           userId, timestamp.toString()
>       );
>     })
>     .print();
> 
> 
> No events produced during the first five seconds following a cold start of 
> Flink are ever printed to the console.  Any and all events produced after the 
> first five seconds following a cold start of Flink are always printed to the 
> console.  All processes are running on the same system.
> 
> This issue does not occur when using raw Flink (windowed or not) nor when 
> using Flink CEP.  Again, have not tried Table API.
> 
> 

Reply via email to