Nope. I added a log in End, but I still get the same log. Is there any fault in my code?
thank you.

> On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
>
> You're clearing the "handState" on "GameEndHistory". I'm assuming this
> event comes in before "CommCardHistory" where you check the state.
>
> On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> in my code, is the config of ExecutionEnv alright?
>>
>>
>>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>
>>>
>>> my code and log is as below.
>>>
>>>
>>> val getExecuteEnv: StreamExecutionEnvironment = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>
>>>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>>     env.getCheckpointConfig.setCheckpointTimeout(60000)
>>>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>>     env
>>> }
>>>
>>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>>     target.keyBy(_._3).flatMap(new StateOperator)
>>>
>>> def main(args: Array[String]) {
>>>     val env = getExecuteEnv
>>>     val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>>     val json = deserializeToJsonObj(source).name("ConvertToJson")
>>>     val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>>     val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>>     …
>>> }
>>>
>>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>>     var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>>     var handState: ValueState[HandHistoryInfo] = _
>>>
>>>     override def open(param: Configuration): Unit = {
>>>         val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>>             classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>>         playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>>         handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>>     }
>>>
>>>     override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>>>         in._2 match {
>>>             case "GameStartHistory" =>
>>>                 val players = playerState.value()
>>>                 val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>>                 val record = obj.asInstanceOf[GameStartHistoryRecord]
>>>                 val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>>                 if (LOG.isInfoEnabled())
>>>                     LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>>                 ….
>>>                 playerState.update(players)
>>>                 handState.update(handHistoryInfo)
>>>             case "HoleCardHistory" =>
>>>                 val players = playerState.value()
>>>                 if (players != null) {
>>>                     ...
>>>                     playerState.update(players)
>>>                 } else LOG.warn("there is no player[hole card]. {}", in._4)
>>>             case "PlayerStateHistory" =>
>>>                 val players = playerState.value()
>>>                 if (players != null) {
>>>                     ….
>>>                     playerState.update(players)
>>>                 } else LOG.warn("there is no player[player state]. {}", in._4)
>>>             case "CommCardHistory" =>
>>>                 val handHistoryInfo = handState.value()
>>>                 val commCardHistory: CommCardHistory = commCardState.value()
>>>                 if (handHistoryInfo != null) {
>>>                     ...
>>>                     handState.update(handHistoryInfo)
>>>                     commCardState.update(commCardHistory)
>>>                 } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>>             case "PlayerActionHistory" =>
>>>                 val handHistoryInfo = handState.value()
>>>                 val players = playerState.value()
>>>
>>>                 if (handHistoryInfo != null) {
>>>                     ...
>>>                 } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>>             case "PotHistory" =>
>>>                 val players = playerState.value()
>>>                 val handHistoryInfo = handState.value()
>>>                 val commCardHistory: CommCardHistory = commCardState.value()
>>>                 if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>>>                     ...
>>>                 } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>>             case "GameEndHistory" =>
>>>                 val players = playerState.value()
>>>                 val handHistoryInfo = handState.value()
>>>                 ...
>>>                 if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>>                 playerState.clear()
>>>                 handState.clear()
>>>             case _ =>
>>>         }
>>>     }
>>>
>>> ---- log ----
>>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595
>>>
>>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot].
>>>
>>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> What do you mean with lost exactly?
>>>>
>>>> You call value() and it returns a value (!= null/defaultValue) and you
>>>> call it again and it returns null/defaultValue for the same key with
>>>> no update in between?
>>>>
>>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>>> <k.klou...@data-artisans.com> wrote:
>>>>> Hello,
>>>>>
>>>>> Could you share the code of the job you are running?
>>>>> With only this information I am afraid we cannot help much.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>>
>>>>>> Hi.
>>>>>> I’m using flink 1.0.3 on aws EMR.
>>>>>> sporadically value of ValueState is lost.
>>>>>> what is starting point for solving this problem.
>>>>>> Thank you.
>>>>>
>>>
>>
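
As a rough illustration of the ordering problem Maximilian points out in this thread, here is a minimal plain-Scala sketch. It does not use Flink at all: a mutable map stands in for keyed ValueState, and the names StateOrderingSketch and replay, as well as the (handId, eventType) event shape, are made up for the example. It only shows that if "GameEndHistory" (which clears the state) is processed before a later event for the same key, that later event finds no state and takes the warning branch.

```scala
import scala.collection.mutable

// Plain-Scala stand-in for keyed state; this is NOT the Flink API.
object StateOrderingSketch {
  // Hypothetical simplified event: (handId, eventType)
  type Event = (String, String)

  // Replays events in arrival order and returns the warnings that would be logged.
  def replay(events: Seq[Event]): List[String] = {
    val handState = mutable.Map.empty[String, String] // key -> hand info
    val warnings = mutable.ListBuffer.empty[String]
    events.foreach { case (key, kind) =>
      kind match {
        case "GameStartHistory" =>
          handState(key) = s"hand-$key"   // comparable to handState.update(...)
        case "GameEndHistory" =>
          handState.remove(key)           // comparable to handState.clear()
        case other =>
          // Any other event expects the hand info to be present for its key.
          if (!handState.contains(key))
            warnings += s"no handhistory info[$other] for key $key"
      }
    }
    warnings.toList
  }

  def main(args: Array[String]): Unit = {
    val inOrder  = Seq(("h1", "GameStartHistory"), ("h1", "PotHistory"), ("h1", "GameEndHistory"))
    val endFirst = Seq(("h1", "GameStartHistory"), ("h1", "GameEndHistory"), ("h1", "PotHistory"))
    println(replay(inOrder))   // end arrives last: state is present when checked
    println(replay(endFirst))  // end arrives early: state already cleared when checked
  }
}
```

Whether events really arrive out of order in the actual job is an assumption to verify, for example by logging the key and event type on every flatMap call. Note also that in the quoted code the "PotHistory" warning is logged when playType is not Cash or when there are fewer than two players, not only when handHistoryInfo is null, so that message alone does not prove the state was missing.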