Hi,

using event time and assigning timestamps does not reorder the stream records; the timestamps are only attached to them. To get ordered output, you can define a window and sort the elements in each window, for example with standard Java sorting. Alternatively, you can write your own operator that keeps a priority queue and always emits the elements up to the current watermark.
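
To make the priority-queue idea concrete, here is a minimal plain-Java sketch of the buffering logic such an operator could hold. It deliberately uses no Flink classes: the class name WatermarkSorter and its methods add and advanceWatermark are hypothetical names chosen for illustration, and wiring this into an actual Flink operator is left out. The timestamps 1 to 4 stand in for the seconds of the four log lines in the question.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Plain-Java sketch (no Flink dependency) of the buffering logic a custom
 * operator could use. Class and method names are hypothetical.
 */
class WatermarkSorter<T> {

    /** A buffered record together with its event-time timestamp. */
    private static final class Entry<T> {
        final long timestamp;
        final T value;

        Entry(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Min-heap ordered by timestamp: the earliest buffered record is at the head.
    private final PriorityQueue<Entry<T>> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Entry<T> e) -> e.timestamp));

    /** Buffer a record instead of forwarding it immediately. */
    void add(long timestamp, T value) {
        buffer.add(new Entry<>(timestamp, value));
    }

    /** Emit, in timestamp order, every buffered record with timestamp <= watermark. */
    List<T> advanceWatermark(long watermark) {
        List<T> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll().value);
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkSorter<String> sorter = new WatermarkSorter<>();
        // Records in arrival order, as in the question: 1, 4, 3, 2.
        sorter.add(1L, "slice1");
        sorter.add(4L, "slice4");
        sorter.add(3L, "slice3");
        sorter.add(2L, "slice2");

        System.out.println(sorter.advanceWatermark(3L)); // [slice1, slice2, slice3]
        System.out.println(sorter.advanceWatermark(4L)); // [slice4]
    }
}
```

The sketch only produces ordered output if the watermark trails the out-of-order records. A record added with a timestamp at or below a watermark that has already been passed would be emitted late, so the watermark has to allow for the expected amount of disorder. Records with equal timestamps come out in no guaranteed relative order.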

Cheers,
Till

On Wed, Sep 7, 2016 at 12:15 PM, jiecxy <253441...@qq.com> wrote:

> The program is to read the unordered records from a log file, and to print
> the record in order. But it doesn't change the order, is there anything
> wrong in my code? Can anyone give me an example?
>
> This is my program:
>
> Note: the class Tokenizer is to transfer the log to four parts. Like this:
>     Sep 6 09:28:01 master systemd: Stopping user-988.slice.
> to
>     Tuple4<time, master, systemd, Stopping user-988.slice.>
>
> ------------------------------------------------------------
> // set up the execution environment
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> StreamTableEnvironment tableEnv =
>         TableEnvironment.getTableEnvironment(env);
>
> DataStream<String> text = env.addSource(
>         new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties));
> DataStream<Tuple4<Long, String, String, String>> messages =
>         text.flatMap(new Tokenizer())
>             .assignTimestampsAndWatermarks(
>                 new AscendingTimestampExtractor<Tuple4<Long, String, String, String>>() {
>                     @Override
>                     public long extractAscendingTimestamp(
>                             Tuple4<Long, String, String, String> tuple4) {
>                         return tuple4.f0;
>                     }
>                 });
> messages.print().setParallelism(1);
> ------------------------------------------------------------
>
> The inputs like this:
>     Sep 6 09:28:01 master systemd: Stopping user-988.slice1.
>     Sep 6 09:28:04 master systemd: Stopping user-988.slice4.
>     Sep 6 09:28:03 master systemd: Stopping user-988.slice3.
>     Sep 6 09:28:02 master systemd: Stopping user-988.slice2.
>
> But the outputs are same with input, it doesn't change them to 1 - 2 - 3 - 4
> (order by the time)...