Another note: the case class at hand has about 40 fields in it. Is there a
maximum limit on the number of fields?
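
A quick way to see how Flink classifies the class (a sketch reusing the
implicit import and the PageviewBasedMetrics type from Dawid's snippet below):

    val info = implicitly[TypeInformation[PageviewBasedMetrics]]
    println(info.getClass) // a CaseClassTypeInfo subclass would mean the case
                           // class is handled natively, not via Kryo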

best regards

On Fri, 18 Sep 2020 at 13:05, Martin Frank Hansen <m...@berlingskemedia.dk> wrote:

> Hi Dawid,
>
> Thanks for your reply, much appreciated.
>
> I tried using your TypeInformation implementation, but still nothing
> gets through. There are no errors either; the job simply runs without
> sending data to the sink. I have checked that there is data in the input
> topic, and I have used the same code to run a similar job (with a simple
> String type as the ValueState).
>
> I have added a print statement to the process function, but nothing gets
> written to the console, which suggests that the method is never called,
> although it should be invoked by this:
>
> mainDataStream
>   .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
>   .timeWindow(Time.minutes(1))
>   .process(new MetricsProcessFunction)
>   .addSink(target1Min)
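>
> (As a quick check, a tap such as
>
>     mainDataStream.print()  // debug: do any events arrive upstream of the window?
>
> placed before the keyBy would show whether elements reach the pipeline at all.)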
>
>
> Could the problem be in the open method?
>
> best regards
>
>
> On Fri, 18 Sep 2020 at 12:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi Martin,
>>
>> I am not sure what the exact problem is. Is it that the ProcessFunction
>> is not invoked, or is the problem with the values in your state?
>>
>> As for the question about the case class and ValueState: the best way to
>> do it is to provide the TypeInformation explicitly. If you do not provide
>> the TypeInformation, the ValueStateDescriptor will use the Java type
>> extraction stack, which cannot handle case classes well; if I am not
>> mistaken, they will end up serialized with a generic serializer using Kryo.
>>
>> You can create a proper TypeInformation for a Scala case class like this:
>>
>>     // important: this import brings in the implicit Scala type extraction
>>     import org.apache.flink.streaming.api.scala._
>>
>>     val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
>>     this.pageviewMetricState = this.getRuntimeContext
>>       .getState(new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
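>>
>> (Equivalently, if I am not mistaken, the same package provides a
>> createTypeInformation[PageviewBasedMetrics] macro, which is what the
>> implicit lookup above resolves to.)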
>>
>> If the problem is that the function is not being invoked, I'd recommend
>> checking which TimeCharacteristic you are using (I don't necessarily know
>> what is going on in StreamEnvironment.getStreamEnvironment). If you use
>> ProcessingTime, the results will be emitted only after a minute passes,
>> since you are using timeWindow(Time.minutes(1)); with EventTime, the
>> window fires only once watermarks pass its end.
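>>
>> A minimal sketch (assuming the Flink 1.x API, where the time
>> characteristic is set on the StreamExecutionEnvironment):
>>
>>     import org.apache.flink.streaming.api.TimeCharacteristic
>>
>>     // with processing time, the 1-minute window fires on the wall clock
>>     executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>
>>     // with event time, the source (or a timestamp/watermark assigner) must
>>     // emit watermarks, otherwise a time window never fires:
>>     // executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)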
>>
>> Hope that helps.
>>
>> Best,
>>
>> Dawid
>> On 18/09/2020 10:42, Martin Frank Hansen wrote:
>>
>> Hi,
>>
>> I am trying to calculate several different metrics, using the state
>> backend to check whether events have been seen before. I am using a
>> ProcessWindowFunction, but nothing gets through; it is as if the
>> process function is ignored. Is it not possible to store a custom case
>> class as ValueState? Or do I need to implement a serializer for the
>> case class? Or ...
>>
>> Any help is much appreciated.
>>
>> My code:
>>
>> class MetricsProcessFunction
>>   extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {
>>
>>   var pageviewMetricState: ValueState[PageviewBasedMetrics] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     pageviewMetricState = this.getRuntimeContext.getState(
>>       new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
>>   }
>>
>>   override def process(key: PageviewBasedMetricsGroup, context: Context,
>>                        elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
>>
>>     if (elements.head.event.getOrElse("") == "page_view") {
>>       // pwbm is presumably the previously stored metrics; its definition is not shown in this snippet
>>       val tmp = pwbm.pageviews + 1
>>       val tmpPBM = pwbm.copy(
>>         pageviews = tmp,
>>         startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
>>         endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)
>>
>>       pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
>>     }
>>     out.collect(SnowplowPickler.read(pageviewMetricState.value()))
>>   }
>> }
>>
>>
>> object AggregateMultipleMetrics {
>>
>>   def main(args: Array[String]): Unit = {
>>     val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
>>     val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
>>     val appProps: Properties = env.appProps
>>
>>     val inputStream: String = appProps.getProperty("input_topic")
>>     val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
>>     val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
>>       new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
>>     val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
>>       new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()
>>
>>     val snowplowEventSource = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)
>>
>>     val target1Min: SinkFunction[PageviewBasedMetrics] =
>>       new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]().getSinkFunction(
>>         outputTopic1Min,
>>         outputSerializer1Min,
>>         partitioner,
>>         appProps)
>>
>>     mainDataStream
>>       .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
>>       .timeWindow(Time.minutes(1))
>>       .process(new MetricsProcessFunction)
>>       .addSink(target1Min)
>>
>>     // execute program
>>     executionEnv.execute("Count pageview-based metrics")
>>   }
>> }
>>
>


-- 

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk
