Hi,

The split and select methods for directed output are indeed only available in the DataStream API. To achieve the same with the DataSet API, you would have to apply multiple filters, as you've already written.
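Here is a minimal sketch of the multiple-filter approach. It assumes the goal is to keep the routing rule in one place: a single split function, modeled on the one in the quoted DataStream code, is turned into one predicate per output name. `SplitAsFilters` and `predicateFor` are hypothetical names, not Flink API. Plain Scala collections stand in for a DataSet here.

```scala
// Sketch only: derive one filter predicate per output name from a single
// split function, so the routing rule is written once and reused by every
// filter. SplitAsFilters and predicateFor are hypothetical, not Flink API.
object SplitAsFilters {

  // Returns a predicate that keeps exactly the records that a split/select
  // pair would route to the output called `name`.
  def predicateFor[T](split: T => Seq[String], name: String): T => Boolean =
    record => split(record).contains(name)

  def main(args: Array[String]): Unit = {
    // Same kind of routing rule as the quoted DataStream example.
    val split: Int => Seq[String] =
      id => if (id == 14533) Seq("14533") else Seq("other")

    val ids = Seq(14533, 42, 14533, 7)
    // In a Flink job, each predicate would instead go to its own
    // DataSet.filter(...) call on the same input DataSet.
    val selected = ids.filter(predicateFor(split, "14533"))
    val rest = ids.filter(predicateFor(split, "other"))
    println(s"14533 -> $selected, other -> $rest")
  }
}
```

This removes duplicated routing logic, not the extra work. Each filter still receives every input record, which is the cost that split/select avoids in the DataStream API.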
The result of the select call will only be sent to the same TaskManager if all subtasks of the downstream operator are executed on this TaskManager. Be aware that subtasks of different operators will share the same slot if you haven't assigned a different slot sharing group to the operator. For example, if you have 2 TaskManagers with 2 slots each, then you have 4 slots in total. If you now have two select operators with a parallelism of 2 each, then these two operators could both be executed on the same TaskManager, because their subtasks can share that TaskManager's two slots.

For more information about the slot sharing model, see the following link: https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources

Cheers,
Till

On Tue, Jun 7, 2016 at 7:15 PM, CPC <acha...@gmail.com> wrote:

> Sorry, I think I misunderstood the issue. But it seems DataStream partitions
> the data by some field, and when I select that field only one TaskManager
> processes the data. I can achieve the same result when I use filter. Below
> is the code piece:
>
> import org.apache.flink.api.java.io.TextInputFormat
> import org.apache.flink.core.fs.Path
> import org.apache.flink.streaming.api.scala._
>
> case class WikiData(prevID: Option[Int], curID: Int, num: Int,
>                     prevTitle: String, curTitle: String, ttype: String)
>
> object StreamingSelect {
>
>   def main(args: Array[String]): Unit = {
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>     //val rootPath = "gs://cpcflink/wikistream/"
>     val stream: DataStream[String] = env.createInput(new TextInputFormat(
>       new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))
>
>     val wikiStream = stream.map { line =>
>       val values = line.split("\t")
>       WikiData(
>         if (values(0).isEmpty) None else Some(Integer.parseInt(values(0))),
>         Integer.parseInt(values(1)),
>         Integer.parseInt(values(2)),
>         values(3),
>         values(4),
>         if (values.length < 6) null else values(5)
>       )
>     }
>
>     val split = wikiStream
>       .split(i => if (i.curID == 14533) List("14533") else List.empty)
>     val stream14533 = split.select("14533").map(i => (i.curID, i.num))
>
>     stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")
>
>     env.execute()
>   }
> }
>
> On 7 June 2016 at 19:26, CPC <acha...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > When I use DataStream split/select, it always sends all selected records
> > to the same TaskManager. Is there any reason for this behaviour? Also, is
> > it possible to implement the same split/select behaviour for the DataSet
> > API (without using a different filter for every output)? I found this
> > https://issues.apache.org/jira/browse/FLINK-87 issue but it is still
> > open...
> >
> > Thanks...
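The following plain-Scala model makes the slot arithmetic from the reply above concrete. It assumes the usual slot-sharing rule: a job needs as many slots per slot sharing group as the highest parallelism of any operator in that group, summed over all groups. `Op`, `slotsNeeded` and the group names are hypothetical. The model considers only the two select operators and ignores sources and sinks.

```scala
// Sketch of the slot arithmetic. Within one slot sharing group, subtasks of
// different operators can share a slot, so a group needs as many slots as its
// highest operator parallelism. Separate groups need separate slots.
// Op and slotsNeeded are hypothetical names used only for illustration.
object SlotSharingMath {

  final case class Op(name: String, parallelism: Int, group: String = "default")

  // Sum, over all slot sharing groups, of the maximum parallelism in the group.
  def slotsNeeded(ops: Seq[Op]): Int =
    ops.groupBy(_.group).values.toSeq.map(_.map(_.parallelism).max).sum

  def main(args: Array[String]): Unit = {
    val taskManagers = 2
    val slotsPerTaskManager = 2
    val totalSlots = taskManagers * slotsPerTaskManager

    // Two select operators with parallelism 2, both in the default group.
    val shared = Seq(Op("select-a", 2), Op("select-b", 2))
    // The same operators, each assigned its own slot sharing group.
    val separate = Seq(Op("select-a", 2, "a"), Op("select-b", 2, "b"))

    println(s"total slots: $totalSlots")
    println(s"shared group needs ${slotsNeeded(shared)} slots")
    println(s"separate groups need ${slotsNeeded(separate)} slots")
  }
}
```

With both selects in the default group, two slots suffice, so a single TaskManager with 2 slots could host all four subtasks. Giving each select its own group (in the DataStream API, via `slotSharingGroup(name)` on the operator) raises the requirement to four slots. The job as a whole must then use both TaskManagers, although this alone does not decide which TaskManager each subtask lands on.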