Hi guys,

I want to merge two different streams: one is a config stream and the other 
carries JSON values that need to be checked against that config. It seems like 
CoFlatMapFunction should be used.
Here is my sample:

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    val filterStream: ConnectedStreams[ControlEvent, JsValue] =
      specificControlStream.connect(eventStream)

    class FilterFunction() extends CoFlatMapFunction[ControlEvent, JsValue, FilteredEvent] {
      var configs = new ControlEvent(1, "a") // default
      PPLogger.getActivityLogger.info("# init ")

      // Called for each element of the config stream: store the latest config.
      override def flatMap1(value: ControlEvent, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f1 value %s ".format(value.jsonPath))
        configs = value
        PPLogger.getActivityLogger.info("# f1 current config %s ".format(configs))
      }

      // Called for each element of the event stream: log the config it sees.
      override def flatMap2(value: JsValue, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f2 current config %s ".format(configs))
        PPLogger.getActivityLogger.info("# f2 current value %s ".format(value.toString()))
      }
    }
    val x = new FilterFunction()
    filterStream.flatMap(x)
….


How I sent the messages (Kafka):
-----------
+send eventStream msg
+send configStream msg
+send eventStream msg
-----------

My log output looks like this:
2019-06-06 10:15:21 INFO  activity  %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
2019-06-06 10:15:21 INFO  activity  %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}
2019-06-06 10:15:24 INFO  activity  %PARSER_ERROR[x] - # f1 value zzzzz_xxxxxx_2
2019-06-06 10:15:24 INFO  activity  %PARSER_ERROR[x] - # f1 current config ControlEvent(1,zzzzz_xxxxxx_2)
2019-06-06 10:15:30 INFO  activity  %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
2019-06-06 10:15:30 INFO  activity  %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}


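My understanding is:
when flatMap1 runs, it changes the config, and the updated configs variable 
is then visible to flatMap2.
But the log shows something different: after flatMap1 has set the config to 
ControlEvent(1,zzzzz_xxxxxx_2), the next flatMap2 call still sees the default 
ControlEvent(1,a).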

-----------
I also tried making configs a mutable.ListBuffer to collect the history and 
updating it in both flatMap1 and flatMap2. I see that the configs variables 
seen by flatMap1 and flatMap2 are two different variables (but with the same 
name in the class!).
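
To show what I mean without Flink, here is a minimal sketch of the behaviour I 
think I am seeing. It rests on an assumption I have not confirmed for my job: 
that each parallel subtask works on its own deserialized copy of the function 
object, and that the config message and the event messages end up in different 
subtasks. ConfigHolder, CopyDemo and copyOf are hypothetical names used only 
for this illustration; they are not Flink APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for FilterFunction: a serializable object with a mutable field.
class ConfigHolder extends Serializable {
  var config: String = "a" // default, like ControlEvent(1, "a") above
}

object CopyDemo {
  // Round-trip an object through Java serialization to obtain an independent copy.
  def copyOf[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new ConfigHolder
    val subtaskA = copyOf(original) // imagine this copy receives the config message
    val subtaskB = copyOf(original) // imagine this copy receives the event messages

    subtaskA.config = "zzzzz_xxxxxx_2" // what flatMap1 does

    println(subtaskA.config) // prints zzzzz_xxxxxx_2
    println(subtaskB.config) // prints a (the update is not visible in the other copy)
  }
}
```

If that assumption holds, it would explain why the same field name shows two 
different values in my log.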


My env:
- Flink minicluster (sbt run)
- Flink 1.7.2
- Kafka 1.0
- Scala 2.11


