Hi Kaan,

it seems that ZMQ uses TCP and not HTTP, so I guess the easiest way would be to use a ZMQ Java binding to access it [1].
But of course, it's much more complicated to write the iterator logic for that. I'm not sure how ZMQ signals the end of such a graph. Maybe it closes the socket and you can just read as much as possible.

[1] https://zeromq.org/languages/java/

On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak <kaans...@gmail.com> wrote:

> Hi Arvid,
>
> I am sorry for the late response. I had some deadlines, but I am back to
> work now.
> I have been trying to implement what we talked about, but I am having
> problems with the implementation.
> I have been using ZMQ to open sockets, because that is inherently
> supported in my graph generator. But I couldn't make the connection using
> input streams.
> Do you have any specific examples I can look at to get a better idea
> of how to implement this?
>
> Best
> Kaan
>
> On Apr 24, 2020, at 4:37 AM, Arvid Heise <ar...@ververica.com> wrote:
>
> Hm, I mistakenly assumed the sockets worked the other way around (so
> pulling like URLInputStream instead of listening). I'd go by providing the
> data on a port on each generator node, and then read from that in multiple
> sources.
>
> I think the best solution is to implement a custom InputFormat and then
> use createInput. You could implement a subclass of GenericInputFormat.
> You might even use IteratorInputFormat like this:
>
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.ObjectInputStream;
> import java.io.Serializable;
> import java.net.URL;
> import java.nio.charset.StandardCharsets;
> import java.util.Iterator;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple2;
>
> private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {
>     private final URL url;
>     // Rebuilt in readObject after deserialization, so it is never serialized itself.
>     private transient Iterator<Tuple2<Long, Long>> inner;
>
>     private URLInputIterator(URL url) {
>         this.url = url;
>     }
>
>     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
>         // Restore the serialized url field before using it.
>         in.defaultReadObject();
>         InputStream inputStream = url.openStream();
>         inner = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
>             .lines()
>             .map(line -> {
>                 // Each line is expected to hold one edge as "source;target".
>                 String[] parts = line.split(";");
>                 return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
>             })
>             .iterator();
>     }
>
>     @Override
>     public boolean hasNext() {
>         return inner.hasNext();
>     }
>
>     @Override
>     public Tuple2<Long, Long> next() {
>         return inner.next();
>     }
> }
>
> // Assumes the generator serves the edge list over HTTP on port 9999.
> env.fromCollection(new URLInputIterator(new URL("http", "gen_node1", 9999, "/")),
>     Types.TUPLE(Types.LONG, Types.LONG));
>
>
> On Fri, Apr 24, 2020 at 9:42 AM Kaan Sancak <kaans...@gmail.com> wrote:
>
>> Yes, that sounds like a great idea and actually that's what I am trying
>> to do.
>>
>> Then you configure your analysis job to read from each of these sockets
>> with a separate source and union them before feeding them to the actual job?
>>
>>
>> Before trying to open the sockets on the slave nodes, I first opened
>> just one socket on the master node, and I also ran the generator with one
>> node. I was able to read the graph and then run my algorithm without any
>> problems. This was a test run to see whether I can do it.
>>
>> After that, I opened a bunch of sockets on my generators, and now I am
>> trying to configure Flink to read from those sockets. However, I am having
>> problems while trying to assign each task manager to a separate socket. I
>> am assuming my problems are related to network binding. In my configuration
>> file, jobmanager.rpc.address is set, but I have not done
>> similar configurations for the slave nodes.
>>
>> Am I on the right track, or is there an easier way to handle this?
>>
>> I think my point is how to do the `read from each of these sockets with a
>> separate source` part.
>>
>> Thanks again
>>
>> Best
>> Kaan
>>
>>
>> On Apr 24, 2020, at 3:11 AM, Arvid Heise <ar...@ververica.com> wrote:
>>
>> Hi Kaan,
>>
>> sorry, I hadn't considered I/O as the bottleneck. I thought a bit more
>> about your issue and came to a rather simple solution.
>>
>> How about you open a socket on each of your generator nodes? Then you
>> configure your analysis job to read from each of these sockets with a
>> separate source and union them before feeding them to the actual job?
>>
>> You don't need to modify much on the analysis job and each source can be
>> independently read. WDYT?
>>
>> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>
>>> Thanks for the answer! Also thanks for raising some concerns about my
>>> question.
>>>
>>> Some of the graphs I have been using are larger than 1.5 TB. I am
>>> currently in the experiment stage of a project, so I keep making
>>> modifications to my code and re-running the experiments. On some of the
>>> largest graphs I have been using, I/O has become an issue for me and keeps
>>> me waiting for a couple of hours.
>>>
>>> Moreover, I have a parallel/distributed graph generator, which I can run
>>> on the same set of nodes in my cluster. So what I wanted to do was to run
>>> my Flink program and the graph generator at the same time and feed the
>>> graph in through the generator, which should be faster than doing I/O from
>>> disk. As you said, it is not essential for me to do that, but I am trying
>>> to see what I am able to do using Flink and how I can solve such problems.
>>> I was also using another framework and faced a similar problem there; I
>>> was able to reduce the graph read time from hours to minutes using this
>>> method.
>>>
>>> Do you really have more main memory than disk space?
>>>
>>>
>>> My issue is actually not storage related; I am trying to see how I can
>>> reduce the I/O time.
>>>
>>> One trick that came to my mind is creating a dummy dataset and, using a
>>> map function on that dataset, opening a bunch of sockets to listen to the
>>> generator and collect the generated data. I am trying to see how it will
>>> turn out.
>>>
>>> Alternatively, if graph generation is rather cheap, you could also try
>>> to incorporate it directly into the analysis job.
>>>
>>>
>>> I am not familiar with the analysis jobs. I will look into it.
>>>
>>> Again, this is actually not a problem, I am just trying to experiment
>>> with the framework and see what I can do. I am very new to Flink, so my
>>> methods might be wrong. Thanks for the help!
>>>
>>> Best
>>> Kaan
>>>
>>>
>>> On Apr 23, 2020, at 10:51 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>
>>> Hi Kaan,
>>>
>>> afaik there is no (easy) way to switch from streaming back to batch API
>>> while retaining all data in memory (correct me if I misunderstood).
>>>
>>> However, from your description, I also have some serious problems
>>> understanding your setup. Why can't you dump the data to some file? Do you
>>> really have more main memory than disk space? Or do you have no shared
>>> storage between your generating cluster and the Flink cluster?
>>>
>>> It almost sounds as if the issue at heart is rather to find a good
>>> serialization format for storing the edges. The 70 billion edges could
>>> be stored in an array of id pairs, which amount to ~560 GB uncompressed
>>> data if stored in Avro (or any other binary serialization format) when ids
>>> are longs. That's not much by today's standards and could also be easily
>>> offloaded to S3.
>>>
>>> Alternatively, if graph generation is rather cheap, you could also try
>>> to incorporate it directly into the analysis job.
>>>
>>> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have been running some experiments on large graph data; the smallest
>>>> graph I have been using has around 70 billion edges. I have a graph
>>>> generator, which generates the graph in parallel and feeds it to the
>>>> running system. However, it takes a lot of time to read the edges, because
>>>> even though the graph generation process is parallel, in Flink I can only
>>>> listen from the master node (correct me if I am wrong). Another option is
>>>> dumping the generated data to a file and reading it with readCsvFile;
>>>> however, this is not feasible in terms of storage management.
>>>>
>>>> What I want to do is invoke my graph generator using IPC/TCP protocols
>>>> and read the generated data from the sockets. Since the graph data is
>>>> also generated in parallel on each node, I want to make use of IPC and
>>>> read the data in parallel at each node. I did some digging online but
>>>> couldn't find something similar using the DataSet API. I would be glad if
>>>> you have some similar use cases or examples.
>>>>
>>>> Is it possible to use the streaming environment to create the data in
>>>> parallel and then switch to the DataSet API?
>>>>
>>>> Thanks in advance!
>>>>
>>>> Best
>>>> Kaan
>>>
>>> --
>>> Arvid Heise | Senior Java Developer
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>> --
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>> Stream Processing | Event Driven | Real Time
>>> --
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
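
For reference, here is a minimal sketch of the "read as much as possible until the socket is closed" idea from the top of this thread. It uses only the JDK and a plain TCP socket, not ZMQ, so it would only apply if the generator can write newline-terminated "source;target" lines to an ordinary socket; a real ZMQ endpoint speaks its own framing protocol and needs a ZMQ binding instead. The class name SocketEdgeIterator and the helper startFakeGenerator are hypothetical and exist only for illustration; the fake generator stands in for a generator node so the sketch can be tried locally.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Reads "source;target" edge lines from a TCP socket until the sender closes the connection. */
class SocketEdgeIterator implements Iterator<long[]>, AutoCloseable {
    private final Socket socket;
    private final BufferedReader reader;
    private String pending; // next unread line, or null once the stream has ended

    SocketEdgeIterator(String host, int port) {
        try {
            socket = new Socket(host, port);
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            pending = reader.readLine(); // null if the sender closed without sending anything
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return pending != null;
    }

    @Override
    public long[] next() {
        if (pending == null) {
            throw new NoSuchElementException("sender closed the socket");
        }
        String[] parts = pending.split(";");
        long[] edge = {Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim())};
        try {
            pending = reader.readLine(); // readLine returns null at end of stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return edge;
    }

    @Override
    public void close() {
        try {
            socket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Drains one socket completely; the end of the graph is simply the end of the stream. */
    static List<long[]> readAll(String host, int port) {
        List<long[]> edges = new ArrayList<>();
        try (SocketEdgeIterator it = new SocketEdgeIterator(host, port)) {
            while (it.hasNext()) {
                edges.add(it.next());
            }
        }
        return edges;
    }

    /** Hypothetical stand-in for a generator node: serves the lines to one client, then closes. */
    static int startFakeGenerator(List<String> lines) {
        try {
            ServerSocket server = new ServerSocket(0); // port 0 picks any free port
            Thread sender = new Thread(() -> {
                try (ServerSocket s = server;
                     Socket client = s.accept();
                     PrintWriter out = new PrintWriter(new OutputStreamWriter(
                             client.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    lines.forEach(out::println);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.setDaemon(true);
            sender.start();
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int port = startFakeGenerator(List.of("1;2", "3;4"));
        for (long[] edge : readAll("localhost", port)) {
            System.out.println(edge[0] + " -> " + edge[1]);
        }
    }
}
```

One such reader per generator node would correspond to the "separate source per socket" idea discussed above; how the readers are wired into Flink sources is left open here.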