Hi!

Concerning your latest questions:

- There should not be multiple threads accessing the same state.
- By "using a regular Java Map" I mean keeping everything as it is, except that
  instead of using "ValueState" in the RichFlatMapFunction, you use a
  java.util.HashMap.
- If the program works with windows, it could be that events arrive out of
  order (are you using Event Time here?).

Greetings,
Stephan

On Mon, Aug 15, 2016 at 9:56 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:

> Hi.
> I've tested the program with window function(keyBy->window->collect). it
> has no problem.
>
> my old program. (keyBy-> state processing). can it be processed by
> multiple thread within a key?
>
> Thank you.
>
> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> So far we are not aware of a state loss bug in Flink. My guess is that it
> is some subtlety in the program.
>
> The check that logs also has other checks, like "handHistoryInfo.playType
> == PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>
>
> To debug this, you can try and do the following:
>
> Rather than using Flink's key/value state, simply use your own java/scala
> map in the RichFlatMapFunction.
> That is not by default fault-tolerant, but you can use that to see if the
> error occurs in the same way or not.
>
> Greetings,
> Stephan
>
>
> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>
>> Hi.
>> I checked order of data. but it is alright.
>> Is there any other possibilities?
>> Thank you.
>>
>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> It's not that easy to say at a first glance.
>>
>> One thing that is important to bear in mind is what ordering guarantees
>> Flink gives, and where the ordering guarantees are not given.
>> When you use keyBy() or redistribute(), order is preserved per parallel
>> source/target pair only.
>>
>> Have a look here:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>
>>
>> Could it be that the events simply arrive in a different order in the
>> functions, so that a later event that looks for state comes before an
>> earlier event that creates the state?
>>
>> Greetings,
>> Stephan
>>
>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>
>>> Nope.
>>> I added log in End.
>>> but there is same log.
>>> is there any fault in my code?
>>>
>>> thank you.
>>>
>>>
>>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
>>> >
>>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>>> > event comes in before "CommCardHistory" where you check the state.
>>> >
>>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> >> in my code, is the config of ExecutionEnv alright?
>>> >>
>>> >>
>>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> >>>
>>> >>>
>>> >>> my code and log is as below.
>>> >>>
>>> >>> val getExecuteEnv: StreamExecutionEnvironment = {
>>> >>>     val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>> >>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>> >>>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>> >>>     env.getCheckpointConfig.setCheckpointTimeout(60000)
>>> >>>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>> >>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>> >>>     env
>>> >>> }
>>> >>>
>>> >>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>> >>>     target.keyBy(_._3).flatMap(new StateOperator)
>>> >>>
>>> >>> def main(args: Array[String]) {
>>> >>>     val env = getExecuteEnv
>>> >>>     val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>> >>>     val json = deserializeToJsonObj(source).name("ConvertToJson")
>>> >>>     val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>> >>>     val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>> >>>     …
>>> >>> }
>>> >>>
>>> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>> >>>     var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>> >>>     var handState: ValueState[HandHistoryInfo] = _
>>> >>>
>>> >>>     override def open(param: Configuration): Unit = {
>>> >>>         val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>> >>>             classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>> >>>         playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>> >>>         handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>> >>>     }
>>> >>>
>>> >>>     override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>>> >>>         in._2 match {
>>> >>>             case "GameStartHistory" =>
>>> >>>                 val players = playerState.value()
>>> >>>                 val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>> >>>                 val record = obj.asInstanceOf[GameStartHistoryRecord]
>>> >>>                 val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>> >>>                 if (LOG.isInfoEnabled())
>>> >>>                     LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>> >>>                 ….
>>> >>>                 playerState.update(players)
>>> >>>                 handState.update(handHistoryInfo)
>>> >>>             case "HoleCardHistory" =>
>>> >>>                 val players = playerState.value()
>>> >>>                 if (players != null) {
>>> >>>                     ...
>>> >>>                     playerState.update(players)
>>> >>>                 } else LOG.warn("there is no player[hole card]. {}", in._4)
>>> >>>             case "PlayerStateHistory" =>
>>> >>>                 val players = playerState.value()
>>> >>>                 if (players != null) {
>>> >>>                     ….
>>> >>>                     playerState.update(players)
>>> >>>                 } else LOG.warn("there is no player[player state]. {}", in._4)
>>> >>>             case "CommCardHistory" =>
>>> >>>                 val handHistoryInfo = handState.value()
>>> >>>                 val commCardHistory: CommCardHistory = commCardState.value()
>>> >>>                 if (handHistoryInfo != null) {
>>> >>>                     ...
>>> >>>                     handState.update(handHistoryInfo)
>>> >>>                     commCardState.update(commCardHistory)
>>> >>>                 } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>> >>>             case "PlayerActionHistory" =>
>>> >>>                 val handHistoryInfo = handState.value()
>>> >>>                 val players = playerState.value()
>>> >>>
>>> >>>                 if (handHistoryInfo != null) {
>>> >>>                     ...
>>> >>>                 } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>> >>>             case "PotHistory" =>
>>> >>>                 val players = playerState.value()
>>> >>>                 val handHistoryInfo = handState.value()
>>> >>>                 val commCardHistory: CommCardHistory = commCardState.value()
>>> >>>                 if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>>> >>>                     ...
>>> >>>                 } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>> >>>             case "GameEndHistory" =>
>>> >>>                 val players = playerState.value()
>>> >>>                 val handHistoryInfo = handState.value()
>>> >>>                 ...
>>> >>>                 if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>> >>>                 playerState.clear()
>>> >>>                 handState.clear()
>>> >>>             case _ =>
>>> >>>         }
>>> >>>     }
>>> >>>
>>> >>> —— log ——
>>> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595
>>> >>>
>>> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot].
>>> >>>
>>> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>> >>>>
>>> >>>> What do you mean with lost exactly?
>>> >>>>
>>> >>>> You call value() and it returns a value (!= null/defaultValue) and you
>>> >>>> call it again and it returns null/defaultValue for the same key with
>>> >>>> no update in between?
>>> >>>>
>>> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>> >>>> <k.klou...@data-artisans.com> wrote:
>>> >>>>> Hello,
>>> >>>>>
>>> >>>>> Could you share the code of the job you are running?
>>> >>>>> With only this information I am afraid we cannot help much.
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Kostas
>>> >>>>>
>>> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>> Hi.
>>> >>>>>> I’m using flink 1.0.3 on aws EMR.
>>> >>>>>> sporadically value of ValueState is lost.
>>> >>>>>> what is starting point for solving this problem.
>>> >>>>>> Thank you.
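
For illustration, the ordering hypothesis raised in this thread can be sketched without Flink at all. The snippet below is a minimal, hypothetical model and not the poster's job: `Event`, `HandTracker` and `OrderingDemo` are made-up names, and a plain `mutable.HashMap` stands in for the keyed `ValueState`, in the spirit of the "use your own map" debugging suggestion. It only shows that if "GameEndHistory" is processed before "PotHistory" for the same key, the lookup fails and the same warning text appears, even though no state was lost.

```scala
import scala.collection.mutable

// Hypothetical, simplified event: the real job uses a 5-tuple keyed by its third field.
final case class Event(handId: String, eventType: String)

// A plain HashMap standing in for the per-key ValueState (assumption: one entry per hand).
final class HandTracker {
  private val handInfo = mutable.HashMap.empty[String, String]
  val warnings = mutable.ArrayBuffer.empty[String]

  def process(e: Event): Unit = e.eventType match {
    case "GameStartHistory" =>
      // Create the state for this hand.
      handInfo.update(e.handId, s"hand-${e.handId}")
    case "PotHistory" =>
      // Look the state up; warn if it is absent, as the original code does.
      if (!handInfo.contains(e.handId))
        warnings += s"there is no handhistory info[pot]. ${e.handId}"
    case "GameEndHistory" =>
      // Clear the state for this hand.
      handInfo.remove(e.handId)
    case _ => ()
  }
}

object OrderingDemo {
  // Feed a sequence of events through a fresh tracker and return the warnings it produced.
  def run(events: Seq[Event]): List[String] = {
    val tracker = new HandTracker
    events.foreach(tracker.process)
    tracker.warnings.toList
  }

  // Events in the expected order: start, pot, end.
  val inOrder: Seq[Event] = Seq(
    Event("h1", "GameStartHistory"),
    Event("h1", "PotHistory"),
    Event("h1", "GameEndHistory"))

  // Same events, but the end event overtakes the pot event.
  val endBeforePot: Seq[Event] = Seq(
    Event("h1", "GameStartHistory"),
    Event("h1", "GameEndHistory"),
    Event("h1", "PotHistory"))
}
```

Calling `OrderingDemo.run(OrderingDemo.inOrder)` yields no warnings, while `OrderingDemo.run(OrderingDemo.endBeforePot)` yields one. If the real job shows the same warnings with a HashMap in place of ValueState, that would point at event ordering (or one of the other conditions in the check) rather than at Flink's state backend.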