Another note: the case class at hand has about 40 fields in it. Is there a maximum limit on the number of fields?
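(For reference, Scala 2.11 and later no longer caps case classes at 22 fields, and I am not aware of a hard field limit in Flink's case-class TypeInformation derivation either. What one would want to rule out is the derived TypeInformation silently falling back to GenericTypeInfo, i.e. Kryo. A minimal check, where WideMetrics is a hypothetical stand-in for the real ~40-field class:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._ // implicit case-class type extraction

// Hypothetical stand-in for the real ~40-field case class.
case class WideMetrics(f01: Long, f02: Long, f03: Long, /* ..., */ f40: Long)

object CheckTypeInfo {
  def main(args: Array[String]): Unit = {
    val info = implicitly[TypeInformation[WideMetrics]]
    // A CaseClassTypeInfo here means Flink derived a proper serializer;
    // a GenericTypeInfo would mean it fell back to Kryo.
    println(info.getClass.getSimpleName + ": " + info)
  }
})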
best regards

On Fri, 18 Sep 2020 at 13:05, Martin Frank Hansen <m...@berlingskemedia.dk> wrote:

> Hi Dawid,
>
> Thanks for your reply, much appreciated.
>
> I tried using your implementation for TypeInformation, but still nothing
> gets through. There are no errors either; the job simply runs without
> sending data to the sink. I have checked that there is data in the input
> topic, and I have used the same code to run a similar job (with a simple
> string type as ValueState).
>
> I have added a print statement to the process function, but nothing gets
> written to the console, which suggests that the method is never called.
> It should be invoked by this:
>
> mainDataStream
>   .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
>   .timeWindow(Time.minutes(1))
>   .process(new MetricsProcessFunction)
>   .addSink(target1Min)
>
> Could the problem be in the open method?
>
> best regards
>
> On Fri, 18 Sep 2020 at 12:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi Martin,
>>
>> I am not sure what the exact problem is. Is it that the ProcessFunction
>> is not invoked, or is the problem with the values in your state?
>>
>> As for the question about the case class and ValueState: the best way is
>> to provide the TypeInformation explicitly. If you do not provide it, the
>> ValueStateDescriptor will fall back to the Java type extraction stack,
>> which cannot handle case classes well, and if I am not mistaken they will
>> end up serialized with a generic serializer using Kryo.
>>
>> You can create a proper TypeInformation for a Scala case class like this:
>>
>> import org.apache.flink.streaming.api.scala._ // important import for the implicit Scala type extraction
>>
>> val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
>> this.pageviewMetricState = this.getRuntimeContext
>>   .getState(new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
>>
>> If the problem is that the function is not being invoked, I'd recommend
>> checking which TimeCharacteristic you are using (I don't necessarily know
>> what is going on in StreamEnvironment.getStreamEnvironment). If you use
>> ProcessingTime, the results will be emitted only after a minute passes
>> (you are using timeWindow(Time.minutes(1))).
>>
>> Hope that helps.
>>
>> Best,
>>
>> Dawid
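(That TimeCharacteristic point is easy to verify directly. A minimal sketch, assuming a plain Flink 1.x StreamExecutionEnvironment rather than the StreamEnvironment wrapper used in the thread:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val executionEnv = StreamExecutionEnvironment.getExecutionEnvironment

// With ProcessingTime, a one-minute window fires about a minute after the
// first element arrives, and no watermarks are needed.
executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// With EventTime, a window only fires once watermarks pass its end; if the
// source never emits watermarks, process() is never invoked at all.
// executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime))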
>> On 18/09/2020 10:42, Martin Frank Hansen wrote:
>>
>> Hi,
>>
>> I am trying to calculate some different metrics, using the state backend
>> to check whether events have been seen before. I am using a
>> ProcessWindowFunction, but nothing gets through; it is as if the
>> .process function is ignored. Is it not possible to store a custom case
>> class as ValueState? Or do I need to implement a serializer for the
>> case class? Or ...
>>
>> Any help is much appreciated.
>>
>> My code:
>>
>> class MetricsProcessFunction
>>     extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow]() {
>>
>>   var pageviewMetricState: ValueState[PageviewBasedMetrics] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     pageviewMetricState = this.getRuntimeContext.getState(
>>       new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
>>   }
>>
>>   override def process(key: PageviewBasedMetricsGroup, context: Context,
>>                        elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
>>     if (elements.head.event.getOrElse("") == "page_view") {
>>       // note: `pwbm` was not defined in the snippet as posted;
>>       // presumably it is the current state value
>>       val pwbm = pageviewMetricState.value()
>>       val tmp = pwbm.pageviews + 1
>>       val tmpPBM = pwbm.copy(
>>         pageviews = tmp,
>>         startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
>>         endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)
>>
>>       pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
>>     }
>>     out.collect(SnowplowPickler.read(pageviewMetricState.value()))
>>   }
>> }
>>
>> object AggregateMultipleMetrics {
>>
>>   def main(args: Array[String]) {
>>     val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
>>     val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
>>     val appProps: Properties = env.appProps
>>
>>     val inputStream: String = appProps.getProperty("input_topic")
>>     val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
>>     val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics =
>>       new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
>>     val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] =
>>       new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()
>>
>>     val snowplowEventSource = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)
>>
>>     val target1Min: SinkFunction[PageviewBasedMetrics] =
>>       new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]().getSinkFunction(
>>         outputTopic1Min,
>>         outputSerializer1Min,
>>         partitioner,
>>         appProps)
>>
>>     // note: `mainDataStream` was not defined in the snippet as posted;
>>     // presumably it is the stream obtained from SnowplowEventSource above
>>     mainDataStream
>>       .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
>>       .timeWindow(Time.minutes(1))
>>       .process(new MetricsProcessFunction)
>>       .addSink(target1Min)
>>
>>     // execute program
>>     executionEnv.execute("Count pageview-based metrics")
>>   }
>> }
>>
>> --
>>
>> Martin Frank Hansen
>>
>> Data Engineer
>> Digital Service
>> M: +45 25 57 14 18
>> E: m...@berlingskemedia.dk

--

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk
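(Combining Dawid's explicit TypeInformation with the open and process methods above gives roughly the following. This is a sketch, not the thread's final code; Event, PageviewBasedMetrics and PageviewBasedMetricsGroup are the thread's own types and are assumed to be in scope:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._ // implicit case-class type extraction
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class MetricsProcessFunctionWithTypeInfo
    extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {

  @transient private var pageviewMetricState: ValueState[PageviewBasedMetrics] = _

  override def open(parameters: Configuration): Unit = {
    // Passing explicit TypeInformation makes the state use Flink's
    // case-class serializer instead of the Kryo fallback that
    // classOf[PageviewBasedMetrics] would give.
    val info = implicitly[TypeInformation[PageviewBasedMetrics]]
    pageviewMetricState = getRuntimeContext.getState(
      new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", info))
  }

  override def process(key: PageviewBasedMetricsGroup, context: Context,
                       elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
    // value() returns null the first time a key is seen.
    val current = Option(pageviewMetricState.value())
    // ... build the updated metrics from `current` and `elements` as in the
    // thread, then pageviewMetricState.update(updated) and out.collect(updated).
  }
})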