Hi Stephan,

This did not work. In the working case I do see better utilization of the
available slots; however, the non-working case still doesn't work.

Basically, I assigned a unique slot sharing group to each of the sources in my
for loop, given that I have far more slots than the parallelism I seek.
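
Concretely, the loop looks roughly like this (a minimal sketch; the group and
source names are just illustrative):

for (int i = 0; i < nParts; i++) {
  streams.add(env
      .readFile(
          new TextInputFormat(new Path("/tmp/input")),
          "/tmp/input",
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          1000,
          FilePathFilter.createDefaultFilter())
      .setParallelism(1)
      .name("src-" + i)
      .slotSharingGroup("group-" + i)); // unique slot sharing group per source
}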

I know about parallel sources. Doesn't a source eat up a slot (like in Spark)?
Since my data is pre-partitioned, I was merely monitoring from the source
(keeping it lightweight) and then fanning out to do the actual reads/work in
the next (event-driven) operator, after splitting the stream from the source.
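
In other words, roughly this sketch (MonitorSource and ReadPartition are
hypothetical stand-ins for my classes, and I am ignoring the split/select step
for brevity):

// Lightweight source that only emits partition/file descriptors;
// the heavy reads happen in a downstream, parallel operator.
DataStream<String> descriptors =
    env.addSource(new MonitorSource()).setParallelism(1);  // hypothetical source

descriptors
    .rebalance()                    // fan out across the reader subtasks
    .flatMap(new ReadPartition())   // hypothetical operator doing the actual reads
    .setParallelism(nParts);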

This is more like a batch use case. However, I want to use a single streaming
job to do streaming + batch. The batch part emits an application-level marker
that is fanned back in to declare success/completion for the batch.

Since my data is pre-partitioned, my windows don't need to run globally. I also
don't know how to do a global keyBy (shuffle) and then send an app-level marker
from the source to all the operators. That is why I keep things hand-partitioned:
I can send a marker from the source into each of my partitions, and the markers
get forwarded to my sink, which counts them up to indicate completion. I control
how the markers are sent forward, and my keyBy and windowing happen with a
parallelism of 1, so I know the marker can reach the next stage and keep
propagating. Except that the pattern doesn't scale beyond 8 partitions :(
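
To make the completion check concrete, it boils down to a parallelism-1 sink
that counts markers (a rough sketch; the Marker type and the markerStream
feeding it are hypothetical stand-ins for my actual types):

// Count one marker per partition; declare the batch done once all have arrived.
markerStream.addSink(new RichSinkFunction<Marker>() {
  private int seen = 0;

  @Override
  public void invoke(Marker marker) {
    if (++seen == nParts) {
      System.out.println("batch complete: all " + nParts + " partition markers arrived");
    }
  }
}).setParallelism(1);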

-Abhishek-

On Mon, Jan 23, 2017 at 10:42 AM Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> I think what you are seeing is the effect of too many tasks going to the
> same task slot. Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#task-slots-and-resources
>
> By default, Flink shares task slots across all distinct pipelines of the
> same program, for easier "getting started" scheduling behavior.
> For proper deployments (or setups where you just have very many slots), I
> would make sure that the program sets different "sharing groups" (via
> ".slotSharingGroup()") on the different streams.
>
> Also, rather than defining 100s of different sources, I would consider
> defining one source and making it parallel. It works better with Flink's
> default scheduling parameters.
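>
> For example (just a sketch, reusing the readFile call and nParts from your
> snippet):
>
> env.readFile(
>         new TextInputFormat(new Path("/tmp/input")),
>         "/tmp/input",
>         FileProcessingMode.PROCESS_CONTINUOUSLY,
>         1000,
>         FilePathFilter.createDefaultFilter())
>     .setParallelism(nParts)   // one parallel source instead of nParts separate ones
>     .name("src");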
>
> Hope that helps.
>
> Stephan
>
>
>
>
> On Mon, Jan 23, 2017 at 5:40 PM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Actually, I take it back. It is the last union that is causing issues (the
> job becoming un-submittable). If I don't combineAtEnd, I can go higher (at
> least deploy the job), all the way up to 63.
>
> After that it starts failing with too many open files in RocksDB (which I
> can understand, and is at least better than silently not accepting my job).
>
> Caused by: java.lang.RuntimeException: Error while opening RocksDB
> instance.
>         at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
>         ... 4 more
> Caused by: org.rocksdb.RocksDBException: IO error:
> /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001:
> Too many open files
>         at org.rocksdb.RocksDB.open(Native Method)
>         at org.rocksdb.RocksDB.open(RocksDB.java:239)
>         at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
>         ... 6 more
>
>
> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Is there a limit on how many DataStreams can be defined in a streaming
> program?
>
> It looks like Flink has problems handling too many data streams. I simplified
> my topology further. For example, this works (parallelism of 4):
>
> <PastedGraphic-2.png>
>
> However, when I try to go beyond 51 (found empirically by parametrizing
> nParts), it barfs again. Submission fails, and it asks me to increase
> akka.client.timeout.
>
> Here is the reduced repro code (the union at the end is not itself the
> issue; it is the number of sources created in the first for loop):
>
> int nParts = cfg.getInt("dummyPartitions", 4);
>
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
>
> // create lots of streams
> List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   streams.add(env
>       .readFile(
>           new TextInputFormat(new Path("/tmp/input")),
>           "/tmp/input",
>           FileProcessingMode.PROCESS_CONTINUOUSLY,
>           1000,
>           FilePathFilter.createDefaultFilter())
>       .setParallelism(1).name("src"));
> }
>
> if (combineAtEnd) {
>   DataStream<String> combined = streams.get(0);
>   for (int i = 1; i < nParts; i++) {
>     combined = combined.union(streams.get(i));
>   }
>   combined.print().setParallelism(1);
> } else { // print each stream separately, in parallel
>   for (int i = 0; i < nParts; i++) {
>     streams.get(i).print();
>   }
> }
>
>
>
> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> I even made it 10 minutes:
>
> akka.client.timeout: 600s
>
> But it doesn't feel like it is taking effect. It still fails at about the
> same time with the same error.
>
> -Abhishek-
>
> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Yes, I had increased it to 5 minutes. It just sits there and then bails out
> again.
>
> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de> wrote:
>
> The exception says to increase akka.client.timeout.
>
> Did you already try that?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
