Hi Kaan,

explicitly mapping subtasks to physical nodes is currently not supported and would need some workarounds. I have re-added the user mailing list (please always include it in your replies); maybe someone there can help you with that.
Best,
Arvid

On Thu, Apr 30, 2020 at 10:12 AM Kaan Sancak <kaans...@gmail.com> wrote:
> One quick question, though: on each generator node I am opening 24 sockets (the number of cores I have). Is there a way to guarantee that, while running the map function, each of the slave nodes distributes these 24 socket ports among its task slots (each slave also has 24 slots)?
> Sorry, I have asked a lot of questions.
>
> Stay safe!
> Kaan
>
> On Thu, Apr 30, 2020 at 3:06 AM Kaan Sancak <kaans...@gmail.com> wrote:
>> Hi Arvid,
>> As you said, I am only interested in batch processing right now, and it seems to be working fine now.
>>
>> Thanks for your help!
>> Best
>> Kaan
>>
>> On Thu, Apr 30, 2020 at 2:31 AM Arvid Heise <ar...@ververica.com> wrote:
>>> Hi Kaan,
>>>
>>> I'm not entirely sure I understand your solution. I gathered that you create a dataset of TCP addresses and then use flatMap to fetch and output the data?
>>>
>>> If so, then I think it's a good solution for batch processing (DataSet). It doesn't work in DataStream because it doesn't play well with checkpointing, but you seem to be interested only in batch. It's also not the first time I have seen this pattern used in batch.
>>>
>>> In general, if it works and is fast enough, it's always a good solution ;). No need to make it more complicated if you can solve it with simpler means and maintain it more easily.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Wed, Apr 29, 2020 at 10:19 PM Kaan Sancak <kaans...@gmail.com> wrote:
>>>> Hi Arvid,
>>>>
>>>> I have implemented a ZMQ listener class without extending any Flink class.
>>>> The listener has a constructor that takes the port number.
>>>>
>>>> Then, in the execution, I created a dataset of strings containing the port numbers.
>>>> Then I used a flatMap function, which returned Tuple2<Long, Long>. I opened the TCP sockets on localhost, so matching was quite easy.
>>>>
>>>> This seemed to work for me.
>>>> What do you think about this implementation? Do you see any drawbacks?
>>>>
>>>> Best
>>>> Kaan
>>>>
>>>> On Apr 29, 2020, at 7:40 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>> Hi Kaan,
>>>>
>>>> it seems like ZMQ uses TCP and not HTTP. So I guess the easiest way would be to use a ZMQ Java binding to access it [1].
>>>>
>>>> But of course, it's much more complicated to write the iterator logic for that. I'm not sure how ZMQ signals the end of such a graph. Maybe it closes the socket and you can just read as much as possible.
>>>>
>>>> [1] https://zeromq.org/languages/java/
>>>>
>>>> On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak <kaans...@gmail.com> wrote:
>>>>> Hi Arvid,
>>>>>
>>>>> I am sorry for the late response. I had some deadlines, but I am back to work now.
>>>>> I have been trying to implement what we talked about, but I am having problems with the implementation.
>>>>> I have been using ZMQ to open sockets, because that is inherently supported in my graph generator. But I couldn't make the connection using input streams.
>>>>> Do you have any specific examples I can look at to get a better idea of how to implement this?
>>>>>
>>>>> Best
>>>>> Kaan
>>>>>
>>>>> On Apr 24, 2020, at 4:37 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>
>>>>> Hm, I had sockets working the other way around in my head (so pulling, like URLInputStream, instead of listening). I'd go with providing the data on a port on each generator node and then reading from those ports in multiple sources.
>>>>>
>>>>> I think the best solution is to implement a custom InputFormat and then use createInput. You could implement a subclass of GenericInputFormat.
>>>>> You might even use IteratorInputFormat like this:
>>>>>
>>>>> import java.io.BufferedReader;
>>>>> import java.io.IOException;
>>>>> import java.io.InputStream;
>>>>> import java.io.InputStreamReader;
>>>>> import java.io.ObjectInputStream;
>>>>> import java.io.Serializable;
>>>>> import java.net.URL;
>>>>> import java.nio.charset.StandardCharsets;
>>>>> import java.util.Iterator;
>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>
>>>>> private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {
>>>>>     private final URL url;
>>>>>     // transient: created in readObject, never serialized itself
>>>>>     private transient Iterator<Tuple2<Long, Long>> inner;
>>>>>
>>>>>     private URLInputIterator(URL url) {
>>>>>         this.url = url;
>>>>>     }
>>>>>
>>>>>     // Called when the iterator is deserialized, i.e. after it has been shipped to the cluster,
>>>>>     // so the stream is opened there rather than on the client.
>>>>>     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
>>>>>         in.defaultReadObject(); // restores the non-transient 'url' field
>>>>>         InputStream inputStream = url.openStream();
>>>>>         // each line is expected to look like "src;dst"
>>>>>         inner = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
>>>>>                 .lines()
>>>>>                 .map(line -> {
>>>>>                     String[] parts = line.split(";");
>>>>>                     return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
>>>>>                 })
>>>>>                 .iterator();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public boolean hasNext() {
>>>>>         return inner.hasNext();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Tuple2<Long, Long> next() {
>>>>>         return inner.next();
>>>>>     }
>>>>> }
>>>>>
>>>>> // env is an ExecutionEnvironment. fromCollection(Iterator, ...) creates a non-parallel source,
>>>>> // so create one per generator node and union() them. Assumes the generator serves lines over HTTP.
>>>>> env.fromCollection(new URLInputIterator(new URL("http://gen_node1:9999/")),
>>>>>         Types.TUPLE(Types.LONG, Types.LONG));
>>>>>
>>>>> On Fri, Apr 24, 2020 at 9:42 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>>>>> Yes, that sounds like a great idea, and actually that's what I am trying to do.
>>>>>>
>>>>>> Then you configure your analysis job to read from each of these sockets with a separate source and union them before feeding them to the actual job?
>>>>>>
>>>>>> Before trying to open the sockets on the slave nodes, I first opened just one socket on the master node, and I also ran the generator on one node. I was able to read the graph and then run my algorithm without any problems. This was a test run to see whether I can do it.
>>>>>>
>>>>>> After that, I opened a bunch of sockets on my generators, and now I am trying to configure Flink to read from those sockets.
>>>>>> However, I am having problems while trying to assign each task manager to a separate socket. I am assuming my problems are related to network binding. In my configuration file, jobmanager.rpc.address is set, but I have not made similar configurations for the slave nodes.
>>>>>>
>>>>>> Am I on the right track, or is there an easier way to handle this?
>>>>>>
>>>>>> I think my question is how to do the `read from each of these sockets with a separate source` part.
>>>>>>
>>>>>> Thanks again
>>>>>>
>>>>>> Best
>>>>>> Kaan
>>>>>>
>>>>>> On Apr 24, 2020, at 3:11 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>
>>>>>> Hi Kaan,
>>>>>>
>>>>>> sorry, I hadn't considered I/O as the bottleneck. I thought a bit more about your issue and came up with a rather simple solution.
>>>>>>
>>>>>> How about you open a socket on each of your generator nodes? Then you configure your analysis job to read from each of these sockets with a separate source and union them before feeding them to the actual job?
>>>>>>
>>>>>> You don't need to modify much in the analysis job, and each source can be read independently. WDYT?
>>>>>>
>>>>>> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>>>>>> Thanks for the answer! Also, thanks for raising some concerns about my question.
>>>>>>>
>>>>>>> Some of the graphs I have been using are larger than 1.5 TB. I am currently at an experimental stage of a project, making modifications to my code and re-running the experiments. On some of the largest graphs, I/O has become an issue for me and keeps me waiting for a couple of hours.
>>>>>>>
>>>>>>> Moreover, I have a parallel/distributed graph generator, which I can run on the same set of nodes in my cluster.
>>>>>>> So what I wanted to do was to run my Flink program and the graph generator at the same time and feed the graph directly from the generator, which should be faster than reading it from disk. As you said, it is not essential for me to do that, but I am trying to see what I am able to do with Flink and how I can solve such problems. I was also using another framework, faced a similar problem there, and was able to reduce the graph read time from hours to minutes using this method.
>>>>>>>
>>>>>>> Do you really have more main memory than disk space?
>>>>>>>
>>>>>>>
>>>>>>> My issue is actually not storage related; I am trying to see how I can reduce the I/O time.
>>>>>>>
>>>>>>> One trick that came to my mind is creating a dummy dataset and using a map function on it: I can open up a bunch of sockets, listen to the generator, and collect the generated data. I am trying to see how it will turn out.
>>>>>>>
>>>>>>> Alternatively, if graph generation is rather cheap, you could also try to incorporate it directly into the analysis job.
>>>>>>>
>>>>>>>
>>>>>>> I am not familiar with the analysis jobs. I will look into it.
>>>>>>>
>>>>>>> Again, this is actually not a problem; I am just trying to experiment with the framework and see what I can do. I am very new to Flink, so my methods might be wrong. Thanks for the help!
>>>>>>>
>>>>>>> Best
>>>>>>> Kaan
>>>>>>>
>>>>>>>
>>>>>>> On Apr 23, 2020, at 10:51 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>
>>>>>>> Hi Kaan,
>>>>>>>
>>>>>>> afaik there is no (easy) way to switch from the streaming API back to the batch API while retaining all data in memory (correct me if I misunderstood).
>>>>>>>
>>>>>>> However, from your description, I also have some severe problems understanding the setup. Why can't you dump the data to some file?
>>>>>>> Do you really have more main memory than disk space? Or do you have no shared memory between your generating cluster and the Flink cluster?
>>>>>>>
>>>>>>> It almost sounds as if the issue at heart is rather to find a good serialization format for storing the edges. The 70 billion edges could be stored as an array of id pairs, which amounts to ~1.1 TB of uncompressed data (70 billion × 2 ids × 8 bytes) if stored in Avro (or any other binary serialization format) when ids are longs. That's not much by today's standards and could also easily be offloaded to S3.
>>>>>>>
>>>>>>> Alternatively, if graph generation is rather cheap, you could also try to incorporate it directly into the analysis job.
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have been running some experiments on large graph data; the smallest graph I have been using has around 70 billion edges. I have a graph generator, which generates the graph in parallel and feeds it to the running system. However, it takes a lot of time to read the edges, because even though the graph generation process is parallel, in Flink I can only listen from the master node (correct me if I am wrong). Another option is dumping the generated data to a file and reading it with readCsvFile; however, this is not feasible in terms of storage management.
>>>>>>>>
>>>>>>>> What I want to do is invoke my graph generator over IPC/TCP and read the generated data from the sockets. Since the graph data is also generated in parallel on each node, I want to make use of IPC and read the data in parallel on each node. I did some online digging but couldn't find anything similar using the DataSet API.
>>>>>>>> I would be glad if you could share similar use cases or examples.
>>>>>>>>
>>>>>>>> Is it possible to use the streaming environment to create the data in parallel and then switch to the DataSet API?
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Kaan
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>> <https://www.ververica.com/>
>>>>>>>
>>>>>>> Follow us @VervericaData
>>>>>>> --
>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>> --
>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>> --
>>>>>>> Ververica GmbH
>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
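The approach discussed in this thread for batch jobs (a DataSet of generator ports fed into a flatMap that opens a TCP socket on localhost and emits Tuple2<Long, Long> edges) could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the code from the thread. It assumes each generator writes one "src;dst" text line per edge over a plain TCP socket (the same line format as the URLInputIterator example) and closes the connection when it is done. A generator that speaks ZMQ would instead need a ZMQ Java binding and its own end-of-stream signal. EdgeSocketReader, parseEdge, readAll and the port numbers are hypothetical. The reader itself depends only on the JDK, so the per-port logic can be tried out without a cluster; the Flink wiring is shown only as a comment.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

// Hypothetical helper: reads "src;dst" edge lines from a generator listening on
// localhost:port until the generator closes the connection.
//
// Possible use inside a Flink DataSet job (not compiled here; needs Flink on the classpath):
//
//   env.fromElements(5555, 5556, 5557)   // hypothetical generator ports
//      .rebalance()                      // spread the ports over the parallel flatMap instances
//      .flatMap((Integer port, Collector<Tuple2<Long, Long>> out) ->
//          EdgeSocketReader.readAll(port, (src, dst) -> out.collect(Tuple2.of(src, dst))))
//      .returns(Types.TUPLE(Types.LONG, Types.LONG));
final class EdgeSocketReader {

    private EdgeSocketReader() {}

    /** Parses one "src;dst" line into {src, dst}; rejects lines without a ';'. */
    static long[] parseEdge(String line) {
        int sep = line.indexOf(';');
        if (sep < 0) {
            throw new IllegalArgumentException("expected 'src;dst' but got: " + line);
        }
        long src = Long.parseLong(line.substring(0, sep).trim());
        long dst = Long.parseLong(line.substring(sep + 1).trim());
        return new long[] {src, dst};
    }

    /**
     * Connects to localhost:port, passes every edge to the sink (for example a Flink
     * Collector) and returns the number of edges read. Blank lines are skipped.
     */
    static long readAll(int port, BiConsumer<Long, Long> sink) throws IOException {
        long count = 0;
        try (Socket socket = new Socket("localhost", port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null once the generator closes its end of the socket
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                long[] edge = parseEdge(line);
                sink.accept(edge[0], edge[1]);
                count++;
            }
        }
        return count;
    }
}
```

Keeping the socket handling behind a plain callback means it can be exercised locally against a java.net.ServerSocket before it is wired into a flatMap. Note that which physical node, and therefore which node's generator, a flatMap subtask reaches through localhost is still decided by Flink's scheduler.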