[ https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905279#comment-15905279 ]
ASF GitHub Bot commented on FLINK-6023: --------------------------------------- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105422108 --- Diff: docs/dev/stream/process_function.md --- @@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, <div data-lang="scala" markdown="1"> {% highlight scala %} -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.ProcessFunction.Context; -import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext; -import org.apache.flink.util.Collector; +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.streaming.api.functions.RichProcessFunction +import org.apache.flink.streaming.api.functions.ProcessFunction.Context +import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext +import org.apache.flink.util.Collector // the source data stream -DataStream<Tuple2<String, String>> stream = ...; +val stream: DataStream[Tuple2[String, String]] = ... 
// apply the process function onto a keyed stream -DataStream<Tuple2<String, Long>> result = stream - .keyBy(0) - .process(new CountWithTimeoutFunction()); +val result: DataStream[Tuple2[String, Long]] = stream + .keyBy(0) + .process(new CountWithTimeoutFunction()) /** - * The data type stored in the state - */ + * The data type stored in the state + */ case class CountWithTimestamp(key: String, count: Long, lastModified: Long) /** - * The implementation of the ProcessFunction that maintains the count and timeouts - */ -class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] { + * The implementation of the ProcessFunction that maintains the count and timeouts + */ +class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] { /** The state that is maintained by this process function */ - lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext() - .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp])) + lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext + .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])) override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = { // initialize or retrieve/update the state + val (key, _) = value --- End diff -- Sorry, I didn't make myself clear. What the IDE complains about is that the variable name `key` conflicts with the following lines: ```case CountWithTimestamp(key, count, _) => CountWithTimestamp(key, count + 1, ctx.timestamp) ``` It's not clear whether you want to use the `key` you just defined or the `key` in the match pattern. 
> Fix Scala snippet into Process Function (Low-level Operations) Doc > ------------------------------------------------------------------ > > Key: FLINK-6023 > URL: https://issues.apache.org/jira/browse/FLINK-6023 > Project: Flink > Issue Type: Bug > Components: Documentation > Reporter: Mauro Cortellazzi > Assignee: Mauro Cortellazzi > Priority: Trivial > Fix For: 1.3.0, 1.2.1 > > > The current `/docs/dev/stream/process_function.md` has some errors in the > Scala snippet -- This message was sent by Atlassian JIRA (v6.3.15#6346)