Hi Dawid,

Thanks for your reply, much appreciated.

I tried your suggestion of providing the TypeInformation explicitly, but
still nothing gets through. There are no errors either; the job simply runs
without sending any data to the sink. I have verified that there is data in
the input topic, and the same code runs fine in a similar job that uses a
simple String as the ValueState.

I have also added a print statement to the process function, but nothing
gets written to the console, which suggests the method is never invoked. It
should be triggered by this:

mainDataStream
  .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
  .timeWindow(Time.minutes(1))
  .process(new MetricsProcessFunction)
  .addSink(target1Min)


Could the problem be in the open method?
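For reference, the open method now looks roughly like this after applying
your snippet (modulo naming on my side):

  override def open(parameters: Configuration): Unit = {
    val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
    pageviewMetricState = getRuntimeContext.getState(
      new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
  }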

Best regards


On Fri, 18 Sep 2020 at 12:30, Dawid Wysakowicz <
dwysakow...@apache.org> wrote:

> Hi Martin,
>
> I am not sure what the exact problem is. Is it that the ProcessFunction is
> not invoked, or is the problem with the values in your state?
>
> As for the question about the case class and ValueState: the best way to
> do it is to provide the TypeInformation explicitly. If you do not provide
> the TypeInformation, the ValueStateDescriptor will fall back to the Java
> type extraction stack, which cannot handle case classes well, and if I am
> not mistaken they will end up serialized with a generic serializer based
> on Kryo.
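>
> One quick way to verify whether that Kryo fallback is what you are
> hitting is to disable generic types on the ExecutionConfig; if I remember
> correctly, the job will then fail with an explicit error instead of
> silently using Kryo:
>
>     executionEnv.getConfig.disableGenericTypes()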
>
> You can create a proper TypeInformation for a Scala case class like this:
>
>     // important: this import brings the implicit Scala type extraction into scope
>     import org.apache.flink.streaming.api.scala._
>
>     val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
>     this.pageviewMetricState = this.getRuntimeContext
>       .getState(new ValueStateDescriptor[PageviewBasedMetrics](
>         "PageviewMetrics", caseClassTypeInfo))
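>
> Equivalently, with that import in scope you can write it slightly more
> directly:
>
>     val caseClassTypeInfo = createTypeInformation[PageviewBasedMetrics]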
>
> If the problem is that the function is not being invoked, I'd recommend
> checking which TimeCharacteristic you are using (I don't know for sure
> what is going on in StreamEnvironment.getStreamEnvironment). If you use
> ProcessingTime, the results will be emitted only after a minute passes,
> since you are using timeWindow(Time.minutes(1)).
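>
> If it turns out you are on EventTime instead, the window only fires once a
> watermark passes its end, so the process function is never invoked at all
> if no timestamps/watermarks are assigned. As a quick (untested) sanity
> check you could force processing time and see if results start appearing
> after a minute:
>
>     import org.apache.flink.streaming.api.TimeCharacteristic
>
>     executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)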
>
> Hope that helps.
>
> Best,
>
> Dawid
> On 18/09/2020 10:42, Martin Frank Hansen wrote:
>
> Hi,
>
> I am trying to calculate some different metrics, using the state backend
> to check whether events have been seen before. I am using a
> ProcessWindowFunction, but nothing gets through; it is as if the
> .process function is ignored. Is it not possible to store a custom case
> class as ValueState? Or do I need to implement a serializer for the
> case class? Or ...
>
> Any help is much appreciated.
>
> My code:
>
> class MetricsProcessFunction extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {
>
>   var pageviewMetricState: ValueState[PageviewBasedMetrics] = _
>
>   override def open(parameters: Configuration): Unit = {
>     pageviewMetricState = this.getRuntimeContext.getState(
>       new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
>   }
>
>   override def process(key: PageviewBasedMetricsGroup, context: Context,
>                        elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
>     // previous value for this key; note this is null the first time the window fires
>     val pwbm = pageviewMetricState.value()
>
>     if (elements.head.event.getOrElse("") == "page_view") {
>       val tmp = pwbm.pageviews + 1
>       val tmpPBM = pwbm.copy(pageviews = tmp,
>         startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
>         endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)
>
>       pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
>     }
>     out.collect(SnowplowPickler.read(pageviewMetricState.value()))
>   }
> }
>
>
> object AggregateMultipleMetrics {
>
>   def main(args: Array[String]) {
>     val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
>     val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
>     val appProps: Properties = env.appProps
>
>     val inputStream: String = appProps.getProperty("input_topic")
>     val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
>     val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
>       new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
>     val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
>       new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()
>
>     val mainDataStream = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)
>
>     val target1Min: SinkFunction[PageviewBasedMetrics] =
>       new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]().getSinkFunction(
>         outputTopic1Min,
>         outputSerializer1Min,
>         partitioner,
>         appProps)
>
>     mainDataStream
>       .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
>       .timeWindow(Time.minutes(1))
>       .process(new MetricsProcessFunction)
>       .addSink(target1Min)
>
>     // execute program
>     executionEnv.execute("Count pageview-based metrics")
>   }
> }

-- 

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk
