Hi Martin,

I am not sure what the exact problem is. Is it that the ProcessWindowFunction
is not invoked, or is the problem with the values in your state?

As for your question about the case class and ValueState: the best way to
handle it is to provide the TypeInformation explicitly. If you do not provide
the TypeInformation, the ValueStateDescriptor will use the Java type
extraction stack, which cannot handle case classes well. If I am not
mistaken, they will end up being serialized with a generic, Kryo-based serializer.

You can create a proper TypeInformation for a Scala case class like this:

    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._ // important: brings the implicit Scala type extraction into scope

    // inside open() of your MetricsProcessFunction
    val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
    this.pageviewMetricState = this.getRuntimeContext
      .getState(new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
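
If you want to see the difference for yourself, below is a small sketch, assuming the Flink Scala API (flink-scala and flink-streaming-scala) is on the classpath. The case class is a hypothetical stand-in for your PageviewBasedMetrics, since its fields are not shown, and TypeInfoCheck is just an illustrative name. I would expect the Java extractor to fall back to a GenericTypeInfo (i.e. Kryo), because a case class has no no-arg constructor and therefore is not a Flink POJO, while the Scala macro should produce a CaseClassTypeInfo.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala._ // brings the createTypeInformation macro into scope

// Hypothetical stand-in for PageviewBasedMetrics; the real fields are not shown in the thread.
case class PageviewBasedMetrics(pageviews: Long, windowStart: Long, windowEnd: Long)

object TypeInfoCheck {
  // Java type extraction: a case class is not a valid Flink POJO (no public no-arg
  // constructor), so this is expected to end up as a GenericTypeInfo (Kryo).
  val viaJavaExtractor: TypeInformation[PageviewBasedMetrics] =
    TypeInformation.of(classOf[PageviewBasedMetrics])

  // Scala macro-based extraction: expected to yield a CaseClassTypeInfo,
  // which serializes the case class field by field.
  val viaScalaMacro: TypeInformation[PageviewBasedMetrics] =
    implicitly[TypeInformation[PageviewBasedMetrics]]

  def main(args: Array[String]): Unit = {
    println(s"Java extractor is generic (Kryo): ${viaJavaExtractor.isInstanceOf[GenericTypeInfo[_]]}")
    println(s"Scala macro is a case class type: ${viaScalaMacro.isInstanceOf[CaseClassTypeInfo[_]]}")
  }
}
```

Passing viaScalaMacro (or the implicitly obtained value, as above) to the ValueStateDescriptor is what avoids the Kryo fallback.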

If the problem is that the function is not being invoked, I'd recommend
checking which TimeCharacteristic you are using (I don't know what is going
on inside StreamEnvironment.getStreamEnvironment). If you use ProcessingTime,
the results will be emitted only after a minute of wall-clock time passes,
since you are using Time.minutes(1). If you use EventTime, a window fires only
once the watermark passes the end of the window, so make sure timestamps and
watermarks are assigned to your stream.
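
To make the timing concrete, here is a plain Scala sketch with no Flink dependency. It mirrors, as far as I understand it, what the tumbling window assigner and the default triggers do with a zero window offset: an element with timestamp t lands in the window starting at t - t % size, and the window fires once the relevant clock (the watermark for EventTime, the wall clock for ProcessingTime) reaches end - 1. TumblingWindowSketch, windowFor and fires are hypothetical names, not Flink API.

```scala
object TumblingWindowSketch {
  val WindowSizeMs: Long = 60 * 1000L // corresponds to Time.minutes(1)

  // Window bounds [start, end) for a non-negative timestamp, assuming a zero offset.
  def windowFor(timestampMs: Long): (Long, Long) = {
    val start = timestampMs - (timestampMs % WindowSizeMs)
    (start, start + WindowSizeMs)
  }

  // A window fires once the clock reaches its max timestamp (end - 1).
  // For EventTime the clock is the current watermark; for ProcessingTime it is the wall clock.
  def fires(window: (Long, Long), clockMs: Long): Boolean =
    clockMs >= window._2 - 1

  def main(args: Array[String]): Unit = {
    val w = windowFor(125000L)                // an event 2m05s after the epoch
    println(w)                                // (120000,180000)
    println(fires(w, 179998L))                // false
    println(fires(w, 179999L))                // true
    // Without any watermarks the event-time clock stays at Long.MinValue,
    // so an event-time window never fires.
    println(fires(w, Long.MinValue))          // false
  }
}
```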

Hope that helps.

Best,

Dawid

On 18/09/2020 10:42, Martin Frank Hansen wrote:
> Hi,
>
> I am trying to calculate some different metrics using the state
> backend to check whether events have been seen before. I am using the
> ProcessWindowFunction, but nothing gets through; it is as if the
> .process function is ignored. Is it not possible to store a custom
> case class as ValueState? Or do I need to implement a serializer for
> the case class? Or ...
>
> Any help is much appreciated.
>
> My code: 
>
> class MetricsProcessFunction extends
>     ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow]() {
>
>   var pageviewMetricState: ValueState[PageviewBasedMetrics] = _
>
>   override def open(parameters: Configuration): Unit = {
>     pageviewMetricState = this.getRuntimeContext.getState(
>       new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
>   }
>
>   override def process(key: PageviewBasedMetricsGroup, context: Context,
>       elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
>
>     if (elements.head.event.getOrElse("") == "page_view") {
>       val tmp = pwbm.pageviews + 1
>       val tmpPBM = pwbm.copy(pageviews = tmp,
>         startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
>         endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)
>
>       pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
>     }
>     out.collect(SnowplowPickler.read(pageviewMetricState.value()))
>   }
> }
>
> object AggregateMultipleMetrics {
>
>   def main(args: Array[String]) {
>     val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
>     val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
>     val appProps: Properties = env.appProps
>
>     val inputStream: String = appProps.getProperty("input_topic")
>     val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
>     val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
>       new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
>     val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
>       new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()
>
>     val snowplowEventSource = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)
>
>     val target1Min: SinkFunction[PageviewBasedMetrics] =
>       new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]().getSinkFunction(
>         outputTopic1Min,
>         outputSerializer1Min,
>         partitioner,
>         appProps)
>
>     mainDataStream.keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
>       .timeWindow(Time.minutes(1))
>       .process(new MetricsProcessFunction)
>       .addSink(target1Min)
>
>     // execute program
>     executionEnv.execute("Count pageview-based metrics")
>   }
> }
>
>
>
>
> -- 
>
> Martin Frank Hansen
>
> Data Engineer
> Digital Service
> M: +45 25 57 14 18
> E: m...@berlingskemedia.dk <mailto:m...@berlingskemedia.dk>
>
>
