[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905267#comment-15905267
 ] 

ASF GitHub Bot commented on FLINK-6023:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3510#discussion_r105420103
  
    --- Diff: docs/dev/stream/process_function.md ---
    @@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -import org.apache.flink.api.common.state.ValueState;
    -import org.apache.flink.api.common.state.ValueStateDescriptor;
    -import org.apache.flink.streaming.api.functions.ProcessFunction;
    -import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
    -import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
    -import org.apache.flink.util.Collector;
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.streaming.api.functions.ProcessFunction.Context
    +import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
    +import org.apache.flink.util.Collector
     
     // the source data stream
    -DataStream<Tuple2<String, String>> stream = ...;
    +val stream: DataStream[Tuple2[String, String]] = ...
     
     // apply the process function onto a keyed stream
    -DataStream<Tuple2<String, Long>> result = stream
    -    .keyBy(0)
    -    .process(new CountWithTimeoutFunction());
    +val result: DataStream[Tuple2[String, Long]] = stream
    +  .keyBy(0)
    +  .process(new CountWithTimeoutFunction())
     
     /**
    - * The data type stored in the state
    - */
    +  * The data type stored in the state
    +  */
     case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
     
     /**
    - * The implementation of the ProcessFunction that maintains the count and 
timeouts
    - */
    -class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
    +  * The implementation of the ProcessFunction that maintains the count and 
timeouts
    +  */
    +class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
     
       /** The state that is maintained by this process function */
    -  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
    -      .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
    +  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    +    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
     
     
       override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
    --- End diff --
    
    I think it should be `value: (String, String)`, to match the source stream declared above as `DataStream[Tuple2[String, String]]`.


> Fix Scala snippet in Process Function (Low-level Operations) Doc
> ----------------------------------------------------------------
>
>                 Key: FLINK-6023
>                 URL: https://issues.apache.org/jira/browse/FLINK-6023
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>            Reporter: Mauro Cortellazzi
>            Assignee: Mauro Cortellazzi
>            Priority: Trivial
>             Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to