[ https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Hogan updated FLINK-6298: ------------------------------ Fix Version/s: 1.2.2 1.3.0 > Local execution is not setting RuntimeContext for RichOutputFormat > ------------------------------------------------------------------ > > Key: FLINK-6298 > URL: https://issues.apache.org/jira/browse/FLINK-6298 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.1.0, 1.2.0 > Reporter: Mateusz Zakarczemny > Assignee: Wenlong Lyu > Fix For: 1.3.0, 1.2.2 > > > RuntimeContext is never set in RichOutputFormat. I tested it in local > execution. RichMapFunction is set up correctly. > The following code will never print "//////Context set in RichOutputFormat" > {code} > import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} > import org.apache.flink.api.common.io.RichOutputFormat > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > object Startup { > def main(args: Array[String]): Unit = { > val mapFunction = new RichMapFunction[String, String] { > def open(taskNumber: Int, numTasks: Int) { getRuntimeContext } > def map(event: String) = { event } > override def setRuntimeContext(t: RuntimeContext) = { > println("//////Context set in RichMapFunction") > super.setRuntimeContext(t) > } > } > val outputFormat = new RichOutputFormat[String] { > override def setRuntimeContext(t: RuntimeContext) = { > println("//////Context set in RichOutputFormat") > super.setRuntimeContext(t) > } > def open(taskNumber: Int, numTasks: Int) {} > def writeRecord(event: String) { > println(event) > } > def configure(parameters: Configuration): Unit = {} > def close(): Unit = {} > } > val see = StreamExecutionEnvironment.getExecutionEnvironment > val eventsStream = see.fromElements[String]("A", "B", > "C").map(mapFunction) > eventsStream.writeUsingOutputFormat(outputFormat) > see.execute("test-job") > } > } > {code} -- This message was sent by 
Atlassian JIRA (v6.3.15#6346)