Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Gábor Gévay
Hello, Alternatively, if dataset B fits in memory, but dataset A doesn't, then you can do it with broadcasting B to a RichMapPartitionFunction on A: In the open method of mapPartition, you sort B. Then, for each element of A, you do a binary search in B, and look at the index found by the binary s
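The core pattern described above (sort the small, broadcast side once, then binary-search it per element of the large side) can be sketched outside Flink in plain Java; the `RichMapPartitionFunction` and broadcast-variable wiring are omitted, and the integer data is a made-up example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BroadcastBinarySearch {
    // Sort the (small, broadcast) dataset B once, as you would in open().
    static List<Integer> prepare(List<Integer> b) {
        List<Integer> sorted = new ArrayList<>(b);
        Collections.sort(sorted);
        return sorted;
    }

    // For each element of the (large) dataset A, binary-search the sorted B.
    // Here we return, per element of A, its insertion point in B.
    static int[] lookup(List<Integer> a, List<Integer> sortedB) {
        int[] indices = new int[a.size()];
        for (int i = 0; i < a.size(); i++) {
            int idx = Collections.binarySearch(sortedB, a.get(i));
            indices[i] = idx >= 0 ? idx : -(idx + 1); // insertion point if absent
        }
        return indices;
    }

    public static void main(String[] args) {
        List<Integer> sortedB = prepare(Arrays.asList(30, 10, 20)); // [10, 20, 30]
        int[] result = lookup(Arrays.asList(5, 20, 35), sortedB);
        System.out.println(Arrays.toString(result)); // [0, 1, 3]
    }
}
```

In the Flink version, `prepare` would run in `open()` on the broadcast variable, and `lookup` would be the per-partition loop.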

Re: Reading multiple datasets with one read operation

2015-10-22 Thread Gábor Gévay
Hello! > I have thought about a workaround where the InputFormat would return > Tuple2s and the first field is the name of the dataset to which a record > belongs. This would however require me to filter the read data once for > each dataset or to do a groupReduce which is some overhead i'm > look
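The tagging workaround quoted above can be illustrated outside Flink: each record carries the name of the logical dataset it belongs to, and a single pass over the tagged records splits them by that name (the dataset names and record type below are made-up placeholders).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaggedSplit {
    // A record tagged with the name of the dataset it belongs to,
    // i.e. the Tuple2<String, T> from the workaround above.
    record Tagged(String dataset, String value) {}

    // One pass over the tagged records, splitting them by dataset name.
    static Map<String, List<String>> split(List<Tagged> records) {
        Map<String, List<String>> byDataset = new HashMap<>();
        for (Tagged t : records) {
            byDataset.computeIfAbsent(t.dataset(), k -> new ArrayList<>()).add(t.value());
        }
        return byDataset;
    }

    public static void main(String[] args) {
        List<Tagged> in = List.of(
                new Tagged("users", "alice"),
                new Tagged("orders", "o-1"),
                new Tagged("users", "bob"));
        System.out.println(split(in));
    }
}
```

In Flink the per-name filter (or a groupReduce on the tag) plays the role of this single splitting pass, which is where the overhead mentioned in the question comes from.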

Re: Local collection data sink for the streaming API

2016-01-05 Thread Gábor Gévay
Hi Filipe, You can take a look at `DataStreamUtils.collect` in flink-contrib/flink-streaming-contrib. Best, Gábor 2016-01-05 16:14 GMT+01:00 Filipe Correia : > Hi, > > Collecting results locally (e.g., for unit testing) is possible in the > DataSet API by using "LocalCollectionOutputFormat", a

Re: Local collection data sink for the streaming API

2016-01-05 Thread Gábor Gévay
ream, rather than > org.apache.flink.streaming.api.scala.DataStream. Any suggestion on how > to handle this, other than creating my own scala implementation of > DataStreamUtils.collect()? > > Thanks, > > Filipe > > On Tue, Jan 5, 2016 at 3:33 PM, Gábor Gévay wrote: >> Hi Filipe, >> >>

Re: Redeployements and state

2016-01-14 Thread Gábor Gévay
Hello, You are probably looking for this feature: https://issues.apache.org/jira/browse/FLINK-2976 Best, Gábor 2016-01-14 11:05 GMT+01:00 Niels Basjes : > Hi, > > I'm working on a streaming application using Flink. > Several steps in the processing are state-full (I use custom Windows and > s

Re: Left join with unbalanced dataset

2016-02-02 Thread Gábor Gévay
Hello Arnaud, > Flink does not start the reduce operation until all lines have > been created (memory bottleneck is during the collection > of all lines) ; but theorically it is possible. The problem that `S.groupBy(...).reduce(...)` needs to fully materialize S comes from the fact that the imple

Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
Hello, > I think that there is actually a fundamental latency issue with > "exactly once sinks", no matter how you implement them in any systems: > You can only commit once you are sure that everything went well, > to a specific point where you are sure no replay will ever be needed. What if the

Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
The way I imagine this is that the sink would have its "own checkpoints" separately from the rest of the system, and with much smaller interval, and writes to Kafka (with "transactional cooperation", as Stephan mentioned) during making these checkpoints. And then when a replay happens from a global

Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Gábor Gévay
Hello, > // For each "filename" in list do... > DataSet featureList = fileList > .flatMap(new ReadDataSetFromFile()) // flatMap because there > might be multiple DataSets in a file What happens if you just insert .rebalance() before the flatMap? > This kind of DataSource will only b

Re: time spent for iteration

2016-03-09 Thread Gábor Gévay
Hello, If you increase the log level, you can see each step of the iteration separately in the log, with timestamps. Best, Gábor 2016-03-09 14:04 GMT+01:00 Riccardo Diomedi : > Is it possible to add timer for the time spent for iteration when iterate > operator or the delta iterate operator

Re: time spent for iteration

2016-03-09 Thread Gábor Gévay
his > information. > Wouldn't it make sense to expose this to the web UI for example? > We actually had a discussion about this some time ago [1]. > > -Vasia. > > [1]: https://issues.apache.org/jira/browse/FLINK-1759 > > On 9 March 2016 at 14:37, Gábor Gévay wrote: >>

Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
Hello, In the case of "sum", you can just specify them one after the other, like: stream.sum(1).sum(2) This works because the two sums are independent. However, in the case of "keyBy", the information is needed from both fields at the same time to produce the key. Best, Gábor 2016

Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
do another sum call on that. Would tell me how did you manage to > do > > stream.sum().sum() > > Regards, > -Rami > > On 7 Jun 2016, at 16:13, Gábor Gévay wrote: > > Hello, > > In the case of "sum", you can just specify them one after the other, like: >

Re: Performance issues with GroupBy?

2016-07-26 Thread Gábor Gévay
Hello Robert, > Is there something I might could do to optimize the grouping? You can try to make your `RichGroupReduceFunction` implement the `GroupCombineFunction` interface, so that Flink can do combining before the shuffle, which might significantly reduce the network load. (How much the comb
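The effect of implementing `GroupCombineFunction` can be sketched outside Flink: each partition pre-aggregates its own records locally, so only one partial result per key (rather than every record) has to cross the network before the final reduce. The word-count-style data below is a made-up example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineBeforeShuffle {
    // Local pre-aggregation on one partition: many records per key go in,
    // one partial count per key comes out. This is what a combiner buys you.
    static Map<String, Integer> combine(List<String> partition) {
        Map<String, Integer> partial = new HashMap<>();
        for (String key : partition) {
            partial.merge(key, 1, Integer::sum);
        }
        return partial;
    }

    // Final reduce after the shuffle: merge the per-partition partials.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> p : partials) {
            p.forEach((k, v) -> total.merge(k, v, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> p1 = combine(List.of("a", "a", "b"));
        Map<String, Integer> p2 = combine(List.of("a", "b", "b"));
        // Only 2 records per partition are shuffled instead of 3.
        System.out.println(reduce(List.of(p1, p2))); // {a=3, b=3}
    }
}
```

How much this helps depends, as the reply notes, on how well the data combines: many records per key per partition means large savings, unique keys mean almost none.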

Re: flink datastream reduce

2016-08-29 Thread Gábor Gévay
Hello, The result contains (a,Map(3 -> rt)) because reduce prints all intermediate results (sometimes called a "rolling reduce"). It's designed this way, because Flink streams are generally infinite, so there is no last element where you could print the "final" results. However, you can use window
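The "rolling" behaviour can be simulated in plain Java: every incoming element produces a new intermediate aggregate, which is emitted immediately, because on an unbounded stream there is no final element to wait for. The integer-sum reduce function is a made-up stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class RollingReduce {
    // Emits the running aggregate after every element, like a streaming reduce.
    static <T> List<T> rollingReduce(List<T> stream, BinaryOperator<T> reduceFn) {
        List<T> emitted = new ArrayList<>();
        T acc = null;
        for (T element : stream) {
            acc = (acc == null) ? element : reduceFn.apply(acc, element);
            emitted.add(acc); // every intermediate result goes downstream
        }
        return emitted;
    }

    public static void main(String[] args) {
        // A sum over 1, 2, 3 emits 1, 3, 6 -- not just the final 6.
        System.out.println(rollingReduce(List.of(1, 2, 3), Integer::sum)); // [1, 3, 6]
    }
}
```

Windowing, as the reply suggests, is how you get a single "final" result per bounded chunk of the stream instead.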

Re: Nested iterations

2016-09-01 Thread Gábor Gévay
Hello Supun, Unfortunately, nesting of Flink's iteration constructs is not supported at the moment. There are some workarounds though: 1. You can start a Flink job for each step of the iteration. Starting a Flink job has some overhead, so this only works if there is a sufficient amount of work

Re: Nested iterations

2016-09-01 Thread Gábor Gévay
e any plans to support nested loops in the future? > > Thanks, > Supun.. > > On Thu, Sep 1, 2016 at 12:28 PM, Gábor Gévay wrote: >> >> Hello Supun, >> >> Unfortunately, nesting of Flink's iteration constructs are not >> supported at the moment. >>

Re: Streaming issue help needed

2016-09-15 Thread Gábor Gévay
Hello Vaidya, The error message is talking about the class "AbstractTime", which was removed long ago (before 1.0). Could you please double check that the Flink version is set appropriately everywhere, and that the binaries running in the cluster are really 1.1.2? Best, Gábor 2016-09-15 11:52

Re: can Flink use multi "addSink"?

2016-09-26 Thread Gábor Gévay
Hello, You can't call map on the sink, but instead you can continue from the stream that you have just before the sink: val stream = datastream.filter(new Myfilter()) val sink1 = stream.addSink(new Mysink()) val sink2 = stream.map(new MyMap()).addSink(MySink2()) Best, Gábor 2016-09-26 12:47 G

Re: Flink strange stream join behavior

2016-10-16 Thread Gábor Gévay
Hello, For your first question: > the number of tuples are same in both cases I guess you mean the total number of tuples here, right? So this means that you have fewer, but larger windows. Suppose that you have W windows, each with S tuples. Then your total input has W * S tuples, and your tota
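The point about window sizes can be made concrete with a little arithmetic: if the join pairs up tuples within each window, a window of S tuples produces on the order of S * S output pairs, so for the same total input of W * S tuples, fewer-but-larger windows produce more join output. A tiny sketch with made-up numbers:

```java
public class WindowJoinSize {
    // Total join output if the input is split into `windows` windows of
    // `perWindow` tuples each, and the join pairs up tuples within a window.
    static long joinOutput(long windows, long perWindow) {
        return windows * perWindow * perWindow;
    }

    public static void main(String[] args) {
        // The same 1000 input tuples, under two different windowings:
        System.out.println(joinOutput(100, 10)); // 100 windows of 10 -> 10000 pairs
        System.out.println(joinOutput(10, 100)); // 10 windows of 100 -> 100000 pairs
    }
}
```

So with equal total tuple counts, the coarser windowing does ten times the join work in this example.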

Re: Looping over a DataSet and accesing another DataSet

2016-10-30 Thread Gábor Gévay
Hello, In Flink, one often used way to access data from multiple DataSets at the same time is to perform a join (Flink actually calls equi-joins [1] just "join"), just as in the database world. For example, in the algorithm that you linked, you access A[u] for every edge (u,v). I assume that you
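Conceptually, the join used to look up A[u] for every edge (u, v) is a hash join: build a map from vertex id to its A-value, then stream over the edges and probe it. A plain-Java sketch with made-up data (in Flink this would be `edges.join(a).where(...).equalTo(...)`):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EdgeValueJoin {
    record Edge(int u, int v) {}

    // Equi-join each edge (u, v) with the vertex value A[u]: the map over A
    // is the build side, the edge list is the probe side.
    static List<String> joinWithSourceValue(List<Edge> edges, Map<Integer, Double> a) {
        List<String> out = new ArrayList<>();
        for (Edge e : edges) {
            Double value = a.get(e.u()); // probe A with the edge's source id
            if (value != null) {
                out.add(e.u() + "->" + e.v() + ":" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Double> a = Map.of(1, 0.5, 2, 1.5);
        List<Edge> edges = List.of(new Edge(1, 2), new Edge(2, 3));
        System.out.println(joinWithSourceValue(edges, a)); // [1->2:0.5, 2->3:1.5]
    }
}
```

The difference in Flink is that neither side needs to fit in one machine's memory: the runtime partitions both sides by the join key.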

Re: Retrieving values from a dataset of datasets

2016-11-15 Thread Gábor Gévay
Hello, How exactly do you represent the DataSet of DataSets? I'm asking because if you have something like a DataSet<DataSet<T>>, that unfortunately doesn't work in Flink. Best, Gábor 2016-11-14 20:44 GMT+01:00 otherwise777 : > Hey There, > > I'm trying to calculate the betweenness in a graph with Flin

Re: Retrieving values from a dataset of datasets

2016-11-16 Thread Gábor Gévay
The short answer is that because DataSet is not serializable. I think the main underlying problem is that Flink needs to see all DataSet operations before launching the job. However, if you have a DataSet<DataSet<T>>, then operations on the inner DataSets will end up being specified inside the UDFs of operat

Re: spark vs flink batch performance

2016-11-18 Thread Gábor Gévay
Hello, Your program looks mostly fine, but there are a few minor things that might help a bit: Parallelism: In your attached flink-conf.yaml, you have 2 task slots per task manager, and if you have 1 task manager, then your total number of task slots is also 2. However, your default parallelism i

Re: spark vs flink batch performance

2016-11-18 Thread Gábor Gévay
ts because in those kind of >> tests, spark and flink was either on par or flink 10-15% faster than spark >> in the past. Aside from that are any configuration parameters you may >> propose to fine tune flink? >> >> Best, >> Anıl >> >> On Nov 18, 2016 12

Re: Equivalent of Rx combineLatest() on a join?

2016-12-13 Thread Gábor Gévay
Dear Denis, I think you can do it with a simple CoFlatMapFunction (without windows): To use a CoFlatMapFunction, you need to first connect [1] your streams, which results in a ConnectedStreams. Then you can call flatMap on this, and give a CoFlatMapFunction to it (where two different callbacks are
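Outside Flink, the combineLatest behaviour of such a CoFlatMapFunction can be sketched as a small state machine: the two callbacks (corresponding to flatMap1 and flatMap2) each remember the latest value of their side and emit a combined pair whenever both sides have been seen. Names and types below are made up.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLatest {
    private String latestLeft;   // last value seen on the first stream
    private Integer latestRight; // last value seen on the second stream
    final List<String> emitted = new ArrayList<>();

    // Corresponds to CoFlatMapFunction#flatMap1.
    void onLeft(String value) {
        latestLeft = value;
        emitIfReady();
    }

    // Corresponds to CoFlatMapFunction#flatMap2.
    void onRight(int value) {
        latestRight = value;
        emitIfReady();
    }

    private void emitIfReady() {
        if (latestLeft != null && latestRight != null) {
            emitted.add(latestLeft + "/" + latestRight);
        }
    }

    public static void main(String[] args) {
        CombineLatest op = new CombineLatest();
        op.onLeft("a");  // nothing emitted yet, right side still missing
        op.onRight(1);   // emits a/1
        op.onLeft("b");  // emits b/1
        System.out.println(op.emitted); // [a/1, b/1]
    }
}
```

In an actual Flink job the two fields would live in operator state rather than plain instance fields, but the emission logic is the same.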

Re: How to get top N elements in a DataSet?

2017-01-24 Thread Gábor Gévay
Hello, Btw. there is a Jira about this: https://issues.apache.org/jira/browse/FLINK-2549 Note that the discussion there suggests a more efficient approach, which doesn't involve sorting the entire partitions. And if I remember correctly, this question comes up from time to time on the mailing lis
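The more efficient approach alluded to above can be sketched as follows: keep a bounded min-heap of size N on each partition and merge the per-partition results, instead of fully sorting every partition. A plain-Java illustration of the per-partition step:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopN {
    // Keeps only the N largest elements seen, using a min-heap of size N,
    // so the partition is never fully sorted.
    static List<Integer> topN(List<Integer> partition, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap
        for (int x : partition) {
            heap.offer(x);
            if (heap.size() > n) {
                heap.poll(); // drop the smallest, keeping the N largest
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(topN(List.of(5, 1, 9, 3, 7), 3)); // [9, 7, 5]
    }
}
```

This is O(S log N) per partition instead of O(S log S) for a full sort, and only N elements per partition need to be merged afterwards.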

Re: Cyclic ConnectedStream

2017-01-28 Thread Gábor Gévay
Hello, Cyclic dataflows can be built using iterations: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations Best, Gábor 2017-01-28 18:39 GMT+01:00 Matt : > I have a ConnectedStream (A) that depends on another ConnectedStream (B), > which depends on th

Re: Cyclic ConnectedStream

2017-01-31 Thread Gábor Gévay
t; *statsStream* (flatMap2, where it's updated with an object from *Input2*) >> and finally to *predictionStream* (flatMap2). >> >> The same operator is never applied twice to the object, thus I would say >> this dataflow is cyclic only in the dependencies of the stream &

Re: Join with Default-Value

2017-02-10 Thread Gábor Gévay
Hello Sebastian, You can use DataSet.leftOuterJoin for this. Best, Gábor 2017-02-10 12:58 GMT+01:00 Sebastian Neef : > Hi, > > is it possible to assign a "default" value to elements that didn't match? > > For example I have the following two datasets: > > |DataSetA | DataSetB| > -
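The semantics of leftOuterJoin with a default value can be sketched in plain Java: every left record is kept, and when the right side has no match, the join UDF sees null and can substitute a default. The field names and the default below are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LeftOuterJoinDefault {
    // Left outer join: keep every left key; use `defaultValue` when the
    // right side has no match (what the join UDF would do on a null input).
    static List<String> leftOuterJoin(List<String> left, Map<String, Integer> right,
                                      int defaultValue) {
        List<String> out = new ArrayList<>();
        for (String key : left) {
            Integer match = right.get(key);
            int value = (match != null) ? match : defaultValue;
            out.add(key + "=" + value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> joined =
                leftOuterJoin(List.of("a", "b", "c"), Map.of("a", 1, "c", 3), 0);
        System.out.println(joined); // [a=1, b=0, c=3]
    }
}
```

In the DataSet API the null check lives in the JoinFunction passed to `leftOuterJoin(...).with(...)`, since unmatched right inputs arrive as null there.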

Re: Join with Default-Value

2017-02-10 Thread Gábor Gévay
I'm not sure what exactly is the problem, but could you check this FAQ item? http://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception- Best, Gábor 2017-02-10 14:16 GMT+01:00 Sebastian Neef : > Hi, > > thanks! That's exactly what I needed. > > I'm not using: DataSetA.leftOute

Re: Deep Copy in FLINK, Kryo Copy is used in the different operator

2018-02-14 Thread Gábor Gévay
Hello, You might also be able to make Flink use a better serializer than Kryo. Flink falls back to Kryo when it can't use its own serializers, see here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html For example, it might help to make your type a POJO. Be

Re: Recovering snapshot state saved with TypeInformation generated by the implicit macro from the scala API

2018-04-13 Thread Gábor Gévay
Hello, A bit of an ugly hack, but maybe you could manually create a class named exactly io.relayr.counter.FttCounter$$anon$71$$anon$33, and copy-paste into it the code that the macro is expanded into [1]? Best, Gábor [1] https://stackoverflow.com/questions/11677609/how-do-i-print-an-expanded-ma

Re: Beginner question - sum multiple edges

2017-04-17 Thread Gábor Gévay
Hello Marc, You can group by edge, and then sum: edges .groupBy(0,1) // make first two fields a composite key .sum(2); // sum the value field This will turn multiple edges that have the same source and target into one edge, whose value will be the sum of the values of the original group of e
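What `groupBy(0,1).sum(2)` does to the edge list can be reproduced in plain Java: the (source, target) pair acts as a composite key, and the values of duplicate edges are summed. The edge data below is made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SumMultiEdges {
    record Edge(int src, int dst, int value) {}

    // Group edges by the composite key (src, dst) and sum the value field,
    // like edges.groupBy(0, 1).sum(2) on a DataSet of Tuple3s.
    static Map<String, Integer> sumByEdge(List<Edge> edges) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Edge e : edges) {
            sums.merge(e.src() + "->" + e.dst(), e.value(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
                new Edge(1, 2, 20),
                new Edge(1, 2, 5),   // duplicate edge, values are summed
                new Edge(2, 1, 10)); // opposite direction: a different key
        System.out.println(sumByEdge(edges)); // {1->2=25, 2->1=10}
    }
}
```

Note that (1, 2) and (2, 1) are distinct keys here, which is exactly the follow-up question in this thread about combining the two directions of an edge.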

Re: Flink memory usage

2017-04-20 Thread Gábor Gévay
Hello, You could also try using a profiler that shows what objects are using what amount of memory. E.g., JProfiler or Java Flight Recorder [1]. Best, Gábor [1] https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks001.html On Thu, Apr 20, 2017 at 6:00 PM, Newport, B

Re: Beginner question - sum multiple edges

2017-04-23 Thread Gábor Gévay
).sum(2), > ExecutionEnvironment.getExecutionEnvironment()); > // reduce self-looping > Graph networkGraph = new Simplify<>().run(networkSumMultiEdges); > > > How can I reduce and combine (1 2 20) and (2 1 10) to one Tuple? > > > Best regards > Marc >

Re: Flink Vs Google Cloud Dataflow?

2017-08-04 Thread Gábor Gévay
Hello, Have you seen these two blog posts? They explain the relationship between Apache Flink, Apache Beam, and Google Cloud Dataflow. https://data-artisans.com/blog/why-apache-beam https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective Best, Gábor On Mon, Jul 31,

Re: termination of stream#iterate on finite streams

2017-09-05 Thread Gábor Gévay
Hello, There is a Flink Improvement Proposal to redesign the iterations: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 This will address the termination issue. Best, Gábor On Mon, Sep 4, 2017 at 11:00 AM, Xingcan Cui wrote: > Hi Peter, > > That's a good idea, but

Re: DataSet: CombineHint heuristics

2017-09-05 Thread Gábor Gévay
Hi Urs, Yes, the 1/10th ratio is just a very loose rule of thumb. I would suggest to try both the SORT and HASH strategies with a workload that is as similar as possible to your production workload (similar data, similar parallelism, etc.), and see which one is faster for your specific use case.

Re: Bulk Iteration

2017-09-14 Thread Gábor Gévay
Hello Alieh, If you set the logging to a more verbose level, then Flink prints a log msg at every iteration. If you need the current iteration number inside your code, then you should create your UDF as an AbstractRichFunction, where you can call getIterationRuntimeContext(), which has getSuperst

Re: Dot notation not working for accessing case classes nested fields

2017-09-15 Thread Gábor Gévay
Hi Federico, Sorry, nested field expressions are not supported in these methods at the moment. I have created a JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-7629 I think this should be easy to fix, as all the infrastructure for supporting this is already in place. I'll try to d

Re: Rule expression for CEP library

2017-09-25 Thread Gábor Gévay
Hello Shailesh, There is a Flink Improvement Proposal for Integration of SQL and CEP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP Best, Gábor On Mon, Sep 25, 2017 at 3:21 PM, Shailesh Jain wrote: > Hi, > > Apart from the Java/Scala API for the CEP libr

Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-03 Thread Gábor Gévay
Hi Garrett, You can call .setParallelism(1) on just this operator: ds.reduceGroup(new GroupReduceFunction...).setParallelism(1) Best, Gabor On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton wrote: > I have a complex alg implemented using the DataSet api and by default it > runs with parallel 90

Gelly vertex ID type requirements?

2015-07-30 Thread Gábor Gévay
Hello, I am having some trouble building a graph where the vertex ID type is a POJO. Specifically, it would be a class that has two fields: a long field, and a field which is of another class that has four byte fields. (Both classes implement the Comparable interface, as the Gelly guide specifies.

Re: Gelly vertex ID type requirements?

2015-07-30 Thread Gábor Gévay
Type nor an AtomicType. To > me it looks like the TupleTypeInfoBase condition should be generalized to > CompositeType. > > I will look into this. > > Cheers, Fabian > > 2015-07-30 14:18 GMT+02:00 Gábor Gévay : >> >> Hello, >> >> I am having some trouble b

Re: Gelly vertex ID type requirements?

2015-07-31 Thread Gábor Gévay
(FLINK-2447). Best, Gabor 2015-07-31 0:29 GMT+02:00 Fabian Hueske : > Hi, > > I opened a JIRA (FLINK-2442) and submitted a PR (#963) for the "Wrong field > type" problem. > Is the other problem is addressed in FLINK-2437? > > Cheers, Fabian > > 2015-07-30 16:

Re: Performance Issue

2015-09-09 Thread Gábor Gévay
Btw, there was a discussion about this problem a while back: https://mail-archives.apache.org/mod_mbox/flink-dev/201506.mbox/%3ccadxjeyci9_opro4oqtzhvi-gifek6_66ybtjz_pb0aqop_n...@mail.gmail.com%3E And here is the jira: https://issues.apache.org/jira/browse/FLINK-2181 Best, Gabor 2015-09-09 10:0