Hi Dawid,

Thanks for your reply, much appreciated.
I tried using your implementation for TypeInformation, but still nothing gets through. There are no errors either; the job simply runs without sending data to the sink. I have checked that there is data in the input topic, and I have used the same code to run a similar job (with a simple String as the ValueState type). I have added a print statement to the process function, but nothing gets written to the console, which suggests that the method is never called. It should be invoked by this:

    mainDataStream
      .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
      .timeWindow(Time.minutes(1))
      .process(new MetricsProcessFunction)
      .addSink(target1Min)

Could the problem be in the open method?

Best regards
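A minimal sketch of the time-characteristic point from Dawid's reply quoted below: if executionEnv runs on event time but no timestamps and watermarks are ever assigned to the source stream, a one-minute window never closes and process() is never invoked, which would match the symptom above. The 10-second out-of-orderness bound and the e.timestamp field on Event are assumptions for illustration, not part of the original job:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Without an assigner like this, event-time windows receive no watermarks
    // and therefore never fire, so nothing ever reaches the sink.
    val mainDataStream = snowplowEventSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        // e.timestamp is a hypothetical epoch-millis field on Event.
        override def extractTimestamp(e: Event): Long = e.timestamp
      })

With processing time instead, the first window fires only after a full wall-clock minute, so a short-running test can also look like nothing gets through.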
On Fri, 18 Sep 2020 at 12:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Martin,
>
> I am not sure what the exact problem is. Is it that the ProcessFunction is
> not invoked, or is the problem with the values in your state?
>
> As for the question of the case class and ValueState: the best way is to
> provide the TypeInformation explicitly. If you do not provide the
> TypeInformation, the ValueStateDescriptor will use the Java type-extraction
> stack, which cannot handle case classes well, and if I am not mistaken they
> will end up serialized with a generic serializer using Kryo.
>
> You can create a proper TypeInformation for a Scala case class like this:
>
>     // important to import for the implicit Scala type extraction
>     import org.apache.flink.streaming.api.scala._
>
>     val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
>     this.pageviewMetricState = this.getRuntimeContext
>       .getState(new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
>
> If the problem is that the function is not being invoked, I'd recommend
> checking which TimeCharacteristic you are using (I don't necessarily know
> what is going on in StreamEnvironment.getStreamEnvironment). If you use
> ProcessingTime, the results will be emitted only after a minute passes
> (you are using timeWindow(Time.minutes(1))).
>
> Hope that helps.
>
> Best,
>
> Dawid
>
> On 18/09/2020 10:42, Martin Frank Hansen wrote:
>
> > Hi,
> >
> > I am trying to calculate some different metrics, using the state backend
> > to check whether events have been seen before. I am using a
> > ProcessWindowFunction, but nothing gets through; it is as if the
> > .process function is ignored. Is it not possible to store a custom case
> > class as ValueState? Or do I need to implement a serializer for the
> > case class? Or ...
> >
> > Any help is much appreciated.
> >
> > My code:
> >
> >     class MetricsProcessFunction
> >       extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow]() {
> >
> >       var pageviewMetricState: ValueState[PageviewBasedMetrics] = _
> >
> >       override def open(parameters: Configuration): Unit = {
> >         pageviewMetricState = this.getRuntimeContext.getState(
> >           new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
> >       }
> >
> >       override def process(key: PageviewBasedMetricsGroup, context: Context,
> >                            elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
> >         if (elements.head.event.getOrElse("") == "page_view") {
> >           val tmp = pwbm.pageviews + 1
> >           val tmpPBM = pwbm.copy(
> >             pageviews = tmp,
> >             startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
> >             endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)
> >
> >           pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
> >         }
> >         out.collect(SnowplowPickler.read(pageviewMetricState.value()))
> >       }
> >     }
> >
> >     object AggregateMultipleMetrics {
> >
> >       def main(args: Array[String]) {
> >         val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
> >         val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
> >         val appProps: Properties = env.appProps
> >
> >         val inputStream: String = appProps.getProperty("input_topic")
> >         val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
> >         val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
> >           new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
> >         val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
> >           new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()
> >
> >         val snowplowEventSource = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)
> >
> >         val target1Min: SinkFunction[PageviewBasedMetrics] =
> >           new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]()
> >             .getSinkFunction(outputTopic1Min, outputSerializer1Min, partitioner, appProps)
> >
> >         mainDataStream
> >           .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
> >           .timeWindow(Time.minutes(1))
> >           .process(new MetricsProcessFunction)
> >           .addSink(target1Min)
> >
> >         // execute program
> >         executionEnv.execute("Count pageview-based metrics")
> >       }
> >     }
> >
> > --
> > Martin Frank Hansen
> > Data Engineer
> > Digital Service
> > M: +45 25 57 14 18
> > E: m...@berlingskemedia.dk

--
Martin Frank Hansen
Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk
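For reference, a sketch of how the explicit TypeInformation from Dawid's reply would slot into the MetricsProcessFunction above, using the thread's own type names. The code above also references an undefined pwbm; reading the ValueState with an empty default is one assumption of what was intended, and PageviewBasedMetrics() as a default constructor is hypothetical, not confirmed by the thread:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._ // implicit TypeInformation derivation for case classes
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    class MetricsProcessFunctionSketch
      extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {

      var pageviewMetricState: ValueState[PageviewBasedMetrics] = _

      override def open(parameters: Configuration): Unit = {
        // Case-class-aware TypeInformation instead of classOf[...], which
        // falls back to Java type extraction and a generic Kryo serializer.
        val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
        pageviewMetricState = getRuntimeContext.getState(
          new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
      }

      override def process(key: PageviewBasedMetricsGroup, context: Context,
                           elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
        // Keyed state is null the first time a key is seen, so fall back to
        // an empty metrics object before updating and emitting it.
        val pwbm = Option(pageviewMetricState.value()).getOrElse(PageviewBasedMetrics())
        out.collect(pwbm)
      }
    }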