Hi,

Using event time and assigning timestamps does not reorder the stream
records. To get ordered output, you can define a window and sort the
elements in each window, for example with Java's built-in sorting.
Alternatively, you can write your own operator that buffers elements in a
priority queue and, whenever the watermark advances, emits all elements
with timestamps up to the current watermark.
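
As a rough illustration of the second option, here is a minimal plain-Java
sketch of the buffering logic only. It is not Flink API: the class
WatermarkSorter and its methods add and advanceWatermark are hypothetical
names made up for this example, and inside a real operator you would still
have to wire this into Flink's element and watermark callbacks and keep the
buffer in checkpointed state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical helper (not part of Flink): buffers timestamped records and
 * releases them in timestamp order once the watermark has passed them.
 */
class WatermarkSorter<T> {

    /** A record paired with its event-time timestamp. */
    static final class Stamped<T> {
        final long timestamp;
        final T value;

        Stamped(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Min-heap ordered by timestamp: the oldest buffered record is at the head.
    private final PriorityQueue<Stamped<T>> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Stamped<T> s) -> s.timestamp));

    /** Buffers a record; nothing is emitted until the watermark advances. */
    void add(long timestamp, T value) {
        buffer.add(new Stamped<>(timestamp, value));
    }

    /**
     * Removes and returns, in ascending timestamp order, every buffered
     * record whose timestamp is less than or equal to the given watermark.
     */
    List<T> advanceWatermark(long watermark) {
        List<T> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll().value);
        }
        return ready;
    }
}
```

With the four records from your example buffered in arrival order (1, 4, 3,
2), advancing the watermark to the timestamp of record 3 releases records
1, 2 and 3 in that order, and record 4 stays buffered until the watermark
reaches it. Note that a priority queue gives no guaranteed order for
records with identical timestamps.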

Cheers,
Till

On Wed, Sep 7, 2016 at 12:15 PM, jiecxy <253441...@qq.com> wrote:

> The program reads unordered records from a log file and should print
> the records in order. But it doesn't change the order. Is there anything
> wrong in my code? Can anyone give me an example?
>
>
> This is my program:
>
> Note: the Tokenizer class splits each log line into four parts. Like this:
>    Sep  6 09:28:01 master systemd: Stopping user-988.slice.
> to
>   Tuple4<time, master, systemd,  Stopping user-988.slice.>
>
> ------------------------------------------------------------
> ---------------------------------------------
>         // set up the execution environment
>         final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setParallelism(1);
>
>         StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
>
>         DataStream<String> text = env.addSource(new
> FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
> properties));
>         DataStream<Tuple4<Long, String, String, String>> messages =
>                 text.flatMap(new Tokenizer())
>                         .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor<Tuple4<Long, String, String, String>>() {
>                             @Override
>                             public long
> extractAscendingTimestamp(Tuple4<Long, String, String, String> tuple4) {
>                                 return tuple4.f0;
>                             }
>                         });
>         messages.print().setParallelism(1);
> ------------------------------------------------------------
> ---------------------------------------------
>
> The input looks like this:
> Sep  6 09:28:01 master systemd: Stopping user-988.slice1.
> Sep  6 09:28:04 master systemd: Stopping user-988.slice4.
> Sep  6 09:28:03 master systemd: Stopping user-988.slice3.
> Sep  6 09:28:02 master systemd: Stopping user-988.slice2.
>
> But the output is the same as the input; the records are not reordered to
> 1 - 2 - 3 - 4 (ordered by time)...
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/How-to-assign-
> timestamp-for-event-time-in-a-stream-tp8944.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
