Hello, I have a very simple stream where I window data using event-time. As a data source I’m using a CSV file, sorted by increasing timestamps.
Here’s the source:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val lines = env.readTextFile(csvFileName)

    lines
      .flatMap { l => parseLine(l) }
      .assignAscendingTimestamps(t => t.timestampSeconds * 1000L)
      .keyBy(t => t.key)
      .timeWindow(Time.minutes(30), Time.minutes(5))
      .fold(0)((c, _) => c + 1)
      .addSink { c => println(c) }

    env.execute()

This used to work fine in 1.0.3, that is, the aggregate counts were printed to stdout. However, after updating to 1.1, nothing happens: I can see the stages being initialized (switching state from SCHEDULED to DEPLOYING to RUNNING), but they then immediately go to FINISHED without printing anything.

If I add a .map {x => println(x); x} after .assignAscendingTimestamps, I can see the data flowing, so data *is* being read. Is the windowing somehow causing it to be lost?

Any ideas on where to look for possible causes?

Thanks!

--
Adam Warski
http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org