Hi. I checked order of data. but it is alright. Is there any other possibilities? Thank you.
> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote: > > Hi! > > Its not that easy to say at a first glance. > > One thing that is important to bear in mind is what ordering guarantees Flink > gives, and where the ordering guarantees are not given. > When you use keyBy() or redistribute(), order is preserved per parallel > source/target pair only. > > Have a look here: > https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows > > <https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows> > > > Could it be that the events simply arrive in a different order in the > functions, so that a later event that looks for state comes before an earlier > event that creates the state? > > Greetings, > Stephan > > On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com > <mailto:kim.s...@gmail.com>> wrote: > Nope. > I added log in End. > but there is same log. > is there any fault in my code? > > thank you. > > > > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org > > <mailto:m...@apache.org>> wrote: > > > > You're clearing the "handState" on "GameEndHistory". I'm assuming this > > event comes in before "CommCardHistory" where you check the state. > > > > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com > > <mailto:kim.s...@gmail.com>> wrote: > >> in my code, is the config of ExecutionEnv alright? > >> > >> > >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com > >>> <mailto:kim.s...@gmail.com>> wrote: > >>> > >>> > >>> my code and log is as below. > >>> > >>> > >>> val getExecuteEnv: StreamExecutionEnvironment = { > >>> val env = > >>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000) > >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) > >>> > >>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) > >>> env.getCheckpointConfig.setCheckpointTimeout(60000) > >>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) > >>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, > >>> 30000)) > >>> env > >>> } > >>> > >>> def transform(target: DataStream[(String, String, String, String, > >>> Long)]): DataStream[WinLossBase] = > >>> target.keyBy(_._3).flatMap(new StateOperator) > >>> > >>> def main(args: Array[String]) { > >>> val env = getExecuteEnv > >>> val source: DataStream[String] = > >>> extractFromKafka(env).name("KafkaSource") > >>> val json = deserializeToJsonObj(source).name("ConvertToJson") > >>> val target: DataStream[(String, String, String, String, Long)] = > >>> preTransform(json) > >>> val result: DataStream[WinLossBase] = > >>> transform(target).name("ToKeyedStream”) > >>> … > >>> } > >>> > >>> class StateOperator extends RichFlatMapFunction[(String, String, String, > >>> String, Long), WinLossBase] { > >>> var playerState: ValueState[util.Map[String, PotPlayer]] = _ > >>> var handState: ValueState[HandHistoryInfo] = _ > >>> > >>> override def open(param: Configuration): Unit = { > >>> val playerValueStateDescriptor = new > >>> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss", > >>> classOf[util.Map[String, PotPlayer]], > >>> Maps.newHashMap[String, PotPlayer]()) > >>> playerState = > >>> getRuntimeContext.getState(playerValueStateDescriptor) > >>> handState = getRuntimeContext.getState(new > >>> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null)) > >>> } > >>> > >>> override def flatMap(in: (String, String, String, String, Long), > >>> out: Collector[WinLossBase]): Unit = { > >>> in._2 match { > >>> case "GameStartHistory" => > >>> val players = playerState.value() > >>> val obj = _convertJsonToRecord(in._4, > >>> classOf[GameStartHistoryRecord]) > >>> val record = obj.asInstanceOf[GameStartHistoryRecord] > >>> val handHistoryInfo: HandHistoryInfo = > >>> _setUpHandHistoryInfo(record) > >>> if (LOG.isInfoEnabled()) > >>> LOG.info("hand start {}", if (handHistoryInfo != > >>> null) handHistoryInfo.handHistoryId else "NULL”) > >>> …. > >>> playerState.update(players) > >>> handState.update(handHistoryInfo) > >>> case "HoleCardHistory" => > >>> val players = playerState.value() > >>> if (players != null) { > >>> ... > >>> playerState.update(players) > >>> } else LOG.warn("there is no player[hole card]. {}", > >>> in._4) > >>> case "PlayerStateHistory" => > >>> val players = playerState.value() > >>> if (players != null) { > >>> …. > >>> playerState.update(players) > >>> } else LOG.warn("there is no player[player state]. {}", > >>> in._4) > >>> case "CommCardHistory" => > >>> val handHistoryInfo = handState.value() > >>> val commCardHistory: CommCardHistory = > >>> commCardState.value() > >>> if (handHistoryInfo != null) { > >>> ... > >>> handState.update(handHistoryInfo) > >>> commCardState.update(commCardHistory) > >>> } else LOG.warn("there is no handhistory info[comm > >>> card]. {}", in._4) > >>> case "PlayerActionHistory" => > >>> val handHistoryInfo = handState.value() > >>> val players = playerState.value() > >>> > >>> if (handHistoryInfo != null) { > >>> ... > >>> } else LOG.warn("there is no handhistory info[player > >>> action]. {}", in._4) > >>> case "PotHistory" => > >>> val players = playerState.value() > >>> val handHistoryInfo = handState.value() > >>> val commCardHistory: CommCardHistory = > >>> commCardState.value() > >>> if (handHistoryInfo != null && handHistoryInfo.playType > >>> == PlayType.Cash && players != null && players.size > 1) { > >>> ... > >>> } else LOG.warn("there is no handhistory info[pot]. > >>> {}", in._4) > >>> case "GameEndHistory" => > >>> val players = playerState.value() > >>> val handHistoryInfo = handState.value() > >>> ... > >>> if (LOG.isTraceEnabled()) LOG.trace("end {}", > >>> record.getHandHistoryId) > >>> playerState.clear() > >>> handState.clear() > >>> case _ => > >>> } > >>> } > >>> > >>> —— log —— > >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, > >>> Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to > >>> HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand > >>> start 5769392597641628595 > >>> > >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, > >>> Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to > >>> HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there > >>> is no handhistory info[pot]. > >>> > >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org > >>>> <mailto:u...@apache.org>> wrote: > >>>> > >>>> What do you mean with lost exactly? > >>>> > >>>> You call value() and it returns a value (!= null/defaultValue) and you > >>>> call it again and it returns null/defaultValue for the same key with > >>>> no update in between? > >>>> > >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas > >>>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote: > >>>>> Hello, > >>>>> > >>>>> Could you share the code of the job you are running? > >>>>> With only this information I am afraid we cannot help much. > >>>>> > >>>>> Thanks, > >>>>> Kostas > >>>>> > >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com > >>>>>> <mailto:kim.s...@gmail.com>> wrote: > >>>>>> > >>>>>> Hi. > >>>>>> I’m using flink 1.0.3 on aws EMR. > >>>>>> sporadically value of ValueState is lost. > >>>>>> what is starting point for solving this problem. > >>>>>> Thank you. > >>>>> > >>> > >> > >