Hi!

I think what you are seeing is the effect of too many tasks going to the
same task slot. Have a look here:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#task-slots-and-resources

By default, Flink shares task slots across all distinct pipelines of the
same program, for easier "getting started" scheduling behavior.
For proper deployments (or setups where you just have very many streams), I
would make sure that the program sets different "sharing groups" (via
".slotSharingGroup()") on the different streams.
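
A rough sketch of what setting sharing groups could look like, assuming the
Flink 1.x DataStream API. The class name, the "pipeline-N" group names, and
the fromElements() stand-in source are made up for illustration; they are not
taken from your program.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupsSketch {

    // Builds nParts independent pipelines, each in its own slot sharing group.
    public static void buildPipelines(StreamExecutionEnvironment env, int nParts) {
        for (int i = 0; i < nParts; i++) {
            env.fromElements("a-" + i, "b-" + i)       // stand-in source (assumption)
               .slotSharingGroup("pipeline-" + i)      // hypothetical group name
               .print()                                // sink inherits the group of its input
               .setParallelism(1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildPipelines(env, 4);
        // Only prints the JSON plan; it does not submit the job.
        System.out.println(env.getExecutionPlan());
    }
}
```

Keep in mind that tasks in different sharing groups do not share slots, so the
cluster needs at least one free slot per group for the job to be scheduled.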

Also, rather than defining 100s of different sources, I would consider
defining one source and making it parallel. It works better with Flink's
default scheduling parameters.
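
To illustrate the single parallel source idea, here is a minimal sketch,
again assuming the Flink 1.x DataStream API. PartitionedSource is a
hypothetical source in which each parallel subtask handles one "partition"
based on its subtask index; the emitted records are placeholders.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelSourceSketch {

    /** Hypothetical source: one class, run as nParts parallel subtasks. */
    public static class PartitionedSource extends RichParallelSourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Each subtask picks its partition from its own index.
            int partition = getRuntimeContext().getIndexOfThisSubtask();
            long seq = 0;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("partition-" + partition + " record-" + seq++);
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // One source operator with parallelism nParts instead of nParts sources.
    public static void buildPipeline(StreamExecutionEnvironment env, int nParts) {
        env.addSource(new PartitionedSource())
           .setParallelism(nParts)
           .name("src")
           .print()
           .setParallelism(1);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildPipeline(env, 4);
        // Only prints the JSON plan; it does not submit the job.
        System.out.println(env.getExecutionPlan());
    }
}
```

The job graph then has two operators regardless of nParts, rather than one
source per partition plus a large union.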

Hope that helps.

Stephan




On Mon, Jan 23, 2017 at 5:40 PM, Abhishek R. Singh <
abhis...@tetrationanalytics.com> wrote:

> Actually, I take it back. It is the last union that is causing issues (of
> the job being un-submittable). If I don’t combineAtEnd, I can go higher (at
> least deploy the job), all the way up to 63.
>
> After that it starts failing with "Too many open files" in RocksDB (which I
> can understand and is at least better than silently not accepting my job).
>
> Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
>         at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
>         ... 4 more
> Caused by: org.rocksdb.RocksDBException: IO error: /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001: Too many open files
>         at org.rocksdb.RocksDB.open(Native Method)
>         at org.rocksdb.RocksDB.open(RocksDB.java:239)
>         at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
>         ... 6 more
>
>
> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Is there a limit on how many DataStreams can be defined in a streaming
> program?
>
> Looks like Flink has problems handling too many data streams? I simplified
> my topology further. For example, this works (parallelism of 4):
>
> <PastedGraphic-2.png>
>
> However, when I try to go beyond 51 (found empirically by parametrizing
> nParts), it barfs again. Submission fails, it wants me to increase
> akka.client.timeout
>
> Here is the reduced code for repro (union at the end itself is not an
> issue). It is the parallelism of the first for loop:
>
> int nParts = cfg.getInt("dummyPartitions", 4);
>
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
>
> // create lots of streams
> List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   streams.add(env
>       .readFile(
>           new TextInputFormat(new Path("/tmp/input")),
>           "/tmp/input",
>           FileProcessingMode.PROCESS_CONTINUOUSLY,
>           1000,
>           FilePathFilter.createDefaultFilter())
>       .setParallelism(1).name("src"));
> }
>
> if (combineAtEnd) {
>   // union all streams into one and print through a single sink
>   DataStream<String> combined = streams.get(0);
>   for (int i = 1; i < nParts; i++) {
>     combined = combined.union(streams.get(i));
>   }
>   combined.print().setParallelism(1);
> } else {
>   // no final union: print every stream on its own (start at 0 so the
>   // first stream also gets a sink)
>   for (int i = 0; i < nParts; i++) {
>     streams.get(i).print();
>   }
> }
>
>
>
> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> I even made it 10 minutes:
>
> akka.client.timeout: 600s
>
> But it doesn’t feel like it is taking effect. It still comes out at about
> the same time with the same error.
>
> -Abhishek-
>
> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Yes, I had increased it to 5 minutes. It just sits there and bails out
> again.
>
> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de> wrote:
>
> The exception says that
>
> Did you already try that?
>
