Hi Aljoscha, Here's the code:
private static class DataFilterFunImpl extends RichCoFlatMapFunction<KVTuple6, String, KVTuple6> { private JSONParser parser; private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>(); @Override // tuple5(domain, device_type, type, key, count_or_sum) public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception { String type = dataTuple.f2; String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP); String key = keyValue[0]; switch (type) { case RawEventExtractor.Constants.VALUE_COUNT: { if (whiteListMap.containsKey(key)) { ControlJsonConfig ruleConfig = whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT); if (ruleConfig != null) { String value = keyValue.length > 1 ? keyValue[1] : ""; String bucket = ruleConfig.getBucketName(value); if (bucket != null) { dataTuple.setField(String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3); collector.collect(dataTuple); } } else { collector.collect(dataTuple); } } break; } case RawEventExtractor.Constants.VALUE_SUM: { if (whiteListMap.containsKey(key) && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) { collector.collect(dataTuple); } break; } default: collector.collect(dataTuple); } } @Override public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception { // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value(); try { if (parser == null) { parser = new JSONParser(); } JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr); Tuple2<String, Map<String, ControlJsonConfig>> config = RawEventExtractor.getKeyConfig(jsonConfig); if (config.f1 == null) { whiteListMap.remove(config.f0); } else { whiteListMap.put(config.f0, config.f1); } } catch (Exception e) {} } } FYI, if I setParallelism of both the control stream and data stream, the window function works. Is it necessary to do so for broadcast() function? On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi Sam, > could you please also send the code for the DataFilterFunImpl and your > timestamps/watermark assigner. That could help in figuring out the problem. > > Best, > Aljoscha > > > On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote: > > Hi Timo, > > The window function sinks the data into InfluxDB, and it's not triggered. > If I comment the ".timeWindow", and print results after the reduce > function, it works > Code for window function is here: > > private static class WindowFunImpl implements > WindowFunction<KVTuple6,Point,Tuple,TimeWindow> { > @Override > public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> > iterable, > Collector<Point> collector) throws Exception { > KVTuple6 kvTypeTuple = iterable.iterator().next(); > System.*out*.println("window: " + kvTypeTuple); > // Doesn't work here if use broadcast > Point.Builder builder = Point.*measurement*(*INFLUXDB_MEASUREMENT*) > .time(window.getStart(), TimeUnit.*MILLISECONDS*) > .tag(*TAG_DOMAIN*, kvTypeTuple.f0) > .tag(*TAG_DEVICE*, kvTypeTuple.f1) > .tag(*TAG_TYPE*, kvTypeTuple.f2) > .tag(*TAG_KEY*, kvTypeTuple.f3) > .addField(*FIELD*, kvTypeTuple.f4); > > collector.collect(builder.build()); > } > } > > > On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <twal...@apache.org> wrote: > > Hi Sam, > > could you explain the behavior a bit more? How does the window function > behave? Is it not triggered or what is the content? What is the result if > you don't use a window function? > > Timo > > > Am 08/03/17 um 02:59 schrieb Sam Huang: > > btw, the reduce function works well, I've printed out the data, and they > are > all correct. So are the timestamps and watermarks. And if I remove > ".broadcast()", the data is successfully sinked. > > Any help? > > > > -- > View this message in context: http://apache-flink-user-maili > ng-list-archive.2336050.n4.nabble.com/window-function-not- > working-when-control-stream-broadcast-tp12093p12094.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. > > > >