Hi Stephan. Do you mean using a plain map on local execution? I've tested it, but it does not work at all. Thanks.
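
To make sure I understood your suggestion, this is roughly what I tried (only a minimal sketch, with types and field names simplified for the mail; the real job keeps util.Map[String, PotPlayer] and HandHistoryInfo):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

// Plain Scala map instead of Flink's ValueState, keyed manually by in._3.
// This lives only in the subtask's JVM, so it is not fault-tolerant (debugging only).
class LocalMapStateOperator
    extends RichFlatMapFunction[(String, String, String, String, Long), String] {

  private val handStateByKey = mutable.Map.empty[String, String]

  override def flatMap(in: (String, String, String, String, Long),
                       out: Collector[String]): Unit = {
    in._2 match {
      case "GameStartHistory" =>
        handStateByKey(in._3) = in._4          // remember the hand for this key
      case "PotHistory" =>
        handStateByKey.get(in._3) match {
          case Some(hand) => out.collect(hand) // state is still there
          case None       =>                   // state missing: log here, as with ValueState
        }
      case "GameEndHistory" =>
        handStateByKey.remove(in._3)           // plays the role of handState.clear()
      case _ =>
    }
  }
}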
> On Aug 15, 2016, at 4:56 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>
> Hi.
> I've tested the program with a window function (keyBy -> window -> collect). It has no problem.
>
> My old program (keyBy -> state processing): can it be processed by multiple threads within a key?
>
> Thank you.
>
>> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> So far we are not aware of a state loss bug in Flink. My guess is that it is some subtlety in the program.
>>
>> The check that logs also has other checks, like "handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>>
>> To debug this, you can try and do the following:
>>
>> Rather than using Flink's key/value state, simply use your own java/scala map in the RichFlatMapFunction.
>> That is not by default fault-tolerant, but you can use that to see if the error occurs in the same way or not.
>>
>> Greetings,
>> Stephan
>>
>> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> Hi.
>> I checked the order of the data, but it is alright.
>> Are there any other possibilities?
>> Thank you.
>>
>>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> Hi!
>>>
>>> It's not that easy to say at a first glance.
>>>
>>> One thing that is important to bear in mind is what ordering guarantees Flink gives, and where the ordering guarantees are not given.
>>> When you use keyBy() or redistribute(), order is preserved per parallel source/target pair only.
>>>
>>> Have a look here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>>
>>> Could it be that the events simply arrive in a different order in the functions, so that a later event that looks for state comes before an earlier event that creates the state?
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> Nope.
>>> I added a log in End, but the same log is there.
>>> Is there any fault in my code?
>>>
>>> Thank you.
>>>
>>>> On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> You're clearing the "handState" on "GameEndHistory". I'm assuming this event comes in before "CommCardHistory" where you check the state.
>>>>>
>>>>> On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>> In my code, is the config of the ExecutionEnv alright?
>>>>>
>>>>>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>>
>>>>>> My code and log are as below.
>>>>>>
>>>>>> val getExecuteEnv: StreamExecutionEnvironment = {
>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>>>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(60000)
>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>>>>>   env
>>>>>> }
>>>>>>
>>>>>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>>>>>   target.keyBy(_._3).flatMap(new StateOperator)
>>>>>>
>>>>>> def main(args: Array[String]) {
>>>>>>   val env = getExecuteEnv
>>>>>>   val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>>>>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>>>>>>   val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>>>>>   val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>>>>>   ...
>>>>>> }
>>>>>>
>>>>>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>>>>>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>>>>>   var handState: ValueState[HandHistoryInfo] = _
>>>>>>
>>>>>>   override def open(param: Configuration): Unit = {
>>>>>>     val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>>>>>       classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>>>>>     playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>>>>>     handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>>>>>   }
>>>>>>
>>>>>>   override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>>>>>>     in._2 match {
>>>>>>       case "GameStartHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>>>>>         val record = obj.asInstanceOf[GameStartHistoryRecord]
>>>>>>         val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>>>>>         if (LOG.isInfoEnabled())
>>>>>>           LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>>>>>         ...
>>>>>>         playerState.update(players)
>>>>>>         handState.update(handHistoryInfo)
>>>>>>       case "HoleCardHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         if (players != null) {
>>>>>>           ...
>>>>>>           playerState.update(players)
>>>>>>         } else LOG.warn("there is no player[hole card]. {}", in._4)
>>>>>>       case "PlayerStateHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         if (players != null) {
>>>>>>           ...
>>>>>>           playerState.update(players)
>>>>>>         } else LOG.warn("there is no player[player state]. {}", in._4)
>>>>>>       case "CommCardHistory" =>
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         val commCardHistory: CommCardHistory = commCardState.value()
>>>>>>         if (handHistoryInfo != null) {
>>>>>>           ...
>>>>>>           handState.update(handHistoryInfo)
>>>>>>           commCardState.update(commCardHistory)
>>>>>>         } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>>>>>       case "PlayerActionHistory" =>
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         val players = playerState.value()
>>>>>>         if (handHistoryInfo != null) {
>>>>>>           ...
>>>>>>         } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>>>>>       case "PotHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         val commCardHistory: CommCardHistory = commCardState.value()
>>>>>>         if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>>>>>>           ...
>>>>>>         } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>>>>>       case "GameEndHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         ...
>>>>>>         if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>>>>>         playerState.clear()
>>>>>>         handState.clear()
>>>>>>       case _ =>
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> —— log ——
>>>>>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595
>>>>>>
>>>>>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot].
>>>>>>
>>>>>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>
>>>>>>> What do you mean with "lost" exactly?
>>>>>>>
>>>>>>> You call value() and it returns a value (!= null/defaultValue), and you call it again and it returns null/defaultValue for the same key with no update in between?
>>>>>>>
>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> Could you share the code of the job you are running?
>>>>>>>> With only this information I am afraid we cannot help much.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi.
>>>>>>>>> I'm using Flink 1.0.3 on AWS EMR.
>>>>>>>>> Sporadically the value of a ValueState is lost.
>>>>>>>>> What is the starting point for solving this problem?
>>>>>>>>> Thank you.
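
To check Stephan's point about ordering, and whether more than one subtask ever processes the same key, I am thinking of putting a small pass-through probe right after the keyBy. Just a rough sketch (same tuple type as above, println instead of the real logger):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

// Logs, for every event, which parallel subtask sees the key and a per-key sequence
// number. If the same key ever shows up on two different subtask indexes, or events
// arrive here in a different order than they were produced, that would point to the
// reordering Stephan described rather than to lost state.
class KeyOrderProbe
    extends RichFlatMapFunction[(String, String, String, String, Long),
                                (String, String, String, String, Long)] {

  private val seqPerKey = mutable.Map.empty[String, Long]

  override def flatMap(in: (String, String, String, String, Long),
                       out: Collector[(String, String, String, String, Long)]): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val seq = seqPerKey.getOrElse(in._3, 0L) + 1L
    seqPerKey(in._3) = seq
    println(s"key=${in._3} event=${in._2} subtask=$subtask seqInSubtask=$seq")
    out.collect(in)  // pass the event through unchanged
  }
}

I would wire it in as target.keyBy(_._3).flatMap(new KeyOrderProbe).keyBy(_._3).flatMap(new StateOperator), re-keying after the probe because its output is not a keyed stream anymore. Does that look like a reasonable way to verify it?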