Hi!

So far we are not aware of a state loss bug in Flink. My guess is that it
is some subtlety in the program.

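The condition that guards that log message also contains other checks, such as
"handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Could one
of them be failing instead? The warning is logged whenever any part of the
condition is false, not only when the hand state is missing.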
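
One way to find out is to split the combined condition so that the log says
which part failed. This is only a sketch: PotCheck, PlayType and
HandHistoryInfo below are hypothetical stand-ins for the classes in your job,
reduced to the fields that the check uses.

```scala
object PotCheck {
  // Hypothetical stand-ins for the job's own classes (assumption, not the real types).
  object PlayType extends Enumeration { val Cash, Tournament = Value }
  final case class HandHistoryInfo(handHistoryId: Long, playType: PlayType.Value)

  // Returns None if every part of the "PotHistory" condition holds,
  // otherwise a description of the first part that is false.
  def failure[P](hand: HandHistoryInfo, players: java.util.Map[String, P]): Option[String] =
    if (hand == null) Some("handHistoryInfo is null")
    else if (hand.playType != PlayType.Cash) Some("playType is " + hand.playType)
    else if (players == null) Some("players is null")
    else if (players.size() <= 1) Some("players.size is " + players.size())
    else None
}
```

In the "PotHistory" case you could then log the returned reason together with
in._4 instead of the fixed "there is no handhistory info[pot]" message.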


To debug this, you can try the following:

Rather than using Flink's key/value state, simply keep your own Java/Scala
map in the RichFlatMapFunction.
That map is not fault-tolerant by default, but you can use it to see whether
the error occurs in the same way.
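
A minimal sketch of that idea, written without Flink dependencies so it can be
run on its own. DebugKeyedState and replay are hypothetical names, and the
event tuple is reduced to (key, event type, hand id). One thing to keep in
mind: Flink's key/value state is scoped to the current key automatically,
while your own map is not, so it has to be keyed explicitly by the field used
in keyBy (in._3 in your job).

```scala
import scala.collection.mutable

object DebugStateSketch {
  // Debug-only stand-in for a keyed ValueState: one entry per key.
  // Nothing here is checkpointed, so the contents are lost on a restart.
  final class DebugKeyedState[K, V] {
    private val entries = mutable.HashMap.empty[K, V]
    def value(key: K): Option[V] = entries.get(key)
    def update(key: K, v: V): Unit = entries.update(key, v)
    def clear(key: K): Unit = { entries.remove(key); () }
  }

  // Replays simplified events (key, eventType, handId) and returns the
  // warnings that a "PotHistory" event without hand state would produce.
  def replay(events: Seq[(String, String, Long)]): List[String] = {
    val handState = new DebugKeyedState[String, Long]
    val warnings = mutable.ArrayBuffer.empty[String]
    events.foreach { case (key, eventType, handId) =>
      eventType match {
        case "GameStartHistory" => handState.update(key, handId)
        case "PotHistory" =>
          if (handState.value(key).isEmpty)
            warnings += ("no hand info[pot] for key " + key)
        case "GameEndHistory" => handState.clear(key)
        case _ => // other event types are ignored in this sketch
      }
    }
    warnings.toList
  }
}
```

In the real job, a map of this kind would live as a field of the StateOperator
instance, and the handState.value()/update()/clear() calls would become calls
that pass in._3 as the key. If the warning still shows up with the plain map,
the cause is in the program or in the event order rather than in Flink's state.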

Greetings,
Stephan




On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:

> Hi.
> I checked the order of the data, and it is fine.
> Are there any other possibilities?
> Thank you.
>
> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> It's not that easy to say at first glance.
>
> One thing that is important to bear in mind is what ordering guarantees
> Flink gives, and where the ordering guarantees are not given.
> When you use keyBy() or redistribute(), order is preserved per parallel
> source/target pair only.
>
> Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>
>
> Could it be that the events simply arrive in a different order in the
> functions, so that a later event that looks for state comes before an
> earlier event that creates the state?
>
> Greetings,
> Stephan
>
> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>
>> No.
>> I added logging in the End case,
>> but I still get the same log output.
>> Is there a fault in my code?
>>
>> Thank you.
>>
>>
>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
>> >
>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>> > event comes in before "CommCardHistory" where you check the state.
>> >
>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> >> In my code, is the configuration of the execution environment all right?
>> >>
>> >>
>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> >>>
>> >>>
>> >>> My code and log are below.
>> >>>
>> >>>
>> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
>> >>>       val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>> >>>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> >>>       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>> >>>       env.getCheckpointConfig.setCheckpointTimeout(60000)
>> >>>       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> >>>       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>> >>>       env
>> >>>   }
>> >>>
>> >>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>> >>>       target.keyBy(_._3).flatMap(new StateOperator)
>> >>>
>> >>> def main(args: Array[String]) {
>> >>>       val env = getExecuteEnv
>> >>>       val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>> >>>       val json = deserializeToJsonObj(source).name("ConvertToJson")
>> >>>       val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>> >>>       val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>> >>> …
>> >>> }
>> >>>
>> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>> >>>       var playerState: ValueState[util.Map[String, PotPlayer]] = _
>> >>>       var handState: ValueState[HandHistoryInfo] = _
>> >>>
>> >>>       override def open(param: Configuration): Unit = {
>> >>>           val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>> >>>               classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>> >>>           playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>> >>>           handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>> >>>       }
>> >>>
>> >>>       override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>> >>>           in._2 match {
>> >>>               case "GameStartHistory" =>
>> >>>                   val players = playerState.value()
>> >>>                   val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>> >>>                   val record = obj.asInstanceOf[GameStartHistoryRecord]
>> >>>                   val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>> >>>                   if (LOG.isInfoEnabled())
>> >>>                       LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>> >>>                     ….
>> >>>                   playerState.update(players)
>> >>>                   handState.update(handHistoryInfo)
>> >>>               case "HoleCardHistory" =>
>> >>>                   val players = playerState.value()
>> >>>                   if (players != null) {
>> >>>                      ...
>> >>>                        playerState.update(players)
>> >>>                   } else LOG.warn("there is no player[hole card]. {}", in._4)
>> >>>               case "PlayerStateHistory" =>
>> >>>                   val players = playerState.value()
>> >>>                   if (players != null) {
>> >>>                      ….
>> >>>                       playerState.update(players)
>> >>>                   } else LOG.warn("there is no player[player state]. {}", in._4)
>> >>>               case "CommCardHistory" =>
>> >>>                   val handHistoryInfo = handState.value()
>> >>>                   val commCardHistory: CommCardHistory = commCardState.value()
>> >>>                   if (handHistoryInfo != null) {
>> >>>                      ...
>> >>>                       handState.update(handHistoryInfo)
>> >>>                       commCardState.update(commCardHistory)
>> >>>                   } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>> >>>               case "PlayerActionHistory" =>
>> >>>                   val handHistoryInfo = handState.value()
>> >>>                   val players = playerState.value()
>> >>>
>> >>>                   if (handHistoryInfo != null) {
>> >>>                      ...
>> >>>                   } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>> >>>               case "PotHistory" =>
>> >>>                   val players = playerState.value()
>> >>>                   val handHistoryInfo = handState.value()
>> >>>                   val commCardHistory: CommCardHistory = commCardState.value()
>> >>>                   if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>> >>>                       ...
>> >>>                   } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>> >>>               case "GameEndHistory" =>
>> >>>                   val players = playerState.value()
>> >>>                   val handHistoryInfo = handState.value()
>> >>>                      ...
>> >>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>> >>>                   playerState.clear()
>> >>>                   handState.clear()
>> >>>               case _ =>
>> >>>           }
>> >>>       }
>> >>>
>> >>> —— log ——
>> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
>> >>>
>> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
>> >>>
>> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>> >>>>
>> >>>> What exactly do you mean by "lost"?
>> >>>>
>> >>>> You call value() and it returns a value (!= null/defaultValue), and
>> >>>> then you call it again and it returns null/defaultValue for the same
>> >>>> key with no update in between?
>> >>>>
>> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>> >>>> <k.klou...@data-artisans.com> wrote:
>> >>>>> Hello,
>> >>>>>
>> >>>>> Could you share the code of the job you are running?
>> >>>>> With only this information I am afraid we cannot help much.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Kostas
>> >>>>>
>> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Hi.
>> >>>>>> I’m using Flink 1.0.3 on AWS EMR.
>> >>>>>> Sporadically, the value of a ValueState is lost.
>> >>>>>> What is a good starting point for solving this problem?
>> >>>>>> Thank you.
>> >>>>>
>> >>>
>> >>
>>
>>
>
>
