Thanks! I’ll be watching that issue then.

Adam
> On 08 Aug 2016, at 05:01, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Adam,
> sorry for the inconvenience. This is caused by a new file read operator,
> specifically how it treats watermarks/timestamps. I opened an issue here
> that describes the situation:
> https://issues.apache.org/jira/browse/FLINK-4329
>
> I think this should be fixed for an upcoming 1.1.1 bug fixing release.
>
> Cheers,
> Aljoscha
>
> On Sat, 6 Aug 2016 at 12:33 Adam Warski <a...@warski.org> wrote:
> Hello,
>
> I have a very simple stream where I window data using event-time.
> As a data source I’m using a CSV file, sorted by increasing timestamps.
>
> Here’s the source:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val lines = env.readTextFile(csvFileName)
>
> lines
>   .flatMap { l => parseLine(l) }
>   .assignAscendingTimestamps(t => t.timestampSeconds * 1000L)
>   .keyBy(t => t.key)
>   .timeWindow(Time.minutes(30), Time.minutes(5))
>   .fold(0)((c, _) => c+1)
>   .addSink { c =>
>     println(c)
>   }
>
> env.execute()
>
> This used to work fine in 1.0.3, that is the aggregate counts are printed
> to stdout.
>
> However after updating to 1.1, nothing happens - I can see the stages being
> initialized (switching state from SCHEDULED to DEPLOYING to RUNNING), but
> then immediately going to FINISHED, without printing anything out.
>
> If I add a .map {x => println(x); x} after .assignAscendingTimestamps I can
> see the data flowing - so data *is* being read, just somehow the windowing
> causes it to be lost?
>
> Any ideas on where to look for possible causes?
>
> Thanks!
>
> --
> Adam Warski
>
> http://twitter.com/#!/adamwarski
> http://www.softwaremill.com
> http://www.warski.org

--
Adam Warski

http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org