Hi,

Directed output via the split and select methods is indeed only
available in the DataStream API. To achieve the same with the DataSet
API, you would have to apply multiple filters, as you've already
written.
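
As a rough illustration of the multiple-filter approach on a DataSet, here
is a minimal sketch. The Click case class, the splitById helper and the
sample records are made up for this example and are not from your job;
only the id 14533 is taken from your code.

```scala
import org.apache.flink.api.scala._

object DataSetSplitSketch {

  // Hypothetical record type, reduced to the two fields used below.
  case class Click(curID: Int, num: Int)

  // Emulates split/select with one filter per output. Both filters read
  // the same input DataSet.
  def splitById(clicks: DataSet[Click], id: Int): (DataSet[Click], DataSet[Click]) = {
    val matching = clicks.filter(_.curID == id)
    val rest = clicks.filter(_.curID != id)
    (matching, rest)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data.
    val clicks = env.fromElements(Click(14533, 3), Click(42, 7), Click(14533, 1))

    val (matching, rest) = splitById(clicks, 14533)

    // In the DataSet API, print() triggers execution of the program.
    matching.print()
    rest.print()
  }
}
```

Each output needs its own filter, so every additional output adds one more
filter over the same input.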

The result of the select call will only be sent to the same TaskManager
if all subtasks of the downstream operator are executed on that TaskManager.
Be aware that subtasks of different operators share the same slot unless
you have assigned a different slot sharing group to an operator. For
example, 2 TaskManagers with 2 slots each give you 4 slots in total. If you
now have two select operators with a parallelism of 2 each, then both
operators could be executed on the same TaskManager, because their subtasks
fit into its 2 slots.
For more information about the slot sharing model, see the following link:
https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources
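
To make the slot sharing group part concrete, here is a minimal sketch of
how an operator can be moved into its own group with slotSharingGroup. The
group name "isolated", the sample elements and the two map functions are
placeholders chosen for this example. Note that the job then needs enough
free slots for both groups, so it may not be schedulable if too few slots
are available.

```scala
import org.apache.flink.streaming.api.scala._

object SlotSharingSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Made-up input; stands in for a real source.
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3, 4)

    // Operators stay in the "default" slot sharing group unless told
    // otherwise, so their subtasks may share slots with each other.
    val doubled = numbers.map(_ * 2)

    // Placeholder group name: subtasks of this operator no longer share
    // slots with the operators above and need slots of their own.
    val isolated = doubled
      .map(_ + 1)
      .slotSharingGroup("isolated")
      .setParallelism(2)

    isolated.print()

    env.execute("slot sharing sketch")
  }
}
```

Operators added after the one with the explicit group inherit that group
from their input, unless they are assigned another one.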

Cheers,
Till

On Tue, Jun 7, 2016 at 7:15 PM, CPC <acha...@gmail.com> wrote:

> Sorry, I think I misunderstood the issue. But it seems DataStream partitions
> the data by some field, and when I select that field only one TaskManager
> is processing the data. I can achieve the same result when I use filter.
> Below is the code:
>
>
> import org.apache.flink.api.java.io.TextInputFormat
> import org.apache.flink.core.fs.Path
> import org.apache.flink.streaming.api.scala._
>
> case class WikiData(prevID: Option[Int], curID: Int, num: Int,
>                     prevTitle: String, curTitle: String, ttype: String)
>
> object StreamingSelect {
>
>
>   def main(args: Array[String]): Unit = {
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>     //val rootPath = "gs://cpcflink/wikistream/"
>     val stream: DataStream[String] = env.createInput(
>       new TextInputFormat(
>         new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))
>
>     val wikiStream = stream.map {
>       line =>
>         val values = line.split("\t")
>         WikiData(
>           if (values(0).isEmpty) None
>           else Some(Integer.parseInt(values(0))),
>           Integer.parseInt(values(1)),
>           Integer.parseInt(values(2)),
>           values(3),
>           values(4),
>           if (values.length < 6) null else values(5)
>         )
>     }
>
>     val split = wikiStream
>       .split(i => if (i.curID == 14533) List("14533") else List.empty)
>     val stream14533 = split.select("14533").map(i => (i.curID, i.num))
>
>     stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")
>
>     env.execute()
>   }
> }
>
> On 7 June 2016 at 19:26, CPC <acha...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > When I use DataStream split/select, it always sends all selected records
> > to the same TaskManager. Is there any reason for this behaviour? Also, is
> > it possible to implement the same split/select behaviour for the DataSet
> > API (without using a different filter for every output)? I found this
> > issue, https://issues.apache.org/jira/browse/FLINK-87, but it is still
> > open...
> >
> > Thanks...
> >
>
