Hi, I am trying to calculate some different metrics, using the state backend to check whether events have been seen before. I am using a ProcessWindowFunction, but nothing comes through; it is as if the call to .process() is ignored. Is it not possible to store a custom case class as ValueState? Or do I need to implement a serializer for the case class? Or ...
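For reference, here is a stripped-down sketch of the pattern I assumed should work: a keyed ValueState holding a case class, created in open() and read/updated in process(). Everything in it apart from the Flink classes (Counter, SeenBefore, the "seen-counter" state name) is made-up placeholder code, not taken from my actual job:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// placeholder case class used as the state type
case class Counter(seen: Long = 0L)

class SeenBefore extends ProcessWindowFunction[String, Counter, String, TimeWindow] {

  @transient private var state: ValueState[Counter] = _

  override def open(parameters: Configuration): Unit = {
    // the descriptor is built from the class only; as far as I understand,
    // Flink falls back to Kryo if it cannot derive a dedicated serializer
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[Counter]("seen-counter", classOf[Counter]))
  }

  override def process(key: String, context: Context,
                       elements: Iterable[String], out: Collector[Counter]): Unit = {
    val current = Option(state.value()).getOrElse(Counter())   // value() is null before the first update
    val updated = current.copy(seen = current.seen + elements.size)
    state.update(updated)   // store the case class directly, no manual pickling
    out.collect(updated)
  }
}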
Any help is much appreciated. My code:

import java.time.{Instant, ZoneOffset}
import java.util.Properties

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
import org.apache.flink.util.Collector
// our own classes (Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, SnowplowPickler,
// SnowplowEventSource, KafkaSink, StreamEnvironment, Util, ...) are imported from our project

class MetricsProcessFunction
  extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {

  // keyed state, shared across windows for the same key
  var pageviewMetricState: ValueState[PageviewBasedMetrics] = _

  override def open(parameters: Configuration): Unit = {
    pageviewMetricState = getRuntimeContext.getState(
      new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
  }

  override def process(key: PageviewBasedMetricsGroup,
                       context: Context,
                       elements: Iterable[Event],
                       out: Collector[PageviewBasedMetrics]): Unit = {
    // metrics stored so far for this key; value() is null the first time the key is seen
    // (assuming PageviewBasedMetrics() gives an empty default instance)
    val stored = pageviewMetricState.value()
    val pwbm = if (stored == null) PageviewBasedMetrics() else SnowplowPickler.read(stored)

    if (elements.head.event.getOrElse("") == "page_view") {
      val tmp = pwbm.pageviews + 1
      val tmpPBM = pwbm.copy(
        pageviews = tmp,
        startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
        endTime   = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)
      pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
    }

    out.collect(SnowplowPickler.read(pageviewMetricState.value()))
  }
}

object AggregateMultipleMetrics {

  def main(args: Array[String]): Unit = {
    val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
    val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
    val appProps: Properties = env.appProps

    val inputStream: String = appProps.getProperty("input_topic")
    val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")

    val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
      new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
    val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
      new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()

    // source stream of Snowplow events
    val mainDataStream = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)

    val target1Min: SinkFunction[PageviewBasedMetrics] =
      new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]()
        .getSinkFunction(outputTopic1Min, outputSerializer1Min, partitioner, appProps)

    mainDataStream
      .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
      .timeWindow(Time.minutes(1))
      .process(new MetricsProcessFunction)
      .addSink(target1Min)

    // execute program
    executionEnv.execute("Count pageview-based metrics")
  }
}

--
Martin Frank Hansen
Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk