大家好,

先说本人的理解,keyed(..).flatmap(mapFunc())
其中,每个具体mapFunc处理的数据,应该是相同的key数据。不知理解是否正确。

我的具体情况是
  我对数据对校验处理。首先根据设备id (uuid) 分组,然后针对不同分组进行数据校验。
部分代码如下:

    rowData.filter(legalData _)
   .map(data => BehaviorComVO(getText(data, "id"), getText(data, "uuid"),
getText(data, "session_id"), getText(data, "source"), getText(data,
"product_version")))
   *  .keyBy(_.uuid)*
*     .flatMap(new RepeatIdCheckDispatch())*
     .addSink(....)

*RepeatIdCheckDispatch*  细节:

*  override def flatMap(in: BehaviorComVO, out: Collector[String]): Unit =
{*

*    in match {*
*      case BehaviorComVO(_, _, _, "visit", _) =>*
*        if (!repeatIdChecker.isOK) out.collect(repeatIdChecker.result)*

*        repeatIdChecker = RepeatIdChecker(in)*

*      case _: BehaviorComVO => repeatIdChecker.doCheck(in)*
*    }*
*  }*

"visit" 是一个周期数据的开始。。但是运行之后,我发现,有其他uuid的数据,进入到同一个 *RepeatIdChecker 中*,

回复