My code and log are below.

    /** Pre-configured streaming environment shared by the job.
      *
      * Checkpointing is enabled with an interval argument of 10000, the
      * checkpointing mode is AT_LEAST_ONCE, the checkpoint timeout is 60000,
      * at most one checkpoint may run concurrently, and records are stamped
      * with ingestion time. A fixed-delay restart strategy is installed with
      * arguments (30, 30000).
      * NOTE(review): the numeric arguments are presumably milliseconds / a
      * restart-attempt count -- confirm against the Flink API in use.
      */
    val getExecuteEnv: StreamExecutionEnvironment = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
            .enableCheckpointing(10000)
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

        // All checkpoint tuning goes through the environment's checkpoint config.
        val checkpoints = streamEnv.getCheckpointConfig
        checkpoints.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
        checkpoints.setCheckpointTimeout(60000)
        checkpoints.setMaxConcurrentCheckpoints(1)

        streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
        streamEnv
    }

/** Keys the incoming tuples by their third field and feeds every record of a
  * key through a fresh [[StateOperator]], producing the `WinLossBase` stream.
  *
  * @param target stream of 5-field tuples; the third field is the key
  * @return the records emitted by `StateOperator`
  */
def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] = {
        val keyedByThirdField = target.keyBy(_._3)
        keyedByThirdField.flatMap(new StateOperator)
}

/** Job entry point: wires the Kafka source through JSON deserialisation,
  * `preTransform` and the keyed, stateful `transform` step.
  *
  * The remainder of the pipeline was elided by the author and is kept as the
  * original placeholder line.
  *
  * @param args command-line arguments (not used in the visible part)
  */
def main(args: Array[String]): Unit = {
        val env = getExecuteEnv
        val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
        val json = deserializeToJsonObj(source).name("ConvertToJson")
        val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
        // The operator name must be a properly terminated string literal; the
        // original closed it with a typographic quote.
        val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
…
}

class StateOperator extends RichFlatMapFunction[(String, String, String, 
String, Long), WinLossBase] {
        var playerState: ValueState[util.Map[String, PotPlayer]] = _
        var handState: ValueState[HandHistoryInfo] = _

        /** Binds the keyed state handles used by `flatMap`.
          *
          * `playerState` is registered under the name "winloss" with an empty
          * hash map as its default value; `handState` is registered under
          * "handinfo" with a `null` default.
          * NOTE(review): `flatMap` also reads `commCardState`, which is not
          * initialised in the visible code -- presumably elided; confirm.
          *
          * @param param operator configuration (not read here)
          */
        override def open(param: Configuration): Unit = {
            val playerDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]](
                "winloss",
                classOf[util.Map[String, PotPlayer]],
                Maps.newHashMap[String, PotPlayer]())
            playerState = getRuntimeContext.getState(playerDescriptor)

            val handDescriptor = new ValueStateDescriptor[HandHistoryInfo](
                "handinfo", classOf[HandHistoryInfo], null)
            handState = getRuntimeContext.getState(handDescriptor)
        }

        /** Handles one history record for the current key.
          *
          * The record type is taken from the second tuple field (`in._2`) and
          * the fourth field (`in._4`) carries the payload that is converted or
          * logged. Per-key state is read from and written back to
          * `playerState`, `handState` and `commCardState`; both `playerState`
          * and `handState` are cleared on "GameEndHistory". Unknown record
          * types are ignored.
          *
          * Sections marked with `...` / `….` were elided by the author and are
          * kept as the original placeholders.
          * NOTE(review): `commCardState` and, in the "GameEndHistory" branch,
          * `record` are not defined in the visible code -- presumably declared
          * in elided parts; confirm.
          *
          * @param in  the keyed input tuple
          * @param out collector for resulting `WinLossBase` records
          */
        override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
            in._2 match {
                case "GameStartHistory" =>
                    val players = playerState.value()
                    val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
                    val record = obj.asInstanceOf[GameStartHistoryRecord]
                    val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
                    if (LOG.isInfoEnabled())
                        // Fixed: the "NULL" literal was closed with a typographic quote.
                        LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
                       ….
                    playerState.update(players)
                    handState.update(handHistoryInfo)
                case "HoleCardHistory" =>
                    val players = playerState.value()
                    if (players != null) {
                        ...
                         playerState.update(players)
                    } else LOG.warn("there is no player[hole card]. {}", in._4)
                case "PlayerStateHistory" =>
                    val players = playerState.value()
                    if (players != null) {
                        ….
                        playerState.update(players)
                    } else LOG.warn("there is no player[player state]. {}", in._4)
                case "CommCardHistory" =>
                    val handHistoryInfo = handState.value()
                    val commCardHistory: CommCardHistory = commCardState.value()
                    if (handHistoryInfo != null) {
                        ...
                        handState.update(handHistoryInfo)
                        commCardState.update(commCardHistory)
                    } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
                case "PlayerActionHistory" =>
                    val handHistoryInfo = handState.value()
                    val players = playerState.value()

                    if (handHistoryInfo != null) {
                        ...
                    } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
                case "PotHistory" =>
                    val players = playerState.value()
                    val handHistoryInfo = handState.value()
                    val commCardHistory: CommCardHistory = commCardState.value()
                    // Report a genuinely missing hand state separately from a pot
                    // that is merely skipped. The original logged "no handhistory
                    // info" for both, so the warning alone could not show whether
                    // state had actually been lost.
                    if (handHistoryInfo == null)
                        LOG.warn("there is no handhistory info[pot]. {}", in._4)
                    else if (handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
                        ...
                    } else LOG.warn("pot skipped, not a cash hand or fewer than two players[pot]. {}", in._4)
                case "GameEndHistory" =>
                    val players = playerState.value()
                    val handHistoryInfo = handState.value()
                        ...
                    if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
                    // The hand is over: drop the per-key state so the next hand starts clean.
                    playerState.clear()
                    handState.clear()
                case _ =>
            }
        }

—— log —— 
2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map 
-> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] 
INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 
5769392597641628595

2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map 
-> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] 
WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory 
info[pot]. 

> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
> 
> What exactly do you mean by "lost"?
> 
> You call value() and it returns a value (!= null/defaultValue) and you
> call it again and it returns null/defaultValue for the same key with
> no update in between?
> 
> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
> <k.klou...@data-artisans.com> wrote:
>> Hello,
>> 
>> Could you share the code of the job you are running?
>> With only this information I am afraid we cannot help much.
>> 
>> Thanks,
>> Kostas
>> 
>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> 
>>> Hi.
>>> I’m using Flink 1.0.3 on AWS EMR.
>>> Sporadically, the value of a ValueState is lost.
>>> What would be a good starting point for solving this problem?
>>> Thank you.
>> 

Reply via email to