Hello again, 

As a follow-up to my previous email: on the same topology, I set breakpoints 
inside the selectChannel() methods of RebalancePartitioner and ForwardPartitioner, 
and they are reached. Unfortunately, the same breakpoint set in HashPartitioner 
is not reached. To make sure that an instance of HashPartitioner is created for 
the keyBy() transformation, I set a breakpoint inside the StreamGraphGenerator 
transform() method and verified that a HashPartitioner object is created. My 
topology is submitted in the following way: 

// Phase 0: input setup
DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
                return event.f0;
            }
        })
        .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
        .keyBy(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(2)
        .setParallelism(N);

// Phase 2: serial full aggregation and ordering, with a parallelism of 1
DataStream<String> phaseTwo = phaseOne
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values,
                              Collector<String> out) throws Exception {
                ...
                List<Integer> topTenValues = ...;
                StringBuilder strBuilder = new StringBuilder();
                for (Integer t : topTenValues)
                    strBuilder.append(Integer.toString(t) + ",");
                out.collect(strBuilder.toString());
            }
        });

Am I missing something or is this a bug? 

Nikos R. Katsipoulakis, 
Department of Computer Science
University of Pittsburgh

-----Original Message-----
From: Katsipoulakis, Nikolaos Romanos [mailto:kat...@cs.pitt.edu] 
Sent: Wednesday, January 25, 2017 10:33 AM
To: user@flink.apache.org
Subject: RE: When is the StreamPartitioner<T> selectChannel() method called

Hello Ufuk, 

First, thank you very much for your quick reply and for clarifying the 
difference between channels and slots. Regarding debugging and the breakpoint 
inside the HashPartitioner: I am using the IntelliJ IDE, and I have set up the 
environment as a Maven project with the following dependencies: 

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.1.4</version>
        </dependency>

Therefore, I assume that my local environment is using Flink 1.1. In addition, 
I set the parallelism of the operation after the keyBy() transformation to 8, 
so that 8 sub-tasks run concurrently. Unfortunately, a breakpoint inside 
HashPartitioner's selectChannel() is not reached. Am I doing something wrong?
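
For reference, the (simplified) shape of the job I am testing is roughly the 
sketch below; the input collection and the field positions are placeholders:

        // simplified shape of the job: keyBy() followed by a parallelism-8 operator
        env.fromCollection(events)                                   // events: placeholder input collection
           .keyBy(1)                                                 // should route records through HashPartitioner
           .window(TumblingEventTimeWindows.of(Time.seconds(10)))
           .sum(2)
           .setParallelism(8);                                       // 8 concurrent sub-tasks after keyBy()
        env.execute();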

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Wednesday, January 25, 2017 10:06 AM
To: user@flink.apache.org
Subject: Re: When is the StreamPartitioner<T> selectChannel() method called

Hey Nikos,

Slots are only relevant for scheduling tasks. The number of outgoing channels 
depends on the number of parallel subtasks that consume a produced intermediate 
result stream, say the result of a source operator. If you have a job with a 
simple source -> keyBy -> map flow with parallelism X, you will have X outgoing 
channels at the source operator, one for each consuming map subtask. This is 
what is exposed by the underlying channel selector.
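
To make this concrete, here is a minimal sketch (the data, the parallelism value 
and the identity map are made up for the example, assuming the usual 
flink-streaming imports):

    // source -> keyBy -> map, where the map runs with parallelism X
    final int X = 4;
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2), Tuple2.of(1L, 3))
       .keyBy(0)                                         // inserts the hash-based stream partitioner
       .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
           @Override
           public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) {
               return value;                             // consumed by one of the X map subtasks
           }
       })
       .setParallelism(X);                               // => X outgoing channels on the keyed exchange
    env.execute("channel example");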

For 1.1 the mentioned HashPartitioner should be called as you describe. In 1.2 
this has been replaced by the KeyGroupStreamPartitioner.
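
Conceptually, what the hash-based partitioner does boils down to the sketch 
below. This is only an illustration of the idea, not the actual Flink code 
(the real implementation hashes the extracted key, and in 1.2 it goes through 
the key-group indirection):

    // illustration only: map a record's key onto one of the outgoing channels
    int selectChannel(Object key, int numberOfChannels) {
        // non-negative hash of the key, modulo the number of consuming subtasks
        return (key.hashCode() & Integer.MAX_VALUE) % numberOfChannels;
    }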

That the HashPartitioner method is not called is probably because you are 
debugging this remotely. Have you tried it from within your local IDE, 
too?
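
If you want to rule out the remote-debugging setup, something along these lines 
(run straight from main() in the IDE, so everything executes in one local JVM) 
should hit breakpoints inside the partitioner; the parallelism value is just an 
example:

    // run the job in-process so IDE breakpoints in partitioner code are reached
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
    // ... build the same keyBy()-based topology as in your job ...
    env.execute("local debug run");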

– Ufuk


On Wed, Jan 25, 2017 at 2:12 PM, Katsipoulakis, Nikolaos Romanos 
<kat...@cs.pitt.edu> wrote:
> Hello all,
>
>
>
> I have been looking into different StreamPartitioner<T>
> implementations in Flink, and I noticed that they come with an
> implementation of selectChannel(), as defined in the
> ChannelSelector<T> interface. To better understand the
> actions of a StreamPartitioner during execution, I set up Flink on a
> single server with one TaskManager that had 16 slots. Then, I
> submitted a job with a HashPartitioner (through a keyBy()
> transformation) and remote-debugged it to see when the
> HashPartitioner’s selectChannel() method is called. Unfortunately, the
> breakpoint is never reached and the job completes successfully. Is this
> behavior normal? If yes, why is the breakpoint never reached?
> Does it have to do with running the job in an environment with local
> slots? Also, what determines the number of channels when a job is executed?
> Does it have to do with the number of available slots in the downstream
> operation of the partitioner?
>
>
>
> Thank you for your time and I appreciate any answers/comments/indications.
>
>
>
> Kind Regards.
>
>
>
> Nikos R. Katsipoulakis
>
> Department of Computer Science,
>
> University of Pittsburgh
>
>
