Hi Kaan,

seems like ZMQ is using TCP and not HTTP. So I guess the easiest way would
be to use a ZMQ Java binding to access it [1].

But of course, it's much more complicated to write an iterator logic for
that. Not sure how ZMQ signals the end of such a graph? Maybe it closes the
socket and you can just read as much as possible.

[1] https://zeromq.org/languages/java/

On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak <kaans...@gmail.com> wrote:

> Hi Arvid,
>
> I am sorry for the late response. I had some deadlines, but I am back to
> work now.
> I have been trying to implement what we have talked. But I am having
> problems on the implementation.
> I have been using ZMQ to open sockets, because that is inheritenly
> supported in my graph generator. But, I couldn’t make the connection using
> input streams.
> Do you have any specific examples, where I can look and have a better idea
> on how to implement this?
>
> Best
> Kaan
>
> On Apr 24, 2020, at 4:37 AM, Arvid Heise <ar...@ververica.com> wrote:
>
> Hm, I confused sockets to work the other way around (so pulling like
> URLInputStream instead of listening). I'd go by providing the data on a
> port on each generator node. And then read from that in multiple sources.
>
> I think the best solution is to implement a custom InputFormat and then
> use readInput. You could implement a subclass of GenericInputFormat. You
> might even use IteratorInputFormat like this:
>
> private static class URLInputIterator implements Iterator<Tuple2<Long, 
> Long>>, Serializable {
>    private final URL url;
>    private Iterator<Tuple2<Long, Long>> inner;
>
>    private URLInputIterator(URL url) {
>       this.url = url;
>    }
>
>    private void readObject(ObjectInputStream in) throws IOException, 
> ClassNotFoundException {
>       InputStream inputStream = url.openStream();
>       inner = new BufferedReader(new InputStreamReader(inputStream, 
> StandardCharsets.UTF_8))
>          .lines()
>          .map(line -> {
>             String[] parts = line.split(";");
>             return new Tuple2<>(Long.parseLong(parts[0]), 
> Long.parseLong(parts[1]));
>          })
>          .iterator();
>    }
>
>    @Override
>    public boolean hasNext() {
>       return inner.hasNext();
>    }
>
>    @Override
>    public Tuple2<Long, Long> next() {
>       return inner.next();
>    }
> }
>
> env.fromCollection(new URLInputIterator(new URL("gen_node1", 9999)), 
> Types.TUPLE(Types.LONG, Types.LONG));
>
>
>
>
> On Fri, Apr 24, 2020 at 9:42 AM Kaan Sancak <kaans...@gmail.com> wrote:
>
>> Yes, that sounds like a great idea and actually that's what I am trying
>> to do.
>>
>>  Then you configure your analysis job to read from each of these sockets
>> with a separate source and union them before feeding them to the actual job?
>>
>>
>> Before trying to open the sockets on the slave nodes, first I have opened
>> just one socket at master node, and I also run the generator with one node
>> as well. I was able to read the graph, and the run my algorithm without any
>> problems. This was a test run to see whatever I can do it.
>>
>> After, I have opened bunch of sockets on my generators, now I am trying
>> to configure Flink to read from those sockets. However, I am having
>> problems while trying to assign each task manager to a separate socket. I
>> am assuming my problems are related to network binding. In my configuration
>> file,  jobmanager.rpc.address is set but I have not done
>> similar configurations for slave nodes.
>>
>> Am I on the right track, or is there an easier way to handle this?
>>
>> I think my point is how to do `read from each of these sockets with a
>> separate source` part.
>>
>> Thanks again
>>
>> Best
>> Kaan
>>
>>
>>
>> On Apr 24, 2020, at 3:11 AM, Arvid Heise <ar...@ververica.com> wrote:
>>
>> Hi Kaan,
>>
>> sorry, I haven't considered I/O as the bottleneck. I thought a bit more
>> about your issue and came to a rather simple solution.
>>
>> How about you open a socket on each of your generator nodes? Then you
>> configure your analysis job to read from each of these sockets with a
>> separate source and union them before feeding them to the actual job?
>>
>> You don't need to modify much on the analysis job and each source can be
>> independently read. WDYT?
>>
>> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>
>>> Thanks for the answer! Also thanks for raising some concerns about my
>>> question.
>>>
>>> Some of the graphs I have been using is larger than 1.5 tb, and I am
>>> currently an experiment stage of a project, and I am making modifications
>>> to my code and re-runing the experiments again. Currently, on some of the
>>> largest graphs I have been using, IO became an issue for me and keeps me
>>> wait for couple of hours.
>>>
>>> Moreover, I have a parallel/distributed graph generator, which I can run
>>> on the same set of nodes in my cluster. So what I wanted to do was, to run
>>> my Flink program and graph generator at the same time and feed the graph
>>> through generator, which should be faster than making IO from the disk. As
>>> you said, it is not essential for me to that, but I am trying to see what I
>>> am able to do using Flink and how can I solve such problems. I was also
>>> using another framework, and faced with the similar problem, I was able to
>>> reduce the graph read time from hours to minutes using this method.
>>>
>>>  Do you really have more main memory than disk space?
>>>
>>>
>>> My issue is actually not storage related, I am trying to see how can I
>>> reduce the IO time.
>>>
>>> One trick came to my mind is, creating dummy dataset, and using a map
>>> function on the dataset, I can open-up bunch of sockets and listen the
>>> generator, and collect the generated data. I am trying to see how it will
>>> turn out.
>>>
>>> Alternatively, if graph generation is rather cheap, you could also try
>>> to incorporate it directly into the analysis job.
>>>
>>>
>>> I am not familiar with the analysis jobs. I will look into it.
>>>
>>> Again, this is actually not a problem, I am just trying to experiment
>>> with the framework and see what I can do. I am very new to Flink, so my
>>> methods might be wrong. Thanks for the help!
>>>
>>> Best
>>> Kaan
>>>
>>>
>>> On Apr 23, 2020, at 10:51 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>
>>> Hi Kaan,
>>>
>>> afaik there is no (easy) way to switch from streaming back to batch API
>>> while retaining all data in memory (correct me if I misunderstood).
>>>
>>> However, from your description, I also have some severe understanding
>>> problems. Why can't you dump the data to some file? Do you really have more
>>> main memory than disk space? Or do you have no shared memory between your
>>> generating cluster and the flink cluster?
>>>
>>> It almost sounds as if the issue at heart is rather to find a good
>>> serialization format on how to store the edges. The 70 billion edges could
>>> be stored in an array of id pairs, which amount to ~560 GB uncompressed
>>> data if stored in Avro (or any other binary serialization format) when ids
>>> are longs. That's not much by today's standards and could also be easily
>>> offloaded to S3.
>>>
>>> Alternatively, if graph generation is rather cheap, you could also try
>>> to incorporate it directly into the analysis job.
>>>
>>> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak <kaans...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have been running some experiments on  large graph data, smallest
>>>> graph I have been using is around ~70 billion edges. I have a graph
>>>> generator, which generates the graph in parallel and feeds to the running
>>>> system. However, it takes a lot of time to read the edges, because even
>>>> though the graph generation process is parallel, in Flink I can only listen
>>>> from master node (correct me if I am wrong). Another option is dumping the
>>>> generated data to a file and reading with readFromCsv, however this is not
>>>> feasible in terms of storage management.
>>>>
>>>> What I want to do is, invoking my graph generator, using ipc/tcp
>>>> protocols  and reading the generated data from the sockets. Since the graph
>>>> data is also generated parallel in each node, I want to make use of ipc,
>>>> and read the data in parallel at each node. I made some online digging  but
>>>> couldn’t find something similar using dataset api. I would be glad if you
>>>> have some similar use cases or examples.
>>>>
>>>> Is it possible to use streaming environment to create the data in
>>>> parallel and switch to dataset api?
>>>>
>>>> Thanks in advance!
>>>>
>>>> Best
>>>> Kaan
>>>
>>>
>>>
>>> --
>>> Arvid Heise | Senior Java Developer
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>> --
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>> Stream Processing | Event Driven | Real Time
>>> --
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>>
>>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>>
>>
>
> --
> Arvid Heise | Senior Java Developer
> <https://www.ververica.com/>
>
> Follow us @VervericaData
> --
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to