Hey guys, I have a pipeline that generates two different types of data (but both use the same trait) and I need to save each on a different sink.
So far, things were working with splits, but it seems using splits with side outputs (for the late data, which we'll plug a late arrival handling) causes errors, so I changed everything to side outputs. To select a side output based on type, I did the following: class MetricTypeSplitter(accountingTag:OutputTag[Metric], analysingTag:OutputTag[Metric]) extends ProcessFunction[Metric, Metric] { val logger = LoggerFactory.getLogger(this.getClass) override def processElement( value:Metric, ctx:ProcessFunction[Metric, Metric]#Context, out:Collector[Metric] ): Unit = { out.collect(value) value match { case record:AccountingMetric => { logger.info(s"Sending ${record} to Accounting") ctx.output(accountingTag, record) } case record:AnalysingMetric => { logger.info(s"Sending ${record} to Analysis") ctx.output(analysingTag, record) } case _ => { logger.error(s"Don't know the type of ${value}") } } } } And at the end of the pipeline I add the splitter: pipeline .process(new MetricTypeSplitter(accountTag, analysisTag)) So far, this works and I can see the logs of which tag each metric in being sent being generated. The second part, in which I capture the side output and send the data to sink, doesn't seem to work, though: pipeline .getSideOutput(accountTag) .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows") .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}") And here is the problem: It seems .getSideOutput() is never actually getting the side output because a the logger in AccoutingSink.toRow() is never happening and the data is not showing on our database (toRow() convers the Metric to a Row and accountingSInk.output returns the JDBCOutputFormat). Any ideas what I need to do for side outputs to be actually captured? -- *Julio Biason*, Sofware Engineer *AZION* | Deliver. Accelerate. Protect. Office: +55 51 3083 8101 <callto:+555130838101> | Mobile: +55 51 <callto:+5551996209291>*99907 0554*