Hi,

I am trying to calculate several different metrics, using the state backend to
check whether events have been seen before. I am using a
ProcessWindowFunction, but nothing gets through; it is as if the
.process() call is ignored. Is it not possible to store a custom case
class in a ValueState? Or do I need to implement a serializer for the
case class? Or ...

Any help is much appreciated.

My code:

/**
 * Window function that consumes the `Event`s of one `TimeWindow` for a
 * `PageviewBasedMetricsGroup` key and emits a `PageviewBasedMetrics` value.
 *
 * A `ValueState[PageviewBasedMetrics]` obtained from the runtime context is
 * used to hold the metrics between invocations.
 *
 * NOTE(review): `pwbm` is used in `process` but is not defined anywhere in this
 * excerpt — presumably it is meant to be the previously stored metrics; confirm.
 */
class MetricsProcessFunction extends
ProcessWindowFunction[Event,PageviewBasedMetrics,PageviewBasedMetricsGroup,TimeWindow]()
 {

  // State handle; left uninitialised here and assigned in open().
  var pageviewMetricState: ValueState[PageviewBasedMetrics] = _

  /**
   * Registers the value state under the name "PageviewMetrics" and stores the
   * handle in `pageviewMetricState`.
   *
   * @param parameters configuration passed in by the runtime; not read here
   */
  override def open(parameters: Configuration): Unit = {
    pageviewMetricState = this.getRuntimeContext.getState(new
ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics",
classOf[PageviewBasedMetrics]))
  }

  /**
   * Updates the stored metrics when the first element of the window is a
   * "page_view" event, then emits whatever the state currently holds.
   *
   * @param key      the group this window belongs to; not read here
   * @param context  gives access to the window, whose start/end are copied into the metrics
   * @param elements the events collected for this window
   * @param out      collector that receives the single emitted metrics value
   */
  override def process(key: PageviewBasedMetricsGroup, context:
Context, elements: Iterable[Event], out:
Collector[PageviewBasedMetrics]): Unit = {

    // Only the FIRST element of the window is inspected; the remaining elements
    // are ignored, and `head` throws on an empty Iterable.
    // NOTE(review): if the intent is to count every page_view in the window,
    // this needs to iterate over `elements` — confirm intent.
    if(elements.head.event.getOrElse("") == "page_view"){
      // NOTE(review): `pwbm` is undefined in this excerpt (see class comment).
      val tmp = pwbm.pageviews + 1
      // Copy with the incremented count and the window bounds as start/end time.
      // The atOffset(UTC).toInstant round-trip yields the same Instant as ofEpochMilli.
      val tmpPBM = pwbm.copy(pageviews = tmp,
        startTime =
Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
        endTime =
Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)

      // NOTE(review): the state is declared as ValueState[PageviewBasedMetrics], yet it is
      // updated with the result of SnowplowPickler.write(...). Unless write returns a
      // PageviewBasedMetrics, this does not match the descriptor's type — confirm.
      pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
    }
    // Emitted for every window, whether or not the branch above ran.
    // NOTE(review): presumably value() is null when nothing has been stored for the
    // current key yet, in which case read(...) receives null — verify and guard.
    out.collect(SnowplowPickler.read(pageviewMetricState.value()))
  }
}


/**
 * Job entry point: reads events from the configured input topic, groups them by
 * `PageviewBasedMetricsGroup`, runs `MetricsProcessFunction` over one-minute
 * time windows and writes the resulting `PageviewBasedMetrics` to a Kafka sink.
 */
object AggregateMultipleMetrics {

  /**
   * Builds the streaming pipeline and submits it for execution.
   *
   * Topic names are read from the application properties under the keys
   * "input_topic" and "output_topic_1_min".
   *
   * @param args command-line arguments; not read by this method
   */
  def main(args: Array[String]): Unit = {
    val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
    val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
    val appProps: Properties = env.appProps

    val inputStream: String = appProps.getProperty("input_topic")
    val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
    val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
      new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
    val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
      new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()

    val snowplowEventSource =
      new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)

    val target1Min: SinkFunction[PageviewBasedMetrics] =
      new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]()
        .getSinkFunction(outputTopic1Min, outputSerializer1Min, partitioner, appProps)

    // NOTE(review): `mainDataStream` is not defined in this excerpt, and
    // `snowplowEventSource` above is never used. Presumably `mainDataStream` is
    // derived from `snowplowEventSource` in code not shown here — confirm.
    mainDataStream
      .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
      .timeWindow(Time.minutes(1))
      .process(new MetricsProcessFunction)
      .addSink(target1Min)

    // Nothing runs until the job graph is submitted here.
    executionEnv.execute("Count pageview-based metrics")
  }
}





-- 

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk

Reply via email to