You're clearing the "handState" on "GameEndHistory". I'm assuming this
event comes in before "CommCardHistory" where you check the state.

On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> in my code, is the config of ExecutionEnv alright?
>
>
>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>
>>
>> my code and log is as below.
>>
>>
>>    val getExecuteEnv: StreamExecutionEnvironment = {
>>        val env = 
>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>        
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>        env.getCheckpointConfig.setCheckpointTimeout(60000)
>>        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>        env
>>    }
>>
>> def transform(target: DataStream[(String, String, String, String, Long)]): 
>> DataStream[WinLossBase] =
>>        target.keyBy(_._3).flatMap(new StateOperator)
>>
>> def main(args: Array[String]) {
>>        val env = getExecuteEnv
>>        val source: DataStream[String] = 
>> extractFromKafka(env).name("KafkaSource")
>>        val json = deserializeToJsonObj(source).name("ConvertToJson")
>>        val target: DataStream[(String, String, String, String, Long)] = 
>> preTransform(json)
>>        val result: DataStream[WinLossBase] = 
>> transform(target).name("ToKeyedStream”)
>> …
>> }
>>
>> class StateOperator extends RichFlatMapFunction[(String, String, String, 
>> String, Long), WinLossBase] {
>>        var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>        var handState: ValueState[HandHistoryInfo] = _
>>
>>        override def open(param: Configuration): Unit = {
>>            val playerValueStateDescriptor = new 
>> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>                classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, 
>> PotPlayer]())
>>            playerState = 
>> getRuntimeContext.getState(playerValueStateDescriptor)
>>            handState = getRuntimeContext.getState(new 
>> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>        }
>>
>>        override def flatMap(in: (String, String, String, String, Long), out: 
>> Collector[WinLossBase]): Unit = {
>>            in._2 match {
>>                case "GameStartHistory" =>
>>                    val players = playerState.value()
>>                    val obj = _convertJsonToRecord(in._4, 
>> classOf[GameStartHistoryRecord])
>>                    val record = obj.asInstanceOf[GameStartHistoryRecord]
>>                    val handHistoryInfo: HandHistoryInfo = 
>> _setUpHandHistoryInfo(record)
>>                    if (LOG.isInfoEnabled())
>>                        LOG.info("hand start {}", if (handHistoryInfo != 
>> null) handHistoryInfo.handHistoryId else "NULL”)
>>                      ….
>>                    playerState.update(players)
>>                    handState.update(handHistoryInfo)
>>                case "HoleCardHistory" =>
>>                    val players = playerState.value()
>>                    if (players != null) {
>>                       ...
>>                         playerState.update(players)
>>                    } else LOG.warn("there is no player[hole card]. {}", 
>> in._4)
>>                case "PlayerStateHistory" =>
>>                    val players = playerState.value()
>>                    if (players != null) {
>>                       ….
>>                        playerState.update(players)
>>                    } else LOG.warn("there is no player[player state]. {}", 
>> in._4)
>>                case "CommCardHistory" =>
>>                    val handHistoryInfo = handState.value()
>>                    val commCardHistory: CommCardHistory = 
>> commCardState.value()
>>                    if (handHistoryInfo != null) {
>>                       ...
>>                        handState.update(handHistoryInfo)
>>                        commCardState.update(commCardHistory)
>>                    } else LOG.warn("there is no handhistory info[comm card]. 
>> {}", in._4)
>>                case "PlayerActionHistory" =>
>>                    val handHistoryInfo = handState.value()
>>                    val players = playerState.value()
>>
>>                    if (handHistoryInfo != null) {
>>                       ...
>>                    } else LOG.warn("there is no handhistory info[player 
>> action]. {}", in._4)
>>                case "PotHistory" =>
>>                    val players = playerState.value()
>>                    val handHistoryInfo = handState.value()
>>                    val commCardHistory: CommCardHistory = 
>> commCardState.value()
>>                    if (handHistoryInfo != null && handHistoryInfo.playType 
>> == PlayType.Cash && players != null && players.size > 1) {
>>                        ...
>>                    } else LOG.warn("there is no handhistory info[pot]. {}", 
>> in._4)
>>                case "GameEndHistory" =>
>>                    val players = playerState.value()
>>                    val handHistoryInfo = handState.value()
>>                       ...
>>                    if (LOG.isTraceEnabled()) LOG.trace("end {}", 
>> record.getHandHistoryId)
>>                    playerState.clear()
>>                    handState.clear()
>>                case _ =>
>>            }
>>        }
>>
>> —— log ——
>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, 
>> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) 
>> (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 
>> 5769392597641628595
>>
>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, 
>> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) 
>> (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no 
>> handhistory info[pot].
>>
>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> What do you mean with lost exactly?
>>>
>>> You call value() and it returns a value (!= null/defaultValue) and you
>>> call it again and it returns null/defaultValue for the same key with
>>> no update in between?
>>>
>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>> <k.klou...@data-artisans.com> wrote:
>>>> Hello,
>>>>
>>>> Could you share the code of the job you are running?
>>>> With only this information I am afraid we cannot help much.
>>>>
>>>> Thanks,
>>>> Kostas
>>>>
>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>
>>>>> Hi.
>>>>> I’m using flink 1.0.3 on aws EMR.
>>>>> sporadically value of ValueState is lost.
>>>>> what is starting point for solving this problem.
>>>>> Thank you.
>>>>
>>
>

Reply via email to