Hi Stephan,

This did not work. For the working case I do see better utilization of the
available slots, but the non-working case still fails.

Basically, I assigned a unique sharing group to each source in my for loop,
given that I have way more slots than the parallelism I seek (roughly like
the sketch below).
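A minimal sketch of that wiring, assuming the Flink 1.2 streaming API; the
paths, names, and partition count are illustrative, not the actual job:

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

import java.util.ArrayList;
import java.util.List;

public class PerPartitionSources {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        int nParts = 16; // illustrative: one lightweight source per partition

        List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
        for (int i = 0; i < nParts; i++) {
            streams.add(env
                    .readFile(
                            new TextInputFormat(new Path("/tmp/input")),
                            "/tmp/input",
                            FileProcessingMode.PROCESS_CONTINUOUSLY,
                            1000,
                            FilePathFilter.createDefaultFilter())
                    .setParallelism(1)
                    .name("src-" + i)
                    // unique sharing group per source, so each pipeline
                    // is scheduled into its own slot
                    .slotSharingGroup("part-" + i));
        }
        // stand-in for the real fan-out / windowing / marker stages
        for (SingleOutputStreamOperator<String> s : streams) {
            s.print();
        }
        env.execute("per-partition sources");
    }
}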
I know about the parallel source, but doesn't a source eat up a slot (like
Spark)? Since my data is pre-partitioned, I was merely monitoring from the
source (keeping it lightweight) and then fanning out to do the actual
reads/work in the next (event-driven) operator, after splitting the stream
from the source.

This is more like a batch use case, but I want a single streaming job to do
streaming + batch. The batch part emits an application-level marker that gets
fanned back in to declare success/completion for the batch. Since my data is
pre-partitioned, my windows don't need to run globally. I also don't know how
to do a global keyBy (shuffle) and then send an app marker from the source to
all the operators. That is why I keep things hand-partitioned: I can send
something from the source to each of my partitions, and the markers get sent
to my sink for a count-up that indicates completion. I can control how the
markers are forwarded, and my keyBy and windowing happen with a parallelism
of 1, so I know I can reach the next stage to keep propagating my marker.
Except that the pattern doesn't scale beyond 8 partitions :(

-Abhishek-

On Mon, Jan 23, 2017 at 10:42 AM Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> I think what you are seeing is the effect of too many tasks going to the
> same task slot. Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#task-slots-and-resources
>
> By default, Flink shares task slots across all distinct pipelines of the
> same program, for an easier "getting started" scheduling behavior.
> For proper deployments (or setups where you just have very …), I would
> make sure that the program sets different "sharing groups" (via
> ".slotSharingGroup()") on the different streams.
>
> Also, rather than defining 100s of different sources, I would consider
> defining one source and making it parallel. It works better with Flink's
> default scheduling parameters.
>
> Hope that helps.
>
> Stephan
>
>
> On Mon, Jan 23, 2017 at 5:40 PM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Actually, I take it back. It is the last union that is causing issues (of
> the job being un-submittable). If I don't combineAtEnd, I can go higher
> (at least deploy the job), all the way up to 63.
>
> After that it starts failing with "too many open files" in RocksDB, which
> I can understand, and which is at least better than silently not accepting
> my job:
>
> Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
>     ... 4 more
> Caused by: org.rocksdb.RocksDBException: IO error:
> /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001:
> Too many open files
>     at org.rocksdb.RocksDB.open(Native Method)
>     at org.rocksdb.RocksDB.open(RocksDB.java:239)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
>     ... 6 more
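On the "Too many open files" failure: every parallel stateful task opens its
own embedded RocksDB instance, so dozens of pipelines multiply the open-file
count. Raising the OS ulimit is the direct fix; the per-instance cap can also
be lowered through the backend's options hook. A minimal sketch, assuming the
Flink 1.2 RocksDB backend API (the checkpoint URI and the cap of 1024 are
placeholders):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class CappedRocksDB {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend backend =
                new RocksDBStateBackend("file:///tmp/checkpoints"); // placeholder URI
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // cap how many files each RocksDB instance may keep open
                return currentOptions.setMaxOpenFiles(1024);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        env.setStateBackend(backend);
        // ... define and execute the job as usual ...
    }
}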
> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Is there a limit on how many DataStreams can be defined in a streaming
> program? It looks like Flink has problems handling too many data streams.
> I simplified my topology further. For example, this works (parallelism of
> 4):
>
> <PastedGraphic-2.png>
>
> However, when I try to go beyond 51 (found empirically by parametrizing
> nParts), it barfs again. Submission fails, and it wants me to increase
> akka.client.timeout.
>
> Here is the reduced code for the repro (the union at the end is not itself
> the issue; it is the parallelism of the first for loop):
>
> int nParts = cfg.getInt("dummyPartitions", 4);
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
>
> // create lots of streams
> List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>     streams.add(env
>         .readFile(
>             new TextInputFormat(new Path("/tmp/input")),
>             "/tmp/input",
>             FileProcessingMode.PROCESS_CONTINUOUSLY,
>             1000,
>             FilePathFilter.createDefaultFilter())
>         .setParallelism(1).name("src"));
> }
>
> if (combineAtEnd) {
>     DataStream<String> combined = streams.get(0);
>     for (int i = 1; i < nParts; i++) {
>         combined = combined.union(streams.get(i));
>     }
>     combined.print().setParallelism(1);
> } else { // print in parallel
>     for (int i = 0; i < nParts; i++) {
>         streams.get(i).print();
>     }
> }
>
> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> I even made it 10 minutes:
>
> akka.client.timeout: 600s
>
> But it doesn't feel like it is taking effect. It still fails at about the
> same time with the same error.
>
> -Abhishek-
>
> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
> Yes, I had increased it to 5 minutes. It just sits there and bails out
> again.
>
> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de> wrote:
>
> The exception says that you should increase akka.client.timeout. Did you
> already try that?
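For reference, the "one source, made parallel" approach Stephan suggests
could look roughly like this: a single RichParallelSourceFunction whose
subtasks each handle one pre-assigned partition, selected by subtask index.
The class name, partition selection, and emitted records are illustrative
only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class PartitionedSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // each subtask monitors exactly one pre-partitioned slice,
        // e.g. /tmp/input/part-<index> (layout is hypothetical)
        int part = getRuntimeContext().getIndexOfThisSubtask();
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record-from-partition-" + part); // placeholder record
            }
            Thread.sleep(1000); // stand-in for real monitoring
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // one source definition with nParts parallel subtasks, instead of
        // nParts separate single-parallelism sources
        env.addSource(new PartitionedSource()).setParallelism(4).print();
        env.execute("single parallel source");
    }
}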