[ 
https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967340#comment-15967340
 ] 

ASF GitHub Bot commented on FLINK-6298:
---------------------------------------

GitHub user wenlong88 opened a pull request:

    https://github.com/apache/flink/pull/3716

    [FLINK-6298]Local execution is not setting RuntimeContext for 
RichOutputFormat

    call set RuntimeContext OutputFormat when the OutputFormat is 
RichOutputFormat

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wenlong88/flink jira-6298

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3716.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3716
    
----
commit 050d3262b2882cb6ca1a6b6a23ad7d49f76b1e94
Author: wenlong.lwl <wenlong....@alibaba-inc.com>
Date:   2017-04-13T09:23:15Z

    Set runtimeContext to output format when it is a RichOutputFormat

----


> Local execution is not setting RuntimeContext for RichOutputFormat
> ------------------------------------------------------------------
>
>                 Key: FLINK-6298
>                 URL: https://issues.apache.org/jira/browse/FLINK-6298
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Mateusz Zakarczemny
>            Assignee: Wenlong Lyu
>
> RuntimeContext is never set in RichOutputFormat. I tested it in local 
> execution. RichMapFunction is setup correctly. 
> Following code will never print "//////Context set in RichOutputFormat"
> {code}
> import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
> import org.apache.flink.api.common.io.RichOutputFormat
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> object Startup {
>   def main(args: Array[String]): Unit = {
>     val mapFunction = new RichMapFunction[String, String] {
>       def open(taskNumber: Int, numTasks: Int) { getRuntimeContext }
>       def map(event: String) = { event }
>       override def setRuntimeContext(t: RuntimeContext) = {
>         println("//////Context set in RichMapFunction")
>         super.setRuntimeContext(t)
>       }
>     }
>     val outputFormat = new RichOutputFormat[String] {
>       override def setRuntimeContext(t: RuntimeContext) = {
>         println("//////Context set in RichOutputFormat")
>         super.setRuntimeContext(t)
>       }
>       def open(taskNumber: Int, numTasks: Int) {}
>       def writeRecord(event: String) {
>         println(event)
>       }
>       def configure(parameters: Configuration): Unit = {}
>       def close(): Unit = {}
>     }
>     val see = StreamExecutionEnvironment.getExecutionEnvironment
>     val eventsStream = see.fromElements[String]("A", "B", 
> "C").map(mapFunction)
>     eventsStream.writeUsingOutputFormat(outputFormat)
>     see.execute("test-job")
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to