Re: Can Connected Components run on a streaming dataset using iterate delta?

2020-02-22 Thread kant kodali
Hi,

Thanks for that, but it looks like a streaming version is already available at
https://github.com/vasia/gelly-streaming. I wonder why this is not part of
Flink? There are no releases either.

Thanks!

On Tue, Feb 18, 2020 at 9:13 AM Yun Gao  wrote:

>Hi Kant,
>
>   As far as I know, I think the current example connected
> components implementation based on the DataSet API cannot be extended to
> streaming data or incremental batches directly.
>
>   From the algorithm's perspective, if the graph only adds edges
> and never removes them, the connected components should be updatable
> incrementally as the graph changes: when edges are added, a new search is
> started from the sources of the added edges to propagate their component
> IDs. This triggers a new pass of updates on the downstream vertices, and the
> updates continue until no vertex's component ID changes. However, if edges
> can also be removed, the incremental computation is not easily achieved.
>
>   To implement the above logic on Flink, I think there are
> currently two possible methods:
> 1) Use the DataSet API and DataSet iterations: maintain
> the graph structure and the latest computation result in external storage,
> and whenever there are enough changes to the graph, submit a new DataSet job
> to recompute the result. The job should load, for each graph vertex, its
> edges, its latest component ID, and a flag for whether it is a source of the
> newly added edges, and then run the incremental computation described above.
> 2) Flink also provides a DataStream iteration API [1] that
> enables iterating on unbounded data. In this case the graph modifications
> should be modeled as a data stream, and some operators inside the iteration
> should maintain the graph structure and the current component IDs;
> whenever there are enough changes, a new pass of computation starts.
>
> Best,
>  Yun
>
> [1] Flink DataStream iteration,
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#iterations
>
> --
> From:kant kodali 
> Send Time:2020 Feb. 18 (Tue.) 15:11
> To:user 
> Subject:Can Connected Components run on a streaming dataset using iterate
> delta?
>
> Hi All,
>
> I am wondering if connected components can run on streaming data, or say an
> incremental batch?
>
> I see that with delta iteration not all vertices need to participate in
> every iteration, which is great, but in my case the graph is evolving over
> time; in other words, new edges are getting added over time. If so, does the
> algorithm need to run on the entire graph, or can it simply run on the new
> batch of edges?
>
> Finally, what is the best way to compute connected components on graphs
> evolving over time? Should I use streaming, batch, or a custom incremental
> approach? Also, the documentation takes maxIterations as an input. How can I
> come up with a good number for maxIterations? And once I come up with a good
> number, is the algorithm guaranteed to converge?
>
>
> Thanks,
> Kant
>
>
>
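
A minimal, self-contained sketch of the edge-addition-only update described
above, in plain Java and independent of the Flink APIs (the class and method
names are illustrative only): each added edge merges at most two components
via union-find, so no full recomputation is needed.

import java.util.HashMap;
import java.util.Map;

/** Maintains component IDs incrementally as edges are added (additions only). */
public class IncrementalComponentsSketch {
    private final Map<Long, Long> parent = new HashMap<>();

    /** Returns the component representative of v, compressing paths along the way. */
    private long find(long v) {
        parent.putIfAbsent(v, v);            // an unseen vertex starts as its own component
        long root = v;
        while (parent.get(root) != root) {   // walk up to the representative
            root = parent.get(root);
        }
        while (parent.get(v) != root) {      // path compression
            long next = parent.get(v);
            parent.put(v, root);
            v = next;
        }
        return root;
    }

    /** Adding an edge merges at most two components; no full recomputation is needed. */
    public void addEdge(long src, long dst) {
        long a = find(src);
        long b = find(dst);
        if (a != b) {
            parent.put(Math.max(a, b), Math.min(a, b)); // smaller root becomes the component ID
        }
    }

    public long componentOf(long v) {
        return find(v);
    }

    public static void main(String[] args) {
        IncrementalComponentsSketch cc = new IncrementalComponentsSketch();
        cc.addEdge(1, 2);
        cc.addEdge(2, 3);
        cc.addEdge(4, 5);
        System.out.println(cc.componentOf(3)); // 1
        System.out.println(cc.componentOf(5)); // 4
    }
}

In a Flink job this map would live in operator or keyed state inside the
iteration body. Edge removals break the scheme, which is why the reply above
notes that they are not easily handled incrementally.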


Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-22 Thread Niels Basjes
Yes that's it!

My code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperInline.java#L144
now does this:

DataStream resultDataStream = ...


List result = new ArrayList<>(5);
DataStreamUtils
.collect(resultDataStream)
.forEachRemaining(result::add);

assertEquals(2, result.size());


And, as you explained, because collect() already does an execute, this works
like a charm.

Niels
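
To recap the thread, a hedged before/after sketch of replacing the deprecated
writeUsingOutputFormat(LocalCollectionOutputFormat) pattern with
DataStreamUtils.collect(); the element type String and the sample values are
illustrative assumptions, not taken from the original test:

import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class CollectInsteadOfOutputFormat {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("One", "Two");

        // Deprecated pattern: sink into a local collection, then execute.
        // List<String> old = new ArrayList<>();
        // stream.writeUsingOutputFormat(new LocalCollectionOutputFormat<>(old));
        // env.execute();

        // Replacement pattern: collect() triggers the execution itself,
        // so no separate env.execute() is needed for this part of the graph.
        List<String> result = new ArrayList<>();
        DataStreamUtils.collect(stream).forEachRemaining(result::add);
        System.out.println(result); // [One, Two]
    }
}

Because collect() triggers its own job execution, a subsequent env.execute()
only succeeds if something else (for example a print()) has been added to the
stream graph again, which matches Robert's explanation in this thread.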




On Sat, Feb 22, 2020 at 1:38 AM Robert Metzger  wrote:

> Hey,
> you are right. I'm also seeing this exception now. It was hidden in other
> log output.
>
> The solution to all this confusion is simple: DataStreamUtils.collect() is
> like an execute().
>
> The stream graph is cleared on each execute(). That's why collect() and
> then execute() lead to the "no operators defined" error.
> However, if you have collect(), print(), execute(), then the print() is
> filling the stream graph again, and you are executing two Flink jobs: the
> collect job and the execute job.
>
> I hope I got it right this time :)
>
> Best,
> Robert
>
> On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes  wrote:
>
>> I tried this in Flink 1.10.0 :
>>
>> @Test
>> public void experimentalTest() throws Exception {
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<String> input = env.fromElements("One", "Two");
>> //DataStream<String> input = env.addSource(new StringSourceFunction());
>> List<String> result = new ArrayList<>(5);
>> DataStreamUtils.collect(input).forEachRemaining(result::add);
>> env.execute("Flink Streaming Java API Skeleton");
>> }
>>
>>
>> Results in
>>
>>
>> java.lang.IllegalStateException: No operators defined in streaming topology. 
>> Cannot execute.
>>
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>  at 
>> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> ...
>>
>>
>>
>> On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger 
>> wrote:
>>
>>> Hey Niels,
>>>
>>> This minimal Flink job executes in Flink 1.10:
>>>
>>> public static void main(String[] args) throws Exception {
>>>final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>DataStream<String> input = env.addSource(new StringSourceFunction());
>>>List<String> result = new ArrayList<>(5);
>>>DataStreamUtils.collect(input).forEachRemaining(result::add);
>>>env.execute("Flink Streaming Java API Skeleton");
>>> }
>>>
>>> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
>>> that breaks with the StreamGraphGenerator?
>>>
>>> Best,
>>> Robert
>>>
>>> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes  wrote:
>>>
 Hi Gordon,

 Thanks. This works for me.

 I find it strange that when I do this it works (I made the differences
 bold)

 List result = new ArrayList<>(5);

 DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

 *resultDataStream.print();*

 environment.execute();


 however, this does not work

 List result = new ArrayList<>(5);

 DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

 environment.execute();


 and this also does not work

 *resultDataStream.print();*

 List result = new ArrayList<>(5);

 DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

 environment.execute();


 In both these cases it fails with


 java.lang.IllegalStateException: *No operators defined in streaming
 topology. Cannot execute.*

 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
 at

Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-22 Thread Ufuk Celebi
Hey Stephan,

+1.

Reading over the linked ticket and your description here, I think it makes
a lot of sense to go ahead with this. Since it's possible to upgrade via
intermediate Flink releases as a fail-safe I don't have any concerns.

– Ufuk


On Fri, Feb 21, 2020 at 4:34 PM Till Rohrmann  wrote:
>
> +1 for dropping savepoint compatibility with Flink 1.2.
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 6:55 PM Stephan Ewen  wrote:
>>
>> Thank you for the feedback.
>>
>> Here is the JIRA issue with some more explanation also about the
background and implications:
>> https://jira.apache.org/jira/browse/FLINK-16192
>>
>> Best,
>> Stephan
>>
>>
>> On Thu, Feb 20, 2020 at 2:26 PM vino yang  wrote:
>>>
>>> +1 for dropping Savepoint compatibility with Flink 1.2
>>>
>>> Flink 1.2 is quite far away from the latest 1.10. Especially after the
release of Flink 1.9 and 1.10, the code and architecture have undergone
major changes.
>>>
>>> Currently, I am updating the state migration tests for Flink 1.10. I can
still see some binary snapshot files from version 1.2. If we agree on this
topic, we may be able to alleviate some of the burden (remove those binary
files) when the migration tests are updated later.
>>>
>>> Best,
>>> Vino
>>>
>>> On Thu, Feb 20, 2020 at 9:04 PM Theo Diefenthal wrote:

 +1 for dropping compatibility.

 I personally think that for a project to keep a good development pace, it is
very important that old legacy stuff is dropped from time to time. As long as
there is an upgrade path (via going through another Flink release), that's
fine.

 
 From: "Stephan Ewen"
 To: "dev" , "user"
 Sent: Thursday, 20 February 2020 11:11:43
 Subject: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

 Hi all!
 For some cleanup and simplifications, it would be helpful to drop
Savepoint compatibility with Flink version 1.2. That version was released
almost three years ago.

 I would expect that no one uses that old version any more in a way
that they actively want to upgrade directly to 1.11.

 Even if they do, there is still the option to first upgrade to another
version (like 1.9) and then upgrade to 1.11 from there.

 Any concerns to drop that support?

 Best,
 Stephan


 --
 SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
 Theo Diefenthal

 T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
 theo.diefent...@scoop-software.de - www.scoop-software.de
 Sitz der Gesellschaft: Köln, Handelsregister: Köln,
 Handelsregisternummer: HRB 36625
 Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen,
 Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel

>


yarn session: one JVM per task

2020-02-22 Thread David Morin
Hi,
My app is based on a lib that is not thread-safe (yet...).
While waiting for the patch to be merged, how can I be sure that the Sink that
uses this lib runs in a single JVM?
Context: I use one YARN session and send my Flink jobs to this session.

Regards,
David


Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-22 Thread M Singh
Hey Folks:
I am trying to figure out the options for running Flink on Kubernetes and am
trying to find out the pros and cons of running in Flink session vs Flink job
cluster mode
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes).
I understand that in job mode there is no need to submit the job since it is
part of the job image. But what are the other pros and cons of this approach
vs session mode, where a job manager is deployed and Flink jobs can be
submitted to it? Are there any benefits with regard to:
1. Configuring the jobs
2. Scaling the taskmanagers
3. Restarting jobs
4. Managing the Flink jobs
5. Passing credentials (in case of AWS, etc.)
6. Fault tolerance and recovery of jobs from failure
Also, we will be keeping the checkpoints for the jobs on S3. Is there any need
to specify a volume for the pods? If a volume is required, do we need a
provisioned volume, and what are the recommended alternatives/considerations,
especially with AWS?
If there are any other considerations, please let me know.
Thanks for your advice.





Re: async io parallelism

2020-02-22 Thread Arvid Heise
Hi Alexey,

the short answer is: order is preserved in all cases.

Basically, ordered asyncIO maintains an internal FIFO queue where all
pending elements reside. All async results are saved into this queue, but
elements are only emitted once the head element has a result.

So assume you have three input records i1, i2, i3 and the outputs arrive
asynchronously in the order o2, o1, o3, 100 ms apart: there is no output
after receiving o2, then o1 and o2 are emitted after 200 ms, and then o3
after 300 ms.

Best,

Arvid
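
A minimal sketch of the ordered async I/O pattern discussed here, using
AsyncDataStream.orderedWait; the enrichment logic and the element values are
illustrative assumptions, not taken from the thread:

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class OrderedAsyncEnrichmentSketch {

    /** Completes each lookup on a separate thread, so results may finish out of order. */
    public static class EnrichFunction extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> input + "-enriched")   // stand-in for an external service call
                    .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("i1", "i2", "i3");

        // orderedWait buffers results in a FIFO queue and emits them in input order,
        // even if the async calls complete in a different order.
        DataStream<String> enriched = AsyncDataStream.orderedWait(
                input, new EnrichFunction(), 10, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("Ordered async I/O sketch");
    }
}

The capacity argument (100 here) bounds the number of in-flight requests, and
the timeout (10 seconds) bounds how long a pending element may block the head
of the queue.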

On Sat, Feb 22, 2020 at 2:22 AM Alexey Trenikhun  wrote:

> Hello,
> Let's say my elements are simple key-value pairs. Elements are coming
> from Kafka, where they were partitioned by "key"; then I do processing
> using a KeyedProcessFunction (keyed by the same "key"), then I enrich
> elements using an ordered RichAsyncFunction, then output to another
> KeyedProcessFunction (keyed by the same "key"), and then write to a Kafka
> topic, again partitioned by the same "key", something like this:
>
> FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction)
> -> AsyncDataStream.orderedWait() -> keyBy("key")->Output(
> KeyedProcessFunction)->FlinkKafkaProducer
>
> Will it preserve order of events with same "key"?
>
>- Will the Output function receive elements with the same "key" in the
>same order as they were originally in Kafka?
>- Will FlinkKafkaProducer write elements with the same "key" in the same
>order as they were originally in Kafka?
>- Does it depend on the parallelism of the async I/O? The documentation says
>"the stream order is preserved", but if there are multiple parallel
>instances of the async function, does it mean order relative to each single
>instance, or total stream order?
>
> Thanks,
> Alexey
>


Re: [ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-22 Thread Chad Dombrova
It’s an interesting idea to use the Beam SDK to execute PyFlink (i.e.
non-Beam) UDFs.
Thanks for the info.

On Thu, Feb 20, 2020 at 5:41 PM Xingbo Huang  wrote:

> Hi Chad,
> The Beam Portability framework handles the management of the Python runtime
> environment and the communication between the Python VM and the JVM in
> PyFlink. For details, you can refer to FLIP-58 [1] and the blog post [2].
>
> Hope to resolve your doubts.
>
> Best,
> Xingbo
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
> [2]
> https://enjoyment.cool/2020/02/19/Deep-dive-how-to-support-Python-UDF-in-Apache-Flink-1-10/
>
> On Fri, Feb 21, 2020 at 12:16 AM Chad Dombrova wrote:
>
>> Hi,
>> Very cool. I’m curious about the relationship between this feature and
>> Apache Beam.  What parts of Beam are used and for what?  Does this have any
>> impact on existing Beam users like myself who use the Beam python API on
>> top of Flink?  Can someone give me a brief overview or point me at the
>> right documentation?
>>
>> Thanks!
>> -chad
>>
>>
>> On Thu, Feb 20, 2020 at 5:39 AM Xingbo Huang  wrote:
>>
>>> Thanks a lot for the release.
>>> Great Work, Jincheng!
>>> Also thanks to participants who contribute to this release.
>>>
>>> Best,
>>> Xingbo
>>>
>>>
>>> On Tue, Feb 18, 2020 at 11:40 PM Till Rohrmann wrote:
>>>
 Thanks for updating the 1.9.2 release wrt Flink's Python API Jincheng!

 Cheers,
 Till

 On Thu, Feb 13, 2020 at 12:25 PM Hequn Cheng  wrote:

> Thanks a lot for the release, Jincheng!
> Also thanks to everyone that make this release possible!
>
> Best,
> Hequn
>
> On Thu, Feb 13, 2020 at 2:18 PM Dian Fu  wrote:
>
> > Thanks for the great work, Jincheng.
> >
> > Regards,
> > Dian
> >
> > On Feb 13, 2020, at 1:32 PM, jincheng sun wrote:
> >
> > Hi everyone,
> >
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Python API(PyFlink) 1.9.2, which is the first release to PyPI
> for the
> > Apache Flink Python API 1.9 series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> >
> > https://pypi.org/project/apache-flink/1.9.2/#files
> >
> > Or installed using pip command:
> >
> > pip install apache-flink==1.9.2
> >
> > We would like to thank all contributors of the Apache Flink
> community who
> > helped to verify this release and made this release possible!
> >
> > Best,
> > Jincheng
> >
> >
> >
>