Hi Julio,

I tried to reproduce your problem locally, but everything ran correctly. Could you share a small example job with us?

This worked for me:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class TestingClass {
  var hello: Int = 0
}

class TestA extends TestingClass {
  var test: String = _
}

def main(args: Array[String]) {

  // set up the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // get input data
  val text = env.fromElements(WordCountData.WORDS: _*)

  val outputTag = OutputTag[(String, Int)]("side-output")
  val outputTag2 = OutputTag[TestingClass]("side-output2")

  val counts: DataStream[(String, Int)] = text
    // split up the lines in pairs (2-tuples) containing: (word, 1)
    .flatMap(_.toLowerCase.split("\\W+"))
    .filter(_.nonEmpty)
    .map((_, 1))
    // group by the tuple field "0" and sum up tuple field "1"
    .keyBy(0)
    .sum(1)
    .process(new ProcessFunction[(String, Int), (String, Int)] {
      override def processElement(
          value: (String, Int),
          ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
          out: Collector[(String, Int)]): Unit = {
        ctx.output(outputTag, value)
        ctx.output(outputTag2, new TestingClass)
        ctx.output(outputTag2, new TestA)
      }
    })

  counts.getSideOutput(outputTag).print()
  counts.getSideOutput(outputTag2).print()

  // execute program
  env.execute("Streaming WordCount")
}


Are the Metric classes proper POJO types?
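
If you are unsure, here is a minimal sketch of what I mean (a sketch only; "Metric" and the fields below are assumptions, not your actual classes). Flink's Scala API handles case classes with a dedicated case-class serializer; plain classes must follow the POJO rules (public class, public no-argument constructor, all fields public or reachable via getters/setters), otherwise Flink falls back to generic Kryo serialization:

trait Metric extends Serializable

// Case classes are analyzed by Flink's case-class serializer,
// so they avoid the generic Kryo fallback:
case class AccountingMetric(account: String, value: Long) extends Metric
case class AnalysingMetric(dimension: String, value: Long) extends Metric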

Regards,
Timo


On 02.04.18 at 21:53, Julio Biason wrote:
Hey guys,

I have a pipeline that generates two different types of data (but both use the same trait) and I need to save each to a different sink.

So far, things were working with splits, but it seems using splits together with side outputs (for the late data, to which we'll plug a late-arrival handler) causes errors, so I changed everything to side outputs.
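
(For reference, the late-arrival part we plan looks roughly like the sketch below; the Metric case class, the key, and the window size are simplified placeholders, not our real pipeline.)

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Metric(key: String, value: Long) // placeholder

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateTag = OutputTag[Metric]("late-metrics")

val summed = env.fromElements(Metric("a", 1), Metric("a", 2))
  .keyBy(_.key)
  .timeWindow(Time.minutes(1))
  .sideOutputLateData(lateTag) // late records are routed to this tag
  .reduce((a, b) => Metric(a.key, a.value + b.value))

// late data would then be handled separately:
summed.getSideOutput(lateTag).print()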

To select a side output based on type, I did the following:

class MetricTypeSplitter(accountingTag: OutputTag[Metric], analysingTag: OutputTag[Metric]) extends ProcessFunction[Metric, Metric] {

  val logger = LoggerFactory.getLogger(this.getClass)

  override def processElement(
    value: Metric,
    ctx: ProcessFunction[Metric, Metric]#Context,
    out: Collector[Metric]
  ): Unit = {
    out.collect(value)
    value match {
      case record: AccountingMetric => {
        logger.info(s"Sending ${record} to Accounting")
        ctx.output(accountingTag, record)
      }
      case record: AnalysingMetric => {
        logger.info(s"Sending ${record} to Analysis")
        ctx.output(analysingTag, record)
      }
      case _ => {
        logger.error(s"Don't know the type of ${value}")
      }
    }
  }
}

And at the end of the pipeline I add the splitter:

    pipeline
      .process(new MetricTypeSplitter(accountTag, analysisTag))

So far, this works, and I can see the logs showing which tag each metric is being sent to. The second part, in which I capture the side output and send the data to the sink, doesn't seem to work, though:

    pipeline
      .getSideOutput(accountTag)
      .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
      .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")

And here is the problem: it seems .getSideOutput() is never actually getting the side output, because the logger in AccountingSink.toRow() never fires and the data is not showing up in our database (toRow() converts the Metric to a Row, and accountingSink.output returns the JDBCOutputFormat).
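
(For context, the sink pieces look roughly like the sketch below; the driver, URL, query, and field layout are placeholders, not our real code.)

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.types.Row

case class AccountingMetric(account: String, value: Long) // placeholder

class AccountingSink(url: String) extends Serializable {
  // the JDBCOutputFormat handed to writeUsingOutputFormat
  def output: JDBCOutputFormat =
    JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername("org.postgresql.Driver") // placeholder driver
      .setDBUrl(url)                          // placeholder URL
      .setQuery("INSERT INTO accounting (account, value) VALUES (?, ?)")
      .finish()
}

object AccountingSink {
  // converts a metric into a Row matching the INSERT above
  def toRow(metric: AccountingMetric): Row = {
    val row = new Row(2)
    row.setField(0, metric.account)
    row.setField(1, metric.value)
    row
  }
}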

Any ideas what I need to do for side outputs to be actually captured?

--
*Julio Biason*, Software Engineer
*AZION*  | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
