Hi Stephan,

I'm using DataStream.writeAsText(String path, WriteMode writeMode) for my sink. The data is written to disk and there's plenty of space available.
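
In case it helps, here is roughly how the sink is attached. This is a trimmed-down sketch, not the actual job: the real pipeline has the flat map/filter chain in between, and the class name, host, port and output path below are just placeholders.

import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketToTextJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder host/port; the real job uses the ParallelSocketSource shown further down the thread.
        DataStream<String> lines = env.addSource(new ParallelSocketSource("server-host", 12345));

        // ... flat map / filter / map chain omitted ...

        // The sink in question: plain text files, overwriting any previous output.
        lines.writeAsText("file:///tmp/flink-output", WriteMode.OVERWRITE);

        env.execute("socket-to-text");
    }
}
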
I looked deeper into the logs and found out that the jobs on 174 and 175 are not actually stuck, but they're moving extremely slowly. This is an excerpt from the task manager log on 175:

03:44:43,307 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
03:44:43,315 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
03:46:09,360 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.173/192.168.200.173:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,361 INFO  org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 86223ms for sessionid 0x25181a544860091, closing socket connection and attempting reconnect
03:46:09,362 WARN  org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection attempt unsuccessful after 86221 (greater than max timeout of 60000). Resetting connection and trying again with a new connection.
03:46:09,391 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86222 to read a 1000 lines
03:46:09,394 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86243 to read a 1000 lines
03:46:09,439 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86224 to read a 1000 lines
03:46:09,445 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25181a544860091 closed
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@550a1967
03:46:09,462 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
03:46:09,463 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.174/192.168.200.174:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,463 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
03:46:09,464 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:809)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
    at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
03:46:09,464 INFO  org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.200.174/192.168.200.174:2181, initiating session
03:46:09,468 INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094, negotiated timeout = 40000
03:46:09,469 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
03:46:09,475 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86212 to read a 1000 lines
03:46:09,523 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines

You'll notice that at one point it takes 254 milliseconds to read 1,000 lines of input, and then it jumps to 86 seconds! I also see some ZooKeeper exceptions, which lead me to believe it's a networking problem. I have 4 VMs running on 4 different hosts, connected via 10G NICs.

Thanks,
Ali

On 2015-12-11, 11:23 AM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi Ali!
>
>I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently do not make progress, and do not even recognize the end-of-stream point.
>
>I expect that the streams on 192.168.200.174 and 192.168.200.175 are back-pressured to a stand-still. Since no network is involved, the reason for the back pressure is probably the sinks.
>
>What kind of data sink are you using (in the addSink() function)? Can you check if that one starts to fully block on machines 192.168.200.174 and 192.168.200.175?
>
>Greetings,
>Stephan
>
>
>On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Stephan,
>>
>> I got a request to share the image with someone and I assume it was you. You should be able to see it now. This seems to be the main issue I have at this time. I've tried running the job on the cluster with a parallelism of 16, 24, 36, and even went up to 48. I see all the parallel pipelines working for a bit and then some of them just stop; I'm not sure if they're stuck or not. Here's another screenshot:
>> http://postimg.org/image/gr6ogxqjj/
>>
>> Two things you'll notice:
>> 1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing anything at one point and only 192.168.200.173 is doing all the work.
>> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don't have an end time even though the job should be finished (the screenshot was taken after the source was closed).
>>
>> I'm not sure if this helps or not, but here are some properties from the flink-conf.yaml:
>>
>> jobmanager.heap.mb: 8192
>> taskmanager.heap.mb: 49152
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> state.backend: filesystem
>> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>>
>> taskmanager.network.numberOfBuffers: 3072
>>
>> recovery.mode: zookeeper
>> recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
>> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
>> recovery.zookeeper.path.root: /opt/flink-0.10.0
>>
>> I appreciate all the help.
>>
>> Thanks,
>> Ali
>>
>> On 2015-12-10, 10:16 AM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>> >Hi Ali!
>> >
>> >Seems like the Google Doc has restricted access, it tells me I have no permission to view it...
>> >
>> >Stephan
>> >
>> >
>> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >
>> >> Hi Stephan,
>> >>
>> >> Here's a link to the screenshot I tried to attach earlier:
>> >>
>> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>> >>
>> >> It looks to me like the distribution is fairly skewed across the nodes, even though they're executing the same pipeline.
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>> >>
>> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >>
>> >> >Hi!
>> >> >
>> >> >The parallel socket source looks good.
>> >> >I think you forgot to attach the screenshot, or the mailing list dropped the attachment...
>> >> >
>> >> >Not sure if I can diagnose that without more details. The sources all do the same. Assuming that the server distributes the data evenly across all connected sockets, and that the network bandwidth ends up being divided in a fair way, all pipelines should be similarly "eager".
>> >> >
>> >> >Greetings,
>> >> >Stephan
>> >> >
>> >> >
>> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >
>> >> >> Hi Stephan,
>> >> >>
>> >> >> That was my original understanding, until I realized that I was not using a parallel socket source.
>> >> >> I had a custom source that extended SourceFunction, which always runs with parallelism = 1. I looked through the API and found the ParallelSourceFunction interface, so I implemented that and voila, now all 3 nodes in the cluster are actually receiving traffic on socket connections.
>> >> >>
>> >> >> Now that I'm running it successfully end to end, I'm trying to improve the performance. Can you take a look at the attached screenshot and tell me if the distribution of work amongst the pipelines is normal? I feel like some pipelines are a lot lazier than others, even though the cluster nodes are exactly the same.
>> >> >>
>> >> >> By the way, here's the class I wrote. It would be useful to have this available in the Flink distro:
>> >> >>
>> >> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>> >> >>
>> >> >>     private static final long serialVersionUID = -271094428915640892L;
>> >> >>     private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);
>> >> >>
>> >> >>     private volatile boolean running = true;
>> >> >>     private String host;
>> >> >>     private int port;
>> >> >>
>> >> >>     public ParallelSocketSource(String host, int port) {
>> >> >>         this.host = host;
>> >> >>         this.port = port;
>> >> >>     }
>> >> >>
>> >> >>     @Override
>> >> >>     public void run(SourceContext<String> ctx) throws Exception {
>> >> >>         try (Socket socket = new Socket(host, port);
>> >> >>              BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
>> >> >>             String line = null;
>> >> >>             while (running && ((line = reader.readLine()) != null)) {
>> >> >>                 ctx.collect(line);
>> >> >>             }
>> >> >>         } catch (IOException ex) {
>> >> >>             LOG.error("error reading from socket", ex);
>> >> >>         }
>> >> >>     }
>> >> >>
>> >> >>     @Override
>> >> >>     public void cancel() {
>> >> >>         running = false;
>> >> >>     }
>> >> >> }
>> >> >>
>> >> >> Regards,
>> >> >> Ali
>> >> >>
>> >> >>
>> >> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >> >>
>> >> >> >Hi Ali!
>> >> >> >
>> >> >> >In the case you have, the sequence of source-map-filter ... forms a pipeline.
>> >> >> >
>> >> >> >You mentioned that you set the parallelism to 16, so there should be 16 pipelines. These pipelines should be completely independent.
>> >> >> >
>> >> >> >Looking at the way the scheduler is implemented, independent pipelines should be spread across machines. But when you execute that in parallel, you say all 16 pipelines end up on the same machine?
>> >> >> >
>> >> >> >Can you share with us the rough code of your program? Or a screenshot from the runtime dashboard that shows the program graph?
>> >> >> >
>> >> >> >
>> >> >> >If your cluster is basically for that one job only, you could try and set the number of slots to 4 for each machine. Then you have 16 slots in total and each node would run one of the 16 pipelines.
>> >> >> >
>> >> >> >
>> >> >> >Greetings,
>> >> >> >Stephan
>> >> >> >
>> >> >> >
>> >> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >> >
>> >> >> >> There is no shuffle operation in my flow.
>> >> >> >> Mine actually looks like this:
>> >> >> >>
>> >> >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)
>> >> >> >>
>> >> >> >> Maybe it's treating this whole flow as one pipeline and assigning it to a slot. What I really wanted was to have the custom source I built run instances on all nodes. I'm not really sure if that's the right approach, but if we could add this as a feature that'd be great, since having more than one node running the same pipeline guarantees the pipeline is never offline.
>> >> >> >>
>> >> >> >> -Ali
>> >> >> >>
>> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>> >> >> >>
>> >> >> >> >If I'm not mistaken, the scheduler already has a preference to spread independent pipelines out across the cluster. At least it uses a queue of instances from which it pops the first element when it allocates a new slot. This instance is then appended to the queue again if it has some resources (slots) left.
>> >> >> >> >
>> >> >> >> >I would assume that you have a shuffle operation involved in your job, such that it makes sense for the scheduler to deploy all pipelines to the same machine.
>> >> >> >> >
>> >> >> >> >Cheers,
>> >> >> >> >Till
>> >> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >> >> >> >
>> >> >> >> >> Slots are like "resource groups" which execute entire pipelines. They frequently have more than one operator.
>> >> >> >> >>
>> >> >> >> >> What you can try as a workaround is to decrease the number of slots per machine to cause the operators to be spread across more machines.
>> >> >> >> >>
>> >> >> >> >> If this is a crucial issue for your use case, it should be simple to add a "preference to spread out" to the scheduler...
>> >> >> >> >>
>> >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >> >> >>
>> >> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e. make sure the parallel instances of the task are distributed across the cluster. When I run my Flink job with a parallelism of 16, all the parallel tasks are assigned to the first task manager.
>> >> >> >> >> >
>> >> >> >> >> > - Ali
>> >> >> >> >> >
>> >> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >> >> >> >> >
>> >> >> >> >> > >
>> >> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >> >> >> > >> Do the parallel instances of each task get distributed across the cluster or is it possible that they all run on the same node?
>> >> >> >> >> > >
>> >> >> >> >> > >Yes, slots are requested from all nodes of the cluster.
>> >> >> >> >> > >But keep in mind that multiple tasks (forming a local pipeline) can be scheduled to the same slot (1 slot can hold many tasks).
>> >> >> >> >> > >
>> >> >> >> >> > >Have you seen this?
>> >> >> >> >> > >
>> >> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>> >> >> >> >> > >
>> >> >> >> >> > >> If they can all run on the same node, what happens when that node crashes? Does the job manager recreate them using the remaining open slots?
>> >> >> >> >> > >
>> >> >> >> >> > >What happens: The job manager tries to restart the program with the same parallelism. Thus, if you have enough free slots available in your cluster, this works smoothly (so yes, the remaining/available slots are used).
>> >> >> >> >> > >
>> >> >> >> >> > >With a YARN cluster the task manager containers are restarted automatically. In standalone mode, you have to take care of this yourself.
>> >> >> >> >> > >
>> >> >> >> >> > >Does this help?
>> >> >> >> >> > >
>> >> >> >> >> > > Ufuk
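
A minimal sketch of the slot layout Stephan suggests above (four task managers, four slots each, so the 16 pipelines can spread out); the values are illustrative and assume the job keeps running at parallelism 16, which could equally be set in the job itself rather than via parallelism.default:

# flink-conf.yaml on each of the 4 task managers (illustrative values, not from this thread)
taskmanager.numberOfTaskSlots: 4

# 4 machines x 4 slots = 16 slots in total, one per pipeline at parallelism 16
parallelism.default: 16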