Hi Aljoscha,

Here's the code:

private static class DataFilterFunImpl
        extends RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
    private JSONParser parser;
    private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();

    // tuple5(domain, device_type, type, key, count_or_sum)
    @Override
    public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
        String type = dataTuple.f2;
        String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
        String key = keyValue[0];
        switch (type) {
            case RawEventExtractor.Constants.VALUE_COUNT: {
                if (whiteListMap.containsKey(key)) {
                    ControlJsonConfig ruleConfig =
                            whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
                    if (ruleConfig != null) {
                        String value = keyValue.length > 1 ? keyValue[1] : "";
                        String bucket = ruleConfig.getBucketName(value);
                        if (bucket != null) {
                            dataTuple.setField(
                                    String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket),
                                    3);
                            collector.collect(dataTuple);
                        }
                    } else {
                        collector.collect(dataTuple);
                    }
                }
                break;
            }
            case RawEventExtractor.Constants.VALUE_SUM: {
                if (whiteListMap.containsKey(key)
                        && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
                    collector.collect(dataTuple);
                }
                break;
            }
            default:
                collector.collect(dataTuple);
        }
    }

    @Override
    public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
        // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
        try {
            if (parser == null) {
                parser = new JSONParser();
            }
            JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
            Tuple2<String, Map<String, ControlJsonConfig>> config =
                    RawEventExtractor.getKeyConfig(jsonConfig);
            if (config.f1 == null) {
                whiteListMap.remove(config.f0);
            } else {
                whiteListMap.put(config.f0, config.f1);
            }
        } catch (Exception e) {
            // Any parse or config error is silently ignored.
        }
    }
}
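
In case it helps to check the filtering rules without a running Flink job, here is a minimal standalone sketch of the decision that flatMap1 above makes. It is only an approximation under stated assumptions: BucketRule is a hypothetical stand-in for ControlJsonConfig, the constant values ("count", "sum", "=") are placeholders because the real RawEventExtractor.Constants values are not shown here, and filter() returns the key field to emit instead of writing to a Collector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class WhiteListFilterSketch {
    // Placeholder values; the real RawEventExtractor.Constants are not shown in this thread.
    static final String VALUE_COUNT = "count";
    static final String VALUE_SUM = "sum";
    static final String SEP = "=";

    /** Hypothetical stand-in for ControlJsonConfig: returns a bucket name, or null for no bucket. */
    interface BucketRule {
        String getBucketName(String value);
    }

    private final Map<String, Map<String, BucketRule>> whiteListMap = new HashMap<>();

    /** Mirrors flatMap2: a null rule map removes the key, anything else replaces it. */
    void putConfig(String key, Map<String, BucketRule> rules) {
        if (rules == null) {
            whiteListMap.remove(key);
        } else {
            whiteListMap.put(key, rules);
        }
    }

    /** Mirrors flatMap1: returns the key field to emit, or empty if the record is dropped. */
    Optional<String> filter(String type, String keyField) {
        String[] keyValue = keyField.split(SEP);
        String key = keyValue[0];
        Map<String, BucketRule> rules = whiteListMap.get(key);
        switch (type) {
            case VALUE_COUNT: {
                if (rules == null) {
                    return Optional.empty();            // key is not whitelisted
                }
                BucketRule rule = rules.get(VALUE_COUNT);
                if (rule == null) {
                    return Optional.of(keyField);       // whitelisted, no bucketing rule
                }
                String value = keyValue.length > 1 ? keyValue[1] : "";
                String bucket = rule.getBucketName(value);
                return bucket == null
                        ? Optional.empty()              // value falls into no bucket
                        : Optional.of(String.join(SEP, key, bucket));
            }
            case VALUE_SUM:
                return rules != null && rules.containsKey(VALUE_SUM)
                        ? Optional.of(keyField)
                        : Optional.empty();
            default:
                return Optional.of(keyField);           // other types pass through unchanged
        }
    }
}
```

Note that with an empty whiteListMap every count and sum record is dropped, so a parallel instance that has not yet received any control message emits only records of other types.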


FYI, if I call setParallelism on both the control stream and the data stream,
the window function works. Is that necessary when using broadcast()?


On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Sam,
> could you please also send the code for the DataFilterFunImpl and your
> timestamps/watermark assigner. That could help in figuring out the problem.
>
> Best,
> Aljoscha
>
>
> On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:
>
> Hi Timo,
>
> The window function writes the data to InfluxDB, but it is never triggered.
> If I comment out ".timeWindow" and print the results after the reduce
> function, it works.
> The code for the window function is here:
>
> private static class WindowFunImpl
>         implements WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>                       Collector<Point> collector) throws Exception {
>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>         // Doesn't work here if I use broadcast
>         System.out.println("window: " + kvTypeTuple);
>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>                 .tag(TAG_KEY, kvTypeTuple.f3)
>                 .addField(FIELD, kvTypeTuple.f4);
>
>         collector.collect(builder.build());
>     }
> }
>
>
> On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Sam,
>
> could you explain the behavior a bit more? How does the window function
> behave? Is it not triggered or what is the content? What is the result if
> you don't use a window function?
>
> Timo
>
>
> On 08/03/17 at 02:59, Sam Huang wrote:
>
> By the way, the reduce function works well: I've printed out the data, and
> it is all correct. So are the timestamps and watermarks. And if I remove
> ".broadcast()", the data is successfully written to the sink.
>
> Any help?
>
>
>
