Good morning :-),

Thank you for your answer. Let me explain my problem more thoroughly (maybe other options are possible here, not necessarily with allowedLateness).

The most compact description of my problem would be stream enrichment. More concretely: suppose I have two streams, and I want to enrich one stream with the other:

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")

val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))


The goal here is to mix information from the infoStream into the mainStream, so the enriched stream would contain the following tuples:

("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")


It is not a requirement that the enriched stream preserves the ordering of the mainStream. However, it is important that newer elements in the infoStream override older ones. You can see that (4, "a", "Whoops, it is A") arrived later than (1, "a", "It is F") (looking at the event time, which is the first element of every tuple). So the infoStream (at t=4) should contain the following tuples:

(4, "a", "Whoops, it is A")
(2, "b", "It is B")
(3, "c", "It is C")

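The "latest record wins" rule described above can be sketched in plain Scala, without any Flink code (a hypothetical illustration, assuming the first tuple field is the event time):

```scala
// Sketch: keep, per key, only the info record with the highest event
// time (first tuple field). This is what keyBy(1).maxBy(0) in the
// pipeline below is meant to achieve incrementally.
object LatestWins {
  val info = Seq(
    (1, "a", "It is F"),
    (2, "b", "It is B"),
    (3, "c", "It is C"),
    (4, "a", "Whoops, it is A")
  )

  // Group by the key (second field), then keep the max-time record.
  val latest: Map[String, (Int, String, String)] =
    info.groupBy(_._2).map { case (k, rs) => k -> rs.maxBy(_._1) }
}
```

At t=4 this yields exactly the three tuples listed above, with (4, "a", "Whoops, it is A") replacing (1, "a", "It is F").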

So what I thought of is iterating over the infoStream to keep only the relevant records, and then coGrouping it with the mainStream to enrich it. This approach works fine:

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")

val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
.keyBy(1)
.iterate(iteration => {
    val filtered = iteration.keyBy(1).maxBy(0)
    (iteration, filtered)
})

mainStream
.coGroup(infoStream)
    .where[String]((x: String) => x)
    .equalTo(_._2)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
.apply {
    (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
        // Buffer the info records of this window.
        val records = scala.collection.mutable.MutableList[(Int, String, String)]()
        for (record <- second) {
            records += record
        }

        // For each main element, find the matching description.
        for (key <- first) {
            var bestDescription = "?"
            for (record <- records) {
                if (record._2 == key) {
                    bestDescription = record._3
                }
            }
            out.collect((key, bestDescription))
        }
    }
}
.print()
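For clarity, the per-window join logic inside that apply can be expressed in plain Scala (a sketch, not Flink code). Indexing the info records by key once gives one map lookup per main element instead of a scan over all records per element, which is also one answer to my efficiency question:

```scala
// Sketch of the per-window enrichment, without Flink: index the
// (already filtered) info records by key, then look each main
// element up, falling back to "?" when no info record matches.
object WindowJoin {
  val mainElems = Seq("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
  val infoRecords = Seq(
    (4, "a", "Whoops, it is A"),
    (2, "b", "It is B"),
    (3, "c", "It is C")
  )

  // Build key -> description once per window.
  val byKey: Map[String, String] =
    infoRecords.map(r => r._2 -> r._3).toMap

  val enriched: Seq[(String, String)] =
    mainElems.map(k => (k, byKey.getOrElse(k, "?")))
}
```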


My questions for you:
- Can I make this more efficient?
- Is there a way of mixing DataSets and DataStreams? That would be really awesome (at least for this use case).
- Is there a way to ensure checkpoints, since I am using an iterative stream here?
- Can I get rid of the TumblingProcessingTimeWindows? In fact, all of this can be done by Apache Spark, so it would be great if Apache Flink could achieve a higher throughput rate than Apache Spark in this use case.

I am curious about your answers!

Cheers,
Kevin


On 29.07.2016 10:40, Maximilian Michels wrote:
Hi!

I'm not sure whether I understand your question. The purpose of Event
Time is to be able to process out-of-order events. Do you want to
discard late elements? In the upcoming Flink 1.1.0 you can set the
`allowedLateness` on a windowed stream. The default is 0, so late
elements are discarded; late elements are elements which arrive after
the Watermark has reached the operator.

Cheers,
Max

On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
Is it possible to discard events that are out-of-order (in terms of event
time)?

