Hi guys, I want to merge two different streams: one is a config stream and the other is a stream of JSON values to check against that config. It seems like CoFlatMapFunction should be used. Here is my sample:

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    val filterStream: ConnectedStreams[ControlEvent, JsValue] =
      specificControlStream.connect(eventStream)

    class FilterFunction() extends CoFlatMapFunction[ControlEvent, JsValue, FilteredEvent] {
      var configs = new ControlEvent(1, "a") // default
      PPLogger.getActivityLogger.info("# init ")

      // Called for each element of the config stream
      override def flatMap1(value: ControlEvent, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f1 value %s ".format(value.jsonPath))
        configs = value
        PPLogger.getActivityLogger.info("# f1 current config %s ".format(configs))
      }

      // Called for each element of the event (JSON value) stream
      override def flatMap2(value: JsValue, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f2 current config %s ".format(configs))
        PPLogger.getActivityLogger.info("# f2 current value %s ".format(value.toString()))
      }
    }

    val x = new FilterFunction()
    filterStream.flatMap(x)
    ….

How I sent the messages (Kafka):

------
+ send eventStream msg
+ send configStream msg
+ send eventStream msg
------

My log output looks like this:

    2019-06-06 10:15:21 INFO activity %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
    2019-06-06 10:15:21 INFO activity %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}
    2019-06-06 10:15:24 INFO activity %PARSER_ERROR[x] - # f1 value zzzzz_xxxxxx_2
    2019-06-06 10:15:24 INFO activity %PARSER_ERROR[x] - # f1 current config ControlEvent(1,zzzzz_xxxxxx_2)
    2019-06-06 10:15:30 INFO activity %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
    2019-06-06 10:15:30 INFO activity %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}

My understanding is: when flatMap1 runs, the config is changed, and this change (the updated configs) is shared with flatMap2.
But the log shows something quite different: after flatMap1 updates configs, flatMap2 still logs the default ControlEvent(1,a).

-----------

I tried using a mutable.ListBuffer for configs to collect the history, and then tried to update configs in both flatMap1 and flatMap2. I see that the configs variables seen by flatMap1 and flatMap2 are two different variables (but with the same name in the class!).

My env:
- Flink minicluster (sbt run)
- Flink 1.7.2
- Kafka 1.0
- Scala 2.11
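
On the observation above that configs looks like two different variables: one possible explanation, not verified against this job, is that the operator runs with parallelism greater than 1, so Flink creates one copy of FilterFunction per parallel subtask and the config message and the event messages may reach different copies. The sketch below is a plain Scala simulation of that assumption with no Flink dependency. FilterFunctionSim, SubtaskDemo, and the field name id are hypothetical names used only for illustration.

```scala
// Plain Scala simulation, no Flink involved. All names here are hypothetical.
final case class ControlEvent(id: Int, jsonPath: String)

// Stand-in for FilterFunction: each instance owns its own mutable field.
class FilterFunctionSim {
  var configs: ControlEvent = ControlEvent(1, "a") // default

  // Config stream side: replace this instance's config
  def flatMap1(value: ControlEvent): Unit = {
    configs = value
  }

  // Event stream side: return the config this instance currently sees
  def flatMap2(value: String): ControlEvent = {
    configs
  }
}

object SubtaskDemo {
  // Simulates two parallel subtasks, each with its own function copy.
  // Returns the config seen by an event on subtask 0 and on subtask 1.
  def run(): (ControlEvent, ControlEvent) = {
    val subtask0 = new FilterFunctionSim
    val subtask1 = new FilterFunctionSim

    // Assumption: the config message is routed to subtask 0 only
    subtask0.flatMap1(ControlEvent(1, "zzzzz_xxxxxx_2"))

    (subtask0.flatMap2("{}"), subtask1.flatMap2("{}"))
  }

  def main(args: Array[String]): Unit = {
    val (seenBy0, seenBy1) = run()
    println(s"subtask 0 sees $seenBy0")
    println(s"subtask 1 sees $seenBy1")
  }
}
```

If this is indeed the cause, two things that might be worth trying are running the operator with parallelism 1, or broadcasting the config stream before connecting it (for example specificControlStream.broadcast.connect(eventStream)) so that every parallel copy receives each config message.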