Hi Mäki,

some additions to Greg's answer: The flatMapWithState shortcut of the Scala
API uses Flink's key-value state while your TestCounters class uses the
Checkpointed interface.
As Greg said, the checkpointed interface operates on an operator level not
per key. The key-value state automatically switches the context to the
current key.
The documentation contains details [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

2016-12-20 16:34 GMT+01:00 Greg Hogan <c...@greghogan.com>:

> Hi Mäki,
>
> This is the expected output. Your RichFlatMapFunction is opened once per
> task and you are sharing counterValue for all keys processed by that task.
>
> Greg
>
>
>
> On Mon, Dec 19, 2016 at 11:38 AM, Mäki Hanna <hanna.m...@comptel.com>
> wrote:
>
>> Hi,
>>
>>
>>
>> I'm trying to calculate stateful counts per key with checkpoints
>> following the example in https://ci.apache.org/projects
>> /flink/flink-docs-release-1.1/apis/streaming/state.html#
>> checkpointing-instance-fields. I would expect my test program to
>> calculate the counts per key, but it seems to group the data by task rather
>> than by key. Is this a Flink bug or have I misunderstood something?
>>
>>
>>
>> The output of inputData.keyBy(0).flatMap(new TestCounters).print is
>>
>>
>>
>> 1> (A,count=1)
>>
>> 1> (F,count=2)
>>
>> 2> (B,count=1)
>>
>> 2> (C,count=2)
>>
>> 2> (D,count=3)
>>
>> 2> (E,count=4)
>>
>> 2> (E,count=5)
>>
>> 2> (E,count=6)
>>
>> 2> (H,count=7)
>>
>> 4> (G,count=1)
>>
>>
>>
>> while the output of inputData.keyBy(0).flatMapWithState(...).print is
>> (as I would expect)
>>
>>
>>
>> 2> (B,1)
>>
>> 4> (G,1)
>>
>> 1> (A,1)
>>
>> 2> (C,1)
>>
>> 1> (F,1)
>>
>> 2> (D,1)
>>
>> 2> (E,1)
>>
>> 2> (E,2)
>>
>> 2> (E,3)
>>
>> 2> (H,1)
>>
>>
>>
>> I would expect both to give the same results.
>>
>>
>>
>> The full code:
>>
>>
>>
>> import org.apache.flink.api.common.functions.RichFlatMapFunction
>>
>> import org.apache.flink.streaming.api.scala._
>>
>> import org.apache.flink.configuration.Configuration
>>
>> import org.apache.flink.streaming.api.checkpoint.Checkpointed
>>
>> import org.apache.flink.util.Collector
>>
>>
>>
>> object FlinkStreamingTest {
>>
>>
>>
>>   def main(args: Array[String]) {
>>
>>
>>
>>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>>
>>
>>
>>     val checkpointIntervalMillis = 10000
>>
>>     env.enableCheckpointing(checkpointIntervalMillis)
>>
>>
>>
>>     val inputData = env.fromElements(("A",0),("B",0),("C",0),("D",0),
>>
>>       ("E",0),("E",0),("E",0),
>>
>>       ("F",0),("G",0),("H",0))
>>
>>
>>
>>     inputData.keyBy(0).flatMap(new TestCounters).print
>>
>>
>>
>>     /*
>>
>>     inputData.keyBy(0).flatMapWithState((keyAndCount: (String, Int),
>> count: Option[Int]) =>
>>
>>       count match {
>>
>>         case None => (Iterator((keyAndCount._1, 1)), Some(1))
>>
>>         case Some(c) => (Iterator((keyAndCount._1, c+1)), Some(c+1))
>>
>>       }).print
>>
>>     */
>>
>>
>>
>>     env.execute("Counters test")
>>
>>   }
>>
>> }
>>
>>
>>
>> case class CounterClass(var count: Int)
>>
>>
>>
>> class TestCounters extends RichFlatMapFunction[(String, Int), (String,
>> String)] with Checkpointed[CounterClass] {
>>
>>
>>
>>   var counterValue: CounterClass = null
>>
>>
>>
>>   override def flatMap(in: (String, Int), out: Collector[(String,
>> String)]) = {
>>
>>     counterValue.count = counterValue.count + 1
>>
>>     out.collect((in._1,"count="+counterValue.count))
>>
>>   }
>>
>>
>>
>>   override def open(config: Configuration): Unit = {
>>
>>     if(counterValue == null) {
>>
>>       counterValue = new CounterClass(0)
>>
>>     }
>>
>>   }
>>
>>
>>
>>   override def snapshotState(l: Long, l1: Long): CounterClass = {
>>
>>     counterValue
>>
>>   }
>>
>>
>>
>>   override def restoreState(state: CounterClass): Unit = {
>>
>>     counterValue = state
>>
>>   }
>>
>> }
>> Disclaimer: This message and any attachments thereto are intended solely
>> for the addressed recipient(s) and may contain confidential information. If
>> you are not the intended recipient, please notify the sender by reply
>> e-mail and delete the e-mail (including any attachments thereto) without
>> producing, distributing or retaining any copies thereof. Any review,
>> dissemination or other use of, or taking of any action in reliance upon,
>> this information by persons or entities other than the intended
>> recipient(s) is prohibited. Thank you.
>>
>
>

Reply via email to