Not that I necessarily need it for this particular example, but is there global state available?
I.e., how can I make state available across all parallel instances of an operator?
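The thread never answers this question directly. For a small model that changes rarely, one commonly used pattern is replication rather than shared global state: broadcast the model stream (Flink's DataStream has a broadcast() partitioning method) so that every parallel instance of the CoFlatMapFunction receives every model update and keeps its own copy in its fields. The sketch below is a plain-Java simulation of that idea, not Flink code; `BroadcastModelSim`, `Instance` and `broadcastModel` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (NOT Flink API) of broadcasting a model stream:
// every parallel instance receives every model update and keeps its own copy.
class BroadcastModelSim {

    // Hypothetical stand-in for one parallel instance of the CoFlatMapFunction.
    static class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        // Corresponds to flatMap2: overwrite this instance's local copy of the model.
        void onModel(double newB0, double newB1) {
            b0 = newB0;
            b1 = newB1;
        }
    }

    // Creates `parallelism` instances and delivers the model update to ALL of them,
    // which is what broadcasting the model input is meant to achieve.
    static List<Instance> broadcastModel(int parallelism, double b0, double b1) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        for (Instance instance : instances) {
            instance.onModel(b0, b1);
        }
        return instances;
    }
}
```

In Vladimir's topology, the corresponding change would presumably be `.connect(model_stream.broadcast())`; it is worth checking against the Flink version in use. Each instance still holds its own copy, so this only behaves like "global" state as long as every update reaches every instance.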
On Tuesday, November 17, 2015 1:49 PM, Vladimir Stoyak wrote:
Perfect! It does explain my problem.
Thanks a lot
On Tuesday, November 17, 2015 1:43 PM, Stephan Ewen wrote:
Is the CoFlatMapFunction intended to be executed in parallel?
If yes, you need some way to deterministically assign which record goes to which parallel instance. In effect, the CoFlatMapFunction performs a parallel (partitioned) join between the model and the results of the session windows, so you need some form of key that selects which partition each element goes to. Does that make sense?
If not, try setting its parallelism to 1 explicitly.
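Stephan's point about deterministic assignment can be illustrated with a simplified routing model: if both inputs are partitioned by the same key, a model record and a features record with equal keys always land in the same parallel instance, and with parallelism 1 every record trivially lands in instance 0. This is a plain-Java sketch; `KeyRouting`, `route` and `meet` are hypothetical names, and Flink's real key partitioner may use a different hash than `hashCode()` modulo the parallelism.

```java
// Simplified model (NOT Flink's actual partitioner) of key-based routing:
// a record goes to the parallel instance chosen from its key, so two
// records with the same key always meet in the same instance.
class KeyRouting {

    // Hypothetical routing function; any deterministic key -> instance
    // mapping has the property that equal keys map to the same instance.
    static int route(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // True if a model record and a features record with these keys
    // would be processed by the same parallel instance.
    static boolean meet(Object modelKey, Object featuresKey, int parallelism) {
        return route(modelKey, parallelism) == route(featuresKey, parallelism);
    }
}
```

Keying both inputs only helps if the model itself is per key; a single model shared by all keys would have to be replicated to every instance instead.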
Greetings,
Stephan
On Tue, Nov 17, 2015 at 1:11 PM, Vladimir Stoyak wrote:
My model DataStream is not keyed and has no windows; only the main stream has windows and an apply function.
I have two Kafka streams, one for events and one for the model:
DataStream<Model> model_stream = env.addSource(
        new FlinkKafkaConsumer082<Model>(model_topic,
                new AvroDeserializationSchema(Model.class), properties));
DataStream<Raw> main_stream = env.addSource(
        new FlinkKafkaConsumer082<Raw>(raw_topic,
                new AvroDeserializationSchema(Raw.class), properties));
My topology looks like this:

main_stream
        .assignTimestamps(new myTimeExtractor())
        .keyBy("event_key")
        .window(GlobalWindows.create())
        .trigger(new sessionTrigger(session_timeout))
        .apply(new AggFunction())
        .connect(model_stream)
        .flatMap(new applyModel())
        .print();
AggFunction is a simple aggregate function:

Long start_ts = Long.MAX_VALUE;
Long end_ts = Long.MIN_VALUE;
Long dwell_time = 0L, last_event_ts = 0L;
int size = Lists.newArrayList(values).size();

for (Raw value : values) {
    if (value.getTs() > end_ts) end_ts = value.getTs();
    if (value.getTs() < start_ts) start_ts = value.getTs();

    if (last_event_ts == 0L) {
        last_event_ts = value.getTs();
    } else {
        dwell_time += value.getTs() - last_event_ts;
        last_event_ts = value.getTs();
    }
}
out.collect(new Features(tuple.getField(0), tuple.getField(2),
        tuple.getField(1), start_ts, end_ts, size, dwell_time, Boolean.FALSE));
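The aggregation in AggFunction can be restated as a plain-Java method over one session's timestamps, which makes it easy to check in isolation. This is a sketch, not code from the thread: `SessionStats` and `of` are hypothetical names, the Flink window and key handling are left out, and a boolean flag replaces the `last_event_ts == 0L` sentinel (otherwise an event with timestamp 0 would be treated as "no previous event").

```java
import java.util.List;

// Plain-Java restatement (hypothetical, outside Flink) of the AggFunction
// logic over the event timestamps of one session.
class SessionStats {
    final long startTs;
    final long endTs;
    final int size;
    final long dwellTime;

    SessionStats(long startTs, long endTs, int size, long dwellTime) {
        this.startTs = startTs;
        this.endTs = endTs;
        this.size = size;
        this.dwellTime = dwellTime;
    }

    // Min/max timestamp, event count, and the sum of gaps between
    // consecutive events in iteration order.
    static SessionStats of(List<Long> timestamps) {
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        long dwell = 0L;
        long last = 0L;
        boolean seenFirst = false; // replaces the "last_event_ts == 0L" sentinel
        for (long ts : timestamps) {
            if (ts > end) end = ts;
            if (ts < start) start = ts;
            if (seenFirst) {
                dwell += ts - last; // gap to the previous event in iteration order
            }
            last = ts;
            seenFirst = true;
        }
        return new SessionStats(start, end, timestamps.size(), dwell);
    }
}
```

For example, `SessionStats.of(Arrays.asList(100L, 130L, 170L))` gives start 100, end 170, size 3 and dwell time 70. Because the sum of consecutive gaps telescopes to (last timestamp - first timestamp) in iteration order, dwell_time equals end_ts - start_ts only when the window contents are iterated in timestamp order; for [170, 100] it comes out as -70.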
On Tuesday, November 17, 2015 12:59 PM, Stephan Ewen wrote:
Hi!
Can you give us a bit more context? For example, could you share the structure of the program (which streams get windowed, and how they are connected)?
I would guess that the following is the problem:
When you connect one stream to another, partition n of the first stream connects with partition n of the other stream. When you do a keyBy().window(), the system reshuffles the data, so the records end up in different partitions, meaning that they arrive at other instances of the CoFlatMapFunction.
You can also call keyBy() on both inputs to make sure that the records are properly routed.
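The effect Stephan describes can be reproduced with a small plain-Java simulation, assuming (hypothetically) that the model update arrives in only one partition and that the windowed features are routed by a key hash. Each parallel instance is a separate object with its own b0/b1 fields, so only the instance that shares the model's partition ever sees the update; features routed elsewhere keep seeing 0.0. `ForwardConnectSim` and `b0SeenByFeatures` are hypothetical names, and this is not Flink code.

```java
// Plain-Java simulation (NOT Flink API) of a forward connect():
// partition n of one input meets partition n of the other, so a model
// update that arrives in one partition only updates that instance.
class ForwardConnectSim {

    // Stand-in for one parallel applyModel instance, with its own fields.
    static class Instance {
        double b0 = 0.0;
        double b1 = 0.0;
    }

    // Returns the b0 value that a features record with the given key would see.
    // Hypothetical assumptions: the model update arrives only in partition
    // `modelPartition`, and windowed features are routed by key hash.
    static double b0SeenByFeatures(Object featuresKey, int parallelism,
                                   int modelPartition, double newB0, double newB1) {
        Instance[] instances = new Instance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new Instance();
        }
        // flatMap2 runs only in the instance that shares the model's partition.
        instances[modelPartition].b0 = newB0;
        instances[modelPartition].b1 = newB1;
        // After keyBy().window(), the record sits in a key-determined partition.
        int featuresPartition = Math.floorMod(featuresKey.hashCode(), parallelism);
        return instances[featuresPartition].b0;
    }
}
```

With parallelism 4 and the model in partition 0, an Integer key of 1 routes to instance 1 and sees 0.0, while a key of 4 routes to instance 0 and sees the new value; with parallelism 1 every record sees the update, which matches Stephan's suggestion to set parallelism 1 explicitly.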
Greetings,
Stephan
On Tue, Nov 17, 2015 at 12:29 PM, Vladimir Stoyak wrote:
Got a bit stuck with CoFlatMapFunction. It seems to work fine if I place it on the DataStream before the window, but it fails if placed after the window's “apply” function.

I was testing two streams: the main “Features” stream on flatMap1, constantly ingesting data, and the control stream “Model” on flatMap2, changing the model on request. I can set b0/b1 in flatMap2 and see them set correctly there, but flatMap1 always sees b0 and b1 as 0, the values set at initialization. Am I missing something obvious here?

Thanks a lot,
Vladimir

public static class applyModel
        implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel() {
        b0 = 0.0;
        b1 = 0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString() {
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}