Re: EMR vCores and slot allocation

2016-05-02 Thread Robert Metzger
Hi Ken,
sorry for the late response. The number of CPU cores we show in the web
interface is based on what the JVM tells us from
"Runtime.getRuntime().availableProcessors();". I'm not sure how tthe
processor count behaves on Amazon VMs.
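
For reference, a quick way to check what the JVM reports on one of the nodes
(plain Java, nothing Flink-specific; note that on many EC2 instance types the
reported count corresponds to vCPUs, i.e. hyper-threads, rather than physical
cores):

public class AvailableProcessors {
    public static void main(String[] args) {
        // Prints the processor count the JVM sees, which is what the web interface shows.
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}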

Given that each of your servers has 8 vCores, I would set the slot count to
8 per Taskmanager.


On Fri, Apr 29, 2016 at 1:58 AM, Ken Krugler 
wrote:

> Based on what Flink reports in the JobManager GUI, it looks like it thinks
> that the EC2 instances I’m using for my EMR jobs only have 4 physical cores.
>
> Which would make sense, as Amazon describes these servers as having 8
> vCores.
>
> From
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html,
> the recommended configuration would then be 4 slots/TaskManager, yes?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>


Any way for Flink Elasticsearch connector reflecting IP change of Elasticsearch cluster?

2016-05-02 Thread Sendoh
Hi,

When using the Elasticsearch connector, is there a way to reflect an IP change
of the Elasticsearch cluster?
We use the DNS name of Elasticsearch in the data sink, e.g. elasticsearch-dev.foo.de.
However, when we replace the old Elasticsearch cluster with a new one, the
Elasticsearch connector cannot write into the new one because of the IP change.

This is an important feature for us because it means we don't have to restart
the Flink job. The cause might be that the Flink-Elasticsearch2 connector looks
up the IP from DNS only once.
Thus, one approach might be: when the response of a write to Elasticsearch
indicates a failure, let the Flink environment create a new data sink?

We use Flink Elasticsearch-connector2(for Elasticsearch2.x) on AWS

Best,

Sendoh



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Any-way-for-Flink-Elasticsearch-connector-reflecting-IP-change-of-Elasticsearch-cluster-tp6597.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Problem in creating quickstart project using archetype (Scala)

2016-05-02 Thread Aljoscha Krettek
Hi,
I'm sorry for the inconvenience, for the -SNAPSHOT release versions one
must also append the address of the repository to the command, like this:

$ mvn archetype:generate \
   -DarchetypeGroupId=org.apache.flink \
   -DarchetypeArtifactId=flink-quickstart-scala \
   -DarchetypeVersion=1.1-SNAPSHOT \
   -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/

or you could use the stable version:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.0.2

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 19:02 nsengupta  wrote:

> Hello all,
>
> I don't know if anyone else has faced this; I haven't so far.
>
> When I try to create a new project template following the instructions
> here
> <
> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#requirements
> >
> , it fails.
>
> This is what happens (along with the command I give):
>
> nirmalya@Cheetah:~/Workspace-Flink$ mvn archetype:generate
> \
> >   -DarchetypeGroupId=org.apache.flink  \
> >   -DarchetypeArtifactId=flink-quickstart-scala \
> >   -DarchetypeVersion=1.1-SNAPSHOT
> [INFO] Scanning for projects...
> [INFO]
> [INFO]
> 
> [INFO] Building Maven Stub Project (No POM) 1
> [INFO]
> 
> [INFO]
> [INFO] >>> maven-archetype-plugin:2.3:generate (default-cli) >
> generate-sources @ standalone-pom >>>
> [INFO]
> [INFO] <<< maven-archetype-plugin:2.3:generate (default-cli) <
> generate-sources @ standalone-pom <<<
> [INFO]
> [INFO] --- maven-archetype-plugin:2.3:generate (default-cli) @
> standalone-pom ---
> [INFO] Generating project in Interactive mode
> [INFO] Archetype repository not defined. Using the one from
> [org.apache.flink:flink-quickstart-scala:1.0.2] found in catalog remote
> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 01:15 min
> [INFO] Finished at: 2016-04-28T22:22:57+05:30
> [INFO] Final Memory: 14M/226M
> [INFO]
> 
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-archetype-plugin:2.3:generate (default-cli)
> on project standalone-pom: The desired archetype does not exist
> (org.apache.flink:flink-quickstart-scala:1.1-SNAPSHOT) -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please
> read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> nirmalya@Cheetah:~/Workspace-Flink$
>
>
> Could someone please point out the mistake?
>
> -- Nirmalya
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-in-creating-quickstart-project-using-archetype-Scala-tp6560.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Problem with writeAsText

2016-05-02 Thread Punit Naik
Please ignore this question; I forgot to call env.execute().
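
For anyone who hits the same thing: sinks in the DataSet API are lazy, so
nothing is written until the plan is executed. A minimal Java sketch of the
fix (the path is just an example):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WriteAsTextExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");

        // writeAsText() only declares a sink; nothing is written until execute() runs the plan.
        data.writeAsText("file:///tmp/output");
        env.execute("write example");
    }
}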

On Mon, May 2, 2016 at 11:45 AM, Punit Naik  wrote:

> I have a Dataset which contains only strings. But when I execute a
> writeAsText and supply a folder inside the string, it finishes with the
> following output but does not write any text files:
>
> org.apache.flink.api.java.operators.DataSink[String] = DataSink
> '' (TextOutputFormat (file://path/to/output) - UTF-8)
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>



-- 
Thank You

Regards

Punit Naik


Re: Multiple windows with large number of partitions

2016-05-02 Thread Aljoscha Krettek
Hi,
what do you mean by "still experiencing the same issues"? Is the key count
still very high, i.e. 500k windows?

For the watermark generation, specifying a lag of 2 days is very
conservative. If the watermark is this conservative, I guess elements that
are behind the watermark will never arrive, so you wouldn't need the
late-element handling in your triggers. The late-element handling in
Triggers is only required to compensate for the fact that the watermark can
be a heuristic and not always correct.

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 21:24 Christopher Santiago 
wrote:

> Hi Aljoscha,
>
>
> Aljoscha Krettek wrote
> >>is there are reason for keying on both the "date only" field and the
> "userid". I think you should be fine by just specifying that you want 1-day
> windows on your timestamps.
>
> My mistake, this was from earlier tests that I had performed.  I removed it
> and went to keyBy(2) and I am still experiencing the same issues.
>
>
> Aljoscha Krettek wrote
> >>Also, do you have a timestamp extractor in place that takes the timestamp
> from your data and sets it as the internal timestamp field.
>
> Yes there is, it is from the BoundedOutOfOrdernessGenerator example:
>
> public static class BoundedOutOfOrdernessGenerator implements
> AssignerWithPeriodicWatermarks> {
> private static final long serialVersionUID = 1L;
> private final long maxOutOfOrderness =
> Time.days(2).toMilliseconds();
> private long currentMaxTimestamp;
>
> @Override
> public long extractTimestamp(Tuple3
> element, long previousElementTimestamp) {
> long timestamp = element.f0.getMillis();
> currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
> return timestamp;
> }
>
> @Override
> public Watermark getCurrentWatermark() {
> return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
> }
> }
>
> Thanks,
> Chris
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-windows-with-large-number-of-partitions-tp6521p6562.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Perform a groupBy on an already groupedDataset

2016-05-02 Thread Punit Naik
Hello

I wanted to perform a groupBy on an already grouped dataset. How do I do
this?

-- 
Thank You

Regards

Punit Naik


Re: TypeVariable problems

2016-05-02 Thread Aljoscha Krettek
Hi,
for user functions that have generics, such as you have, you have to
manually specify the types somehow. This can either be done using
InputTypeConfigurable/OutputTypeConfigurable or maybe using
stream.returns().
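
To make the returns() route concrete, here is a minimal, self-contained Java
sketch; the class and types are invented for the example, and TypeHint is
assumed to be available (it is used elsewhere in this digest), otherwise
returns() also accepts a TypeInformation or a Class directly:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnsExample {

    // A generic function: K is erased, so Flink cannot infer the output type on its own.
    public static class WrapWithCount<K> implements MapFunction<K, Tuple2<K, Long>> {
        @Override
        public Tuple2<K, Long> map(K value) {
            return new Tuple2<>(value, 1L);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("a", "b", "c");

        // Without the returns() hint this fails with an InvalidTypesException,
        // because the TypeVariable K cannot be determined from the class alone.
        DataStream<Tuple2<String, Long>> counts = words
            .map(new WrapWithCount<String>())
            .returns(new TypeHint<Tuple2<String, Long>>() {}.getTypeInfo());

        counts.print();
        env.execute("returns() example");
    }
}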

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 12:25 Martin Neumann  wrote:

> Hej,
>
> I have a construct of different generic classes stacked on each other to
> create a library (so the type variables get handed on). And I have some
> trouble getting it to work.
> The current offender is a Class with 3 type variables 
> internally it calls:
>
> .fold(new Tuple3<>(keyInit ,new Tuple2(0d,0d), 
> valueFold.getInit()), new 
> CountSumFold(keySelector,valueSelector,valueFold))
>
> I initiated the class with ,NullValue> but
> when I try to run it I get the exception:
> org.apache.flink.api.common.functions.InvalidTypesException: Type of
> TypeVariable 'K' in 'class se.sics.anomaly.bs.models.CountSumFold' could
> not be determined.
>
> Anyone has an idea on how to fix this problem?
>
> cheers Martin
>
>


Re: TypeVariable problems

2016-05-02 Thread Martin Neumann
Hi Aljosha

Thanks for your answer!
I tried using returns but it does not work since the only place where I
could call it is within the function that has all the generic types so
there is no useful type hint to give. I could make the user hand over the
class definition for the type as well, but that would complicate the
interface and I would like to avoid that. Is there documentation or an example
somewhere so I can see how to use
InputTypeConfigurable/OutputTypeConfigurable?

if you have time you can also reach me on hangout (might be faster than
email). :-)

cheers Martin


On Mon, May 2, 2016 at 11:23 AM, Aljoscha Krettek 
wrote:

> Hi,
> for user functions that have generics, such as you have, you have to
> manually specify the types somehow. This can either be done using
> InputTypeConfigurable/OutputTypeConfigurable or maybe using
> stream.returns().
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Apr 2016 at 12:25 Martin Neumann  wrote:
>
>> Hej,
>>
>> I have a construct of different generic classes stacked on each other to
>> create a library (so the type variables get handed on). And I have some
>> trouble getting it to work.
>> The current offender is a Class with 3 type variables 
>> internally it calls:
>>
>> .fold(new Tuple3<>(keyInit ,new Tuple2(0d,0d), 
>> valueFold.getInit()), new 
>> CountSumFold(keySelector,valueSelector,valueFold))
>>
>> I initiated the class with ,NullValue> but
>> when I try to run it I get the exception:
>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>> TypeVariable 'K' in 'class se.sics.anomaly.bs.models.CountSumFold' could
>> not be determined.
>>
>> Anyone has an idea on how to fix this problem?
>>
>> cheers Martin
>>
>>


Re: EMR vCores and slot allocation

2016-05-02 Thread Fabian Hueske
The slot configuration should depend on the complexity of jobs.
Since each slot runs a "slice" of a program, one slot might potentially
execute many concurrent tasks.

For complex jobs you should allocate more than one core for each slot.


2016-05-02 10:12 GMT+02:00 Robert Metzger :

> Hi Ken,
> sorry for the late response. The number of CPU cores we show in the web
> interface is based on what the JVM tells us from
> "Runtime.getRuntime().availableProcessors();". I'm not sure how tthe
> processor count behaves on Amazon VMs.
>
> Given that each of your servers has 8 vCores, I would set the slot count
> to 8 per Taskmanager.
>
>
> On Fri, Apr 29, 2016 at 1:58 AM, Ken Krugler 
> wrote:
>
>> Based on what Flink reports in the JobManager GUI, it looks like it
>> thinks that the EC2 instances I’m using for my EMR jobs only have 4
>> physical cores.
>>
>> Which would make sense, as Amazon describes these servers as having 8
>> vCores.
>>
>> From
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html,
>> the recommended configuration would then be 4 slots/TaskManager, yes?
>>
>> Thanks,
>>
>> — Ken
>>
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>>
>>
>>
>>
>


Re: Count of Grouped DataSet

2016-05-02 Thread Fabian Hueske
Hi Nirmalya,

the solution with List.size() won't use a combiner and won't be efficient
for large data sets with large groups.
I would recommend adding a 1 and using GroupedDataSet.sum().
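
For reference, a rough Java sketch of that pattern on plain tuples (the field
positions and types are only an example, mirroring the sample data below):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;

public class GroupCountExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple4<Integer, Integer, Integer, String>> input = env.fromElements(
            new Tuple4<>(1, 1, 2, "A"), new Tuple4<>(1, 1, 2, "B"),
            new Tuple4<>(2, 1, 3, "B"), new Tuple4<>(3, 1, 4, "C"));

        input
            // Append a constant 1 as an extra field.
            .map(new MapFunction<Tuple4<Integer, Integer, Integer, String>,
                                 Tuple5<Integer, Integer, Integer, String, Long>>() {
                @Override
                public Tuple5<Integer, Integer, Integer, String, Long> map(
                        Tuple4<Integer, Integer, Integer, String> t) {
                    return new Tuple5<>(t.f0, t.f1, t.f2, t.f3, 1L);
                }
            })
            .groupBy(1, 2)  // same grouping fields as before
            .sum(4)         // the count is the sum of the appended 1s; sum() uses a combiner
            .print();
    }
}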

2016-05-01 12:48 GMT+02:00 nsengupta :

> Hello all,
>
> This is how I have moved ahead with the implementation of finding count of
> a
> GroupedDataSet:
>
> *val k = envDefault
>   .fromElements((1,1,2,"A"),(1,1,2,"B"),(2,1,3,"B"),(3,1,4,"C"))
>   .groupBy(1,2)
>   .reduceGroup(nextGroup => {
> val asList = nextGroup.toList
> (asList.head._2,asList.head._3,asList.size)
>   })
>
> k.print()*
>
> While this produces the expected output alright, I am not sure if this is the
> ideal, idiomatic way to implement what I need. Could you please confirm? If
> there is a better way, I would like to be wiser of course.
>
> -- Nirmalya
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Count-of-Grouped-DataSet-tp6592p6594.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Unable to write stream as csv

2016-05-02 Thread Fabian Hueske
Have you checked the log files as well?

2016-05-01 14:07 GMT+02:00 subash basnet :

> Hello there,
>
> Could anyone help me understand why the below *result* DataStream gets
> written as text, but not as CSV? As it's in a tuple format, I guess it
> should be the same for both text and CSV. It shows no error; it just
> doesn't write to the file when the result is written as CSV.
>
> DataStream> *result* =
> keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
> new FoldFunction>() {
> @Override
> public Tuple2 fold(Tuple2 acc,
> WikipediaEditEvent event) {
> acc.f0 = event.getUser();
> acc.f1 += event.getByteDiff();
> return acc;
> }
> });
>
> result.writeAsText(.);  --> It is working.
> result.writeAsCsv(.);  ---> It is not working.
>
> Best Regards,
> Subash Basnet
>
> On Wed, Apr 27, 2016 at 4:14 PM, subash basnet  wrote:
>
>> Hello all,
>>
>> I am able to write the Wikipedia edit data to the kafka and as a text
>> file as per the given example of WikipediaAnalysis. But when I try to write
>> it as csv, the blank files initially created never gets filled with data.
>> Below is the code:
>>
>> DataStream> result =
>> keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>> new FoldFunction>() {
>> @Override
>> public Tuple2 fold(Tuple2 acc,
>> WikipediaEditEvent event) {
>> acc.f0 = event.getUser();
>> acc.f1 += event.getByteDiff();
>> return acc;
>> }
>> });
>>
>> *result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>> FileSystem.WriteMode.OVERWRITE); *-> works
>>
>> *result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>> FileSystem.WriteMode.OVERWRITE);* --> doesn't
>> work
>>
>> Why is data getting written to file as text but not as csv?
>>
>> Best Regards,
>> Subash Basnet
>>
>>
>


Re: Any way for Flink Elasticsearch connector reflecting IP change of Elasticsearch cluster?

2016-05-02 Thread Fabian Hueske
Yes, it looks like the connector only creates the connection once when it
starts and fails if the host is no longer reachable.
It should be possible to catch that failure and try to re-open the
connection.
I opened a JIRA for this issue (FLINK-3857).

Would you like to implement the improvement?
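
For the record, the core of such an improvement would be to re-resolve the
host name instead of reusing the addresses resolved when the sink was created.
A minimal, connector-agnostic sketch of just that step (plain JDK APIs only,
not the connector's actual interfaces; note that the JVM itself may cache DNS
lookups unless networkaddress.cache.ttl is tuned):

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;

public class AddressRefresher {

    private final String hostName;  // e.g. "elasticsearch-dev.foo.de"
    private final int port;

    public AddressRefresher(String hostName, int port) {
        this.hostName = hostName;
        this.port = port;
    }

    // Called whenever a write fails: a fresh DNS lookup returns the current IP
    // even after the cluster behind the DNS name has been replaced.
    public List<InetSocketAddress> resolve() throws UnknownHostException {
        InetAddress address = InetAddress.getByName(hostName);
        return Collections.singletonList(new InetSocketAddress(address, port));
    }
}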

2016-05-02 9:38 GMT+02:00 Sendoh :

> Hi,
>
> When using the Elasticsearch connector, is there a way to reflect an IP change
> of the Elasticsearch cluster?
> We use the DNS name of Elasticsearch in the data sink, e.g. elasticsearch-dev.foo.de.
> However, when we replace the old Elasticsearch cluster with a new one, the
> Elasticsearch connector cannot write into the new one because of the IP change.
>
> This is an important feature for us because it means we don't have to restart
> the Flink job. The cause might be that the Flink-Elasticsearch2 connector looks
> up the IP from DNS only once.
> Thus, one approach might be: when the response of a write to Elasticsearch
> indicates a failure, let the Flink environment create a new data sink?
>
> We use Flink Elasticsearch-connector2(for Elasticsearch2.x) on AWS
>
> Best,
>
> Sendoh
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Any-way-for-Flink-Elasticsearch-connector-reflecting-IP-change-of-Elasticsearch-cluster-tp6597.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Perform a groupBy on an already groupedDataset

2016-05-02 Thread Fabian Hueske
Grouping a grouped dataset is not supported.
You can group on multiple keys: dataSet.groupBy(1,2).

Can you describe your use case if that does not solve the problem?



2016-05-02 10:34 GMT+02:00 Punit Naik :

> Hello
>
> I wanted to perform a groupBy on an already grouped dataset. How do I do
> this?
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: Unable to write stream as csv

2016-05-02 Thread Aljoscha Krettek
I think there is a problem with the interaction of legacy OutputFormats and
streaming programs. flush() is not called, and the CsvOutputFormat only writes
on flush(), therefore we don't see any results.
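
Until that is fixed, one possible workaround (a sketch only, assuming the
Tuple2<String, Long> stream from the quoted code) is to format the lines
manually and use writeAsText(), which does produce output according to the
thread below:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;

public class CsvWorkaround {

    // Formats each tuple as a CSV line and writes it with writeAsText().
    public static void writeAsCsvLines(DataStream<Tuple2<String, Long>> result, String path) {
        result
            .map(new MapFunction<Tuple2<String, Long>, String>() {
                @Override
                public String map(Tuple2<String, Long> value) {
                    // Quoting/escaping is omitted for brevity.
                    return value.f0 + "," + value.f1;
                }
            })
            .writeAsText(path, FileSystem.WriteMode.OVERWRITE);
    }
}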

On Mon, 2 May 2016 at 11:59 Fabian Hueske  wrote:

> Have you checked the log files as well?
>
> 2016-05-01 14:07 GMT+02:00 subash basnet :
>
>> Hello there,
>>
>> Could anyone help me understand why the below *result* DataStream gets
>> written as text, but not as CSV? As it's in a tuple format, I guess it
>> should be the same for both text and CSV. It shows no error; it just
>> doesn't write to the file when the result is written as CSV.
>>
>> DataStream> *result* =
>> keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>> new FoldFunction>() {
>> @Override
>> public Tuple2 fold(Tuple2 acc,
>> WikipediaEditEvent event) {
>> acc.f0 = event.getUser();
>> acc.f1 += event.getByteDiff();
>> return acc;
>> }
>> });
>>
>> result.writeAsText(.);  --> It is working.
>> result.writeAsCsv(.);  ---> It is not working.
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Wed, Apr 27, 2016 at 4:14 PM, subash basnet 
>> wrote:
>>
>>> Hello all,
>>>
>>> I am able to write the Wikipedia edit data to the kafka and as a text
>>> file as per the given example of WikipediaAnalysis. But when I try to write
>>> it as csv, the blank files initially created never gets filled with data.
>>> Below is the code:
>>>
>>> DataStream> result =
>>> keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>>> new FoldFunction>() {
>>> @Override
>>> public Tuple2 fold(Tuple2 acc,
>>> WikipediaEditEvent event) {
>>> acc.f0 = event.getUser();
>>> acc.f1 += event.getByteDiff();
>>> return acc;
>>> }
>>> });
>>>
>>> *result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>>> FileSystem.WriteMode.OVERWRITE); *-> works
>>>
>>> *result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>>> FileSystem.WriteMode.OVERWRITE);* --> doesn't
>>> work
>>>
>>> Why is data getting written to file as text but not as csv?
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>>
>>
>


Re: Perform a groupBy on an already groupedDataset

2016-05-02 Thread Punit Naik
It solved my problem!

On Mon, May 2, 2016 at 3:45 PM, Fabian Hueske  wrote:

> Grouping a grouped dataset is not supported.
> You can group on multiple keys: dataSet.groupBy(1,2).
>
> Can you describe your use case if that does not solve the problem?
>
>
>
> 2016-05-02 10:34 GMT+02:00 Punit Naik :
>
>> Hello
>>
>> I wanted to perform a groupBy on an already grouped dataset. How do I do
>> this?
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>


-- 
Thank You

Regards

Punit Naik


Re: Regarding Broadcast of datasets in streaming context

2016-05-02 Thread Biplob Biswas
Hi Gyula,

I understand more now how this thing might work and its fascinating.
Although I still have one question with the coflatmap function.

First, let me explain what I understand and whether it's correct or not:
1. The connected iterative stream ensures that the coflatmap function
receives the points and the centroids which are broadcast on each iteration
defined by closeWith.

2. So in the coflatmap function, on one map I get the points and on the
other map function I get the centroids which are broadcast.

Now comes the part I am assuming a bit, because I don't understand it from the
theory.
3. Assuming I can use the broadcast centroids, I calculate the nearest
centroid for the streamed point, update that centroid, and only use one
of the collectors to return the updated centroid list back.


The question here is: I am assuming that this operation is not done in
parallel, because if the streams are processed in parallel, how would I ensure
correct updates of the centroids when multiple points can try to update the
same centroid in parallel?

I hope I made myself clear with this.

Thanks and Regards
Biplob
Biplob Biswas wrote
> Hi Gyula,
> 
> I read your workaround and started reading about flink iterations,
> coflatmap operators and other things. Now, I do understand a few things
> but the solution you provided is not completely clear to me.
> 
> I understand the following things from your post.
> 1. You initially have a datastream of points, on which you iterate and the
> 'withFeedbackType' defines the type of the connected stream so rather than
> "Points" the type is  "Centroids" now.
> 
> 2.On this connected stream (which I understand, only have the streamed
> points right now), you run a flat map operator. And you mention 
/
> "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
> and update its local centroids (and periodically output the centroids) and
> on the other input would send centroids of other flatmaps and would merge
> them to the local."
/
> I dont understand this part completely, if i am not wrong, you are saying
> that the co flatmap function would have 2 map functions. Now i dont
> understand this part .. as to what specifically am i doing in each map
> function?
> 
> 3. lastly, the updated centroids which came back from the coflatmap
> function is fed back to the stream again and this is the part i get lost
> again ... how is this centroid fed back and if this is fed back what
> happens to the point stream? and if it does somehow is fed back, how do i
> catch it in the coflatmap function? 
> 
> 
> If I understand this a bit, then in your code the first set of centroids
> are created in the coflatmap function and you dont already have a list of
> centroids to start with? Am i assuming it correct?
> 
> I underwent the process of iteration in the Kmeans example from this
> following link:
> https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
> 
> and I understand how this is working .. but i am stil not clear how ur
> example is working. 
> 
> Could you please explain it a bit more? with some examples maybe?
> 
> Thanks a lot.
> Gyula Fóra-2 wrote
>> Hi Biplob,
>> 
>> I have implemented a similar algorithm as Aljoscha mentioned.
>> 
>> First things to clarify are the following:
>> There is currently no abstraction for keeping objects (in you case
>> centroids) in a centralized way that can be updated/read by all
>> operators.
>> This would probably be very costly and is actually not necessary in your
>> case.
>> 
>> Broadcast a stream in contrast with other partitioning methods mean that
>> the events will be replicated to all downstream operators. This not a
>> magical operator that will make state available among parallel instances.
>> 
>> Now let me explain what I think you want from Flink and how to do it :)
>> 
>> You have input data stream and a set of centroids to be updated based on
>> the incoming records. As you want to do this in parallel you have an
>> operator (let's say a flatmap) that keeps the centroids locally and
>> updates
>> it on it's inputs. Now you have a set of independently updated centroids,
>> so you want to merge them and update the centroids in each flatmap.
>> 
>> Let's see how to do this. Given that you have your centroids locally,
>> updating them is super easy, so I will not talk about that. The
>> problematic
>> part is periodically merging end "broadcasting" the centroids so all the
>> flatmaps eventually see the same (they don't have to always be the same
>> for
>> clustering probably). There is no operator for sending state (centroids)
>> between subtasks so you have to be clever here. We can actually use
>> cyclic
>> streams to solve this problem by sending the centroids as simple events
>> to
>> a CoFlatMap:
>> 
>> DataStream
>> 
>>  input = ...
>> ConnectedIterativeStreams inputsAndCentroids =
>> i

Re: Checking actual config values used by TaskManager

2016-05-02 Thread Maximilian Michels
Hi Ken,

When you're running Yarn, the Flink configuration is created once and
shared among all nodes (JobManager and TaskManagers). Please have a
look at the JobManager tab on the web interface. It shows you the
configuration.

Cheers,
Max

On Fri, Apr 29, 2016 at 3:18 PM, Ken Krugler
 wrote:
> Hi Timur,
>
> On Apr 28, 2016, at 10:40pm, Timur Fayruzov 
> wrote:
>
> If you're talking about parameters that were set on JVM startup then `ps
> aux|grep flink` on an EMR slave node should do the trick, that'll give you
> the full command line.
>
>
> No, I’m talking about values that come from flink-conf.yaml.
>
> Maybe there’s no good reason to worry, but in Hadoop land you can have
> parameters set via the conf on the client, which in turn get overridden by
> values from conf files on the nodes, which you can then override via command
> line parameters, which in turn can be changed by the user code.
>
> Plus parameters that can be flagged as final/unmodifiable, and thus some of
> the above actually don’t change anything.
>
> So it’s a common issue where what you think you set as a value isn’t
> actually being used, and that’s why examining the job conf that was actually
> deployed with tasks is critical.
>
> — Ken
>
>
>
> On Thu, Apr 28, 2016 at 9:00 PM, Ken Krugler 
> wrote:
>>
>> Hi all,
>>
>> I’m running jobs on EMR via YARN, and wondering how to check exactly what
>> configuration settings are actually being used.
>>
>> This is mostly for the TaskManager.
>>
>> I know I can modify the conf/flink-conf.yaml file, and (via the CLI) I can
>> use -yD param=value.
>>
>> But my experience with Hadoop makes me want to see the exact values being
>> used, versus assuming I know what’s been set :)
>>
>> Thanks,
>>
>> — Ken
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>


first() function in DataStream

2016-05-02 Thread subash basnet
Hello all,

In a DataSet, the *first(n)* function can be called to get 'n' elements of
the DataSet. How could a similar operation be done on a DataStream to get 'n'
elements from the current DataStream?




Best Regards,
Subash Basnet


Re: join performance

2016-05-02 Thread Aljoscha Krettek
Hi Henry,
yes, with early firings you would have the problem of duplicate emission.
I'm afraid I don't have a solution for that right now.

For the "another question" I think you are right that this would be session
windowing. Please have a look at this blog post that I wrote recently:
http://data-artisans.com/session-windowing-in-flink/. And please get back
to us if you have more questions or feedback.

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 19:18 Henry Cai  wrote:

> So is the window defined as hour-window or second-window?
>
> If I am using hour-window, I guess I need to modify the trigger to fire
> early (e.g. every minute)?  But I don't want to repeatedly emit the same
> joined records for every minute (i.e. on 2nd minute, I only want to emit
> the changes introduced by new coming records between 1st and 2nd minute)
>
> If I am using second-window, I wasn't sure why the record will still be
> put into the correct window based on hour gap?
>
> Another question is on which type of window, I need to match record a from
> stream a to record b in stream b if abs(a.time - b.time) < 1-hour, so it's
> not really a tumbling window on absolute wall clock, is this a session
> window?
>
> On Fri, Apr 29, 2016 at 4:36 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> you are right, everything will be emitted in a huge burst at the end of
>> the hour. If you want to experiment a bit you can write a custom Trigger
>> based on EventTimeTrigger that will delay firing of windows. You would
>> change onEventTime() to not fire but instead register a processing-time
>> timer at a random point in the future. Then, in onProcessingTime() you
>> would trigger the actual window processing. Elements will still be put into
>> the correct windows based on event time, just the firing of the windows
>> will change by doing this.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 29 Apr 2016 at 08:53 Henry Cai  wrote:
>>
>>> But the join requirement is to match the records from two streams
>>> occurring within one hour (besides the normal join key condition), if I use
>>> the second join window, those records wouldn't be in the same window any
>>> more.
>>>
>>>
>>>
>>> On Thu, Apr 28, 2016 at 11:47 PM, Ashutosh Kumar <
>>> kmr.ashutos...@gmail.com> wrote:
>>>
 Time unit can be in seconds as well. Is there specific need to get
 bursts hourly?

 On Fri, Apr 29, 2016 at 11:48 AM, Henry Cai  wrote:

> For the below standard stream/stream join, does flink store the
> results of stream 1 and stream 2 into state store for the current hour and
> at the end of the hour window it will fire the window by iterating through
> all stored elements in the state store to find join matches?
>
> My concern is during most of the time in the hour, the output
> (assuming the output is going to another stream) will be idle and on each
> hour mark there will be huge outputs of joined records emitted, any way to
> make it more gradual?
>
>
> dataStream.join(otherStream)
> .where(0).equalTo(1)
> .window(TumblingEventTimeWindows.of(Time.hours(1)))
> .apply (new JoinFunction () {...});
>
>

>>>
>


Re: Flink Iterations Ordering

2016-05-02 Thread Aljoscha Krettek
Hi,
as I understand it, the order of elements will not be preserved across
iteration supersteps. But maybe someone else knows more.

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 00:23 David Kim 
wrote:

> Hello all,
>
> I read the documentation at [1] on iterations and had a question on
> whether an assumption is safe to make.
>
> As partial solutions are continuously looping through the step function,
> when new elements are added as iteration inputs will the insertion order of
> all of the elements be preserved?
>
> Example:
>
> Current partial solutions going through step function are: A, B, C.
>
> At a later time we add (in order) elements D, E.
>
> Will the iteration result always be A,B,C,D,E?
>
> References:
>
> [1] 
> *https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
> *
>


Re: Understanding Sliding Windows

2016-05-02 Thread Aljoscha Krettek
Hi,
there is no way to skip the first 5 minutes since Flink doesn't know where
your "time" begins. Elements are just put into window "buckets" that are
emitted at the appropriate time.
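
To illustrate the bucket idea, here is a small Java sketch of roughly how a
sliding window assigner computes the buckets an element belongs to (simplified,
ignoring offsets; not the actual Flink implementation):

import java.util.ArrayList;
import java.util.List;

public class SlidingBuckets {

    // Start timestamps of all sliding windows of 'sizeMs', sliding every
    // 'slideMs', that contain the given element timestamp.
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);  // each start defines the bucket [start, start + sizeMs)
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element at t=35s with 5-minute windows sliding every 10s falls into 30 buckets.
        System.out.println(windowStartsFor(35000L, 300000L, 10000L).size());  // 30
    }
}

The earliest of those buckets covers time from before any data was seen, which
is why results show up before the first full 5 minutes have passed.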

Cheers,
Aljoscha

On Wed, 27 Apr 2016 at 07:01 Piyush Shrivastava 
wrote:

> Hello Dominik,
>
> Thanks for the information. Since my window is getting triggered every 10
> seconds, the results I am getting before 5 minutes would be irrelevant as I
> need to consider data coming in every 5 minutes. Is there a way I can skip
> the results that are output before the first 5 minutes?
> Thanks and Regards,
> Piyush Shrivastava 
> [image: WeboGraffiti]
> http://webograffiti.com
>
>
> On Tuesday, 26 April 2016 8:54 PM, Dominik Choma 
> wrote:
>
>
> Piyush,
>
> You created a sliding window which is triggered every 10 seconds.
> Flink fires up this window every 10 seconds, without waiting for the 5 min
> buffer to be filled up.
> It seems to me that the first argument is rather the "maximum data buffer
> retention" than "the initial threshold".
>
> Dominik
>
>
>
> Dominik
>
> 2016-04-26 12:16 GMT+02:00 Piyush Shrivastava :
>
> Hi all,
> I wanted to know how exactly sliding windows produce results in Flink.
> Suppose I create a sliding window of 5 minutes which is refreshed in every
> 10 seconds:
>
> .timeWindow(Time.minutes(5), Time.seconds(10))
>
> So in every 10 seconds we are looking at data from the past 5 minutes. But
> what happens before the initial 5 minutes have passed?
> Suppose we start the computation at 10:00. At 10:05 we will get the result
> for 10:00-10:05. But what are the results which we get in between this?
> i.e. at 10:00:10, 10:00:20 and so on.
> Basically why do Flink start producing results before the initial
> threshold has passed? What do these results signify?
>
> Thanks and Regards,
> Piyush Shrivastava 
> [image: WeboGraffiti]
> http://webograffiti.com
>
>
>
>
>


Re: Regarding Broadcast of datasets in streaming context

2016-05-02 Thread Gyula Fóra
Hey,

I think you got the good idea :)

So your coflatmap will get all the centroids that you have sent to the
stream in the closeWith call. This means that whenever you collect a new
set of centroids they will be iterated back. This means you don't always
want to send the centroids out on the collector, only periodically.

The order in which these come is pretty much arbitrary so you need to make
sure to add some logic by which you can order it if this is important.
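
For illustration only, a rough Java sketch of the kind of CoFlatMapFunction
being discussed; the types, the emission interval and the merge rule are
placeholders, not Gyula's actual code:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// A "point" is a double[], a "centroid" is a Tuple2<Integer, double[]> (id, coordinates).
public class CentroidCoFlatMap
        implements CoFlatMapFunction<double[], Tuple2<Integer, double[]>, Tuple2<Integer, double[]>> {

    // Local copy of the centroids, updated independently in each parallel instance.
    private final Map<Integer, double[]> centroids = new HashMap<>();
    private long pointsSinceLastEmit = 0;

    @Override
    public void flatMap1(double[] point, Collector<Tuple2<Integer, double[]>> out) {
        // Input 1: a streamed point. Here you would find the nearest local centroid
        // and move it towards the point (the update rule itself is omitted).
        // Only emit the local centroids periodically, not after every point,
        // otherwise the feedback edge is flooded.
        if (++pointsSinceLastEmit >= 1000) {
            for (Map.Entry<Integer, double[]> c : centroids.entrySet()) {
                out.collect(new Tuple2<>(c.getKey(), c.getValue()));
            }
            pointsSinceLastEmit = 0;
        }
    }

    @Override
    public void flatMap2(Tuple2<Integer, double[]> centroid, Collector<Tuple2<Integer, double[]>> out) {
        // Input 2: centroids fed back over the iteration edge from other instances.
        // Merge them into the local set (here simply last-write-wins, for brevity).
        centroids.put(centroid.f0, centroid.f1);
    }
}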

I'm not sure if this helped or not :D

Gyula

Biplob Biswas  ezt írta (időpont: 2016. máj. 2.,
H, 13:13):

> Hi Gyula,
>
> I understand more now how this thing might work and its fascinating.
> Although I still have one question with the coflatmap function.
>
> First, let me explain what I understand and whether its correct or not:
> 1. The connected iterative stream ensures that the coflatmap function
> receive the points and the centroids which are broadcasted on each
> iteration
> defined by closewith.
>
> 2. So in the coflatmap function, on one map I get the points and on the
> other map function i get the centroids which are broadcasted.
>
> Now comes the part I am assuming a bit because I dont understand from the
> theory.
> 3. Assuming I can use the broadcasted centroids, I calculate the nearest
> centroid from the streamed point and I update the centroid and only use one
> of the collectors to return the updated centroids list back.
>
>
> The question here is, I am assuming that this operation is not done in
> parallel as if streams are sent in parallel how would I ensure correct
> update of the centroids as multiple points can try to update the same
> centroid in parallel .
>
> I hope I made myself clear with this.
>
> Thanks and Regards
> Biplob
> Biplob Biswas wrote
> > Hi Gyula,
> >
> > I read your workaround and started reading about flink iterations,
> > coflatmap operators and other things. Now, I do understand a few things
> > but the solution you provided is not completely clear to me.
> >
> > I understand the following things from your post.
> > 1. You initially have a datastream of points, on which you iterate and
> the
> > 'withFeedbackType' defines the type of the connected stream so rather
> than
> > "Points" the type is  "Centroids" now.
> >
> > 2.On this connected stream (which I understand, only have the streamed
> > points right now), you run a flat map operator. And you mention
> /
> > "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
> > and update its local centroids (and periodically output the centroids)
> and
> > on the other input would send centroids of other flatmaps and would merge
> > them to the local."
> /
> > I dont understand this part completely, if i am not wrong, you are saying
> > that the co flatmap function would have 2 map functions. Now i dont
> > understand this part .. as to what specifically am i doing in each map
> > function?
> >
> > 3. lastly, the updated centroids which came back from the coflatmap
> > function is fed back to the stream again and this is the part i get lost
> > again ... how is this centroid fed back and if this is fed back what
> > happens to the point stream? and if it does somehow is fed back, how do i
> > catch it in the coflatmap function?
> >
> >
> > If I understand this a bit, then in your code the first set of centroids
> > are created in the coflatmap function and you dont already have a list of
> > centroids to start with? Am i assuming it correct?
> >
> > I underwent the process of iteration in the Kmeans example from this
> > following link:
> >
> https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
> >
> > and I understand how this is working .. but i am stil not clear how ur
> > example is working.
> >
> > Could you please explain it a bit more? with some examples maybe?
> >
> > Thanks a lot.
> > Gyula Fóra-2 wrote
> >> Hi Biplob,
> >>
> >> I have implemented a similar algorithm as Aljoscha mentioned.
> >>
> >> First things to clarify are the following:
> >> There is currently no abstraction for keeping objects (in you case
> >> centroids) in a centralized way that can be updated/read by all
> >> operators.
> >> This would probably be very costly and is actually not necessary in your
> >> case.
> >>
> >> Broadcast a stream in contrast with other partitioning methods mean that
> >> the events will be replicated to all downstream operators. This not a
> >> magical operator that will make state available among parallel
> instances.
> >>
> >> Now let me explain what I think you want from Flink and how to do it :)
> >>
> >> You have input data stream and a set of centroids to be updated based on
> >> the incoming records. As you want to do this in parallel you have an
> >> operator (let's say a flatmap) that keeps the centroids locally and
> >> updates
> >> it on it's inputs. Now you have a set of independently updated
> c

Re: Regarding Broadcast of datasets in streaming context

2016-05-02 Thread Biplob Biswas
Hi Gyula,

Could you explain a bit why I wouldn't want the centroids to be collected
after every point?

I mean, once I get a streamed point via the map1 function, I would want to
compare the distance of the point with a centroid which arrives via the map2
function, and I keep on comparing for every centroid which comes in
subsequently; once the update of the centroid happens, shouldn't I collect
the entire set? That is, update a centroid and collect it back for the
next point in the iteration.

I may not be getting the concept properly here, so an example snippet would
help in the long run.

Thanks & Regards
Biplob
Gyula Fóra wrote
> Hey,
> 
> I think you got the good idea :)
> 
> So your coflatmap will get all the centroids that you have sent to the
> stream in the closeWith call. This means that whenever you collect a new
> set of centroids they will be iterated back. This means you don't always
> want to send the centroids out on the collector, only periodically.
> 
> The order in which these come is pretty much arbitrary so you need to make
> sure to add some logic by which you can order it if this is important.
> 
> Im not sure if this helped or not :D
> 
> Gyula
> 
> Biplob Biswas <

> revolutionisme@

> > ezt írta (időpont: 2016. máj. 2.,
> H, 13:13):
> 
>> Hi Gyula,
>>
>> I understand more now how this thing might work and its fascinating.
>> Although I still have one question with the coflatmap function.
>>
>> First, let me explain what I understand and whether its correct or not:
>> 1. The connected iterative stream ensures that the coflatmap function
>> receive the points and the centroids which are broadcasted on each
>> iteration
>> defined by closewith.
>>
>> 2. So in the coflatmap function, on one map I get the points and on the
>> other map function i get the centroids which are broadcasted.
>>
>> Now comes the part I am assuming a bit because I dont understand from the
>> theory.
>> 3. Assuming I can use the broadcasted centroids, I calculate the nearest
>> centroid from the streamed point and I update the centroid and only use
>> one
>> of the collectors to return the updated centroids list back.
>>
>>
>> The question here is, I am assuming that this operation is not done in
>> parallel as if streams are sent in parallel how would I ensure correct
>> update of the centroids as multiple points can try to update the same
>> centroid in parallel .
>>
>> I hope I made myself clear with this.
>>
>> Thanks and Regards
>> Biplob
>> Biplob Biswas wrote
>> > Hi Gyula,
>> >
>> > I read your workaround and started reading about flink iterations,
>> > coflatmap operators and other things. Now, I do understand a few things
>> > but the solution you provided is not completely clear to me.
>> >
>> > I understand the following things from your post.
>> > 1. You initially have a datastream of points, on which you iterate and
>> the
>> > 'withFeedbackType' defines the type of the connected stream so rather
>> than
>> > "Points" the type is  "Centroids" now.
>> >
>> > 2.On this connected stream (which I understand, only have the streamed
>> > points right now), you run a flat map operator. And you mention
>> /
>> > "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive
>> events
>> > and update its local centroids (and periodically output the centroids)
>> and
>> > on the other input would send centroids of other flatmaps and would
>> merge
>> > them to the local."
>> /
>> > I dont understand this part completely, if i am not wrong, you are
>> saying
>> > that the co flatmap function would have 2 map functions. Now i dont
>> > understand this part .. as to what specifically am i doing in each map
>> > function?
>> >
>> > 3. lastly, the updated centroids which came back from the coflatmap
>> > function is fed back to the stream again and this is the part i get
>> lost
>> > again ... how is this centroid fed back and if this is fed back what
>> > happens to the point stream? and if it does somehow is fed back, how do
>> i
>> > catch it in the coflatmap function?
>> >
>> >
>> > If I understand this a bit, then in your code the first set of
>> centroids
>> > are created in the coflatmap function and you dont already have a list
>> of
>> > centroids to start with? Am i assuming it correct?
>> >
>> > I underwent the process of iteration in the Kmeans example from this
>> > following link:
>> >
>> https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
>> >
>> > and I understand how this is working .. but i am stil not clear how ur
>> > example is working.
>> >
>> > Could you please explain it a bit more? with some examples maybe?
>> >
>> > Thanks a lot.
>> > Gyula Fóra-2 wrote
>> >> Hi Biplob,
>> >>
>> >> I have implemented a similar algorithm as Aljoscha mentioned.
>> >>
>> >> First things to clarify are the following:
>> >> There is currently no abstraction for keeping objects (in you case

Measuring latency in a DataStream

2016-05-02 Thread Robert Schmidtke
Hi everyone,

I have implemented a way to measure latency in a DataStream (I hope): I'm
consuming a Kafka topic and I'm union'ing the resulting stream with a
custom source that emits a (machine-local) timestamp every 1000ms (using
currentTimeMillis). On the consuming end I'm distinguishing between the
Kafka events and the timestamps. When encountering a timestamp, I take the
difference of the processing machine's local time and the timestamp found
in the stream, expecting a positive difference (with the processing
machine's timestamp being larger than the timestamp found in the stream).
However, the opposite is the case. Now I am wondering about when events are
actually processed.

Union'ing the stream from Kafka with my custom source and batching them in 10s
windows (which is what I do), I expect 10 timestamps with ascending values and
a rough gap of 1000ms in the stream:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68

On the receiving end I again take the currentTimeMillis in my fold
function, expecting the resulting value to be larger (most of the time)
than the timestamps encountered in the stream:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53

The system clocks are in sync up to 1ms.

Maybe I am not clear about when certain timestamps are created (i.e. when
the UDFs are invoked) or how windows are processed. Any advice is greatly
appreciated, also alternative approaches to calculating latency.

I'm on Flink 0.10.2 by the way.

Thanks in advance for the help!

Robert

-- 
My GPG Key ID: 336E2680


RE: Flink on Azure HDInsight

2016-05-02 Thread Brig Lamoreaux
Thanks Stephan,

Turns out Azure Table is slightly different from Azure HDInsight. Both use
Azure Storage; however, HDInsight allows HDFS over Azure Storage.

I’d be curious if anyone has tried to use Flink on top of Azure HDInsight.

Thanks,
Brig

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Saturday, April 30, 2016 9:36 PM
To: user@flink.apache.org
Subject: Re: Flink on Azure HDInsight

Hi!

As far as I know, some people have been using Flink together with Azure, and we 
try and do some release validation on Azure as well.

There is even a section in the docs that describes how to use Hadoop's Azure 
Table formats with Flink
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/connectors.html#access-microsoft-azure-table-storage

I am not aware of any Azure specific issues at this point...

Greetings,
Stephan



On Fri, Apr 29, 2016 at 11:18 AM, Brig Lamoreaux <brig.lamore...@microsoft.com> wrote:
Hi All,

Are there any issues with Flink on Azure HDInsight?

Thanks,
Brig Lamoreaux

Data Solution Architect
US Desert/Mountain Tempe












S3 Checkpoint Storage

2016-05-02 Thread John Sherwood
Hello all,

I'm attempting to set up a taskmanager cluster using S3 as the
highly-available store. It looks like the main thing is just setting the `
state.backend.fs.checkpointdir` to the appropriate s3:// URI, but as
someone rather new to accessing S3 from Java, how should I provide Flink
with the credentials necessary to access the bucket I've set up?

Thanks for your time & any help!


Re: Count of Grouped DataSet

2016-05-02 Thread nsengupta
Hello Fabian,

Thanks for taking the time to provide your recommendation. This is how I have
implemented it:

case class Something(f1: Int,f2: Int,f3: Int,f4: String ) // My
application's data structure

*val k = envDefault
 
.fromElements(Something(1,1,2,"A"),Something(1,1,2,"B"),Something(2,1,3,"A"),Something(3,1,4,"C"))
  .map(e => (e.f1, e.f2, e.f3, e.f4,1))  // I create a temporary tuple
  .groupBy(1,2)
  .sum(4)
  .map(e => (Something(e._1,e._2,e._3,e._4),e._5))
  .print*

The output is 
*(Something(2,1,3,A),1)
(Something(1,1,2,B),2)
(Something(3,1,4,C),1)*

I need to create a temporary tuple because I need to group by fields of the
case class, yet I also need to sum the fifth (newly added) field. Somehow, I
feel this is clunky!

Is this a preferred way? Is there a better (performant, yet idiomatic) way?
Please make me wiser.

-- Nirmalya



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Count-of-Grouped-DataSet-tp6592p6623.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Problem in creating quickstart project using archetype (Scala)

2016-05-02 Thread nsengupta
Fantastic! Many thanks for clarifying, Aljoscha! 

I blindly followed what that page said. Instead, I should have tried with
the stable version.

- Nirmalya



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-in-creating-quickstart-project-using-archetype-Scala-tp6560p6624.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: S3 Checkpoint Storage

2016-05-02 Thread Fabian Hueske
Hi John,

S3 keys are configured via Hadoop's configuration files.
Check out the documentation for AWS setups [1].

Cheers, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html

2016-05-02 20:22 GMT+02:00 John Sherwood :

> Hello all,
>
> I'm attempting to set up a taskmanager cluster using S3 as the
> highly-available store. It looks like the main thing is just setting the `
> state.backend.fs.checkpointdir` to the appropriate s3:// URI, but as
> someone rather new to accessing S3 from Java, how should I provide Flink
> with the credentials necessary to access the bucket I've set up?
>
> Thanks for your time & any help!
>


Re: Multiple windows with large number of partitions

2016-05-02 Thread Christopher Santiago
Hi Aljoscha,

Yes, there is still a high partition/window count since I have to keyby the
userid so that I get unique users.  I believe what I see happening is that
the second window with the timeWindowAll is not getting all the results or
the results from the previous window are changing when the second window is
running.  I can see the date/unique user count increase and decrease as it
is running for a particular day.

I can share the eclipse project and the sample data file I am working off
of with you if that would be helpful.

Thanks,
Chris

On Mon, May 2, 2016 at 12:55 AM, Aljoscha Krettek [via Apache Flink User
Mailing List archive.]  wrote:

> Hi,
> what do you mean by "still experiencing the same issues"? Is the key count
> still very high, i.e. 500k windows?
>
> For the watermark generation, specifying a lag of 2 days is very
> conservative. If the watermark is this conservative, I guess elements that
> are behind the watermark will never arrive, so you wouldn't need the
> late-element handling in your triggers. The late-element handling in
> Triggers is only required to compensate for the fact that the watermark can
> be a heuristic and not always correct.
>
> Cheers,
> Aljoscha
>
> On Thu, 28 Apr 2016 at 21:24 Christopher Santiago <[hidden email]
> > wrote:
>
>> Hi Aljoscha,
>>
>>
>> Aljoscha Krettek wrote
>> >>is there are reason for keying on both the "date only" field and the
>> "userid". I think you should be fine by just specifying that you want
>> 1-day
>> windows on your timestamps.
>>
>> My mistake, this was from earlier tests that I had performed.  I removed
>> it
>> and went to keyBy(2) and I am still experiencing the same issues.
>>
>>
>> Aljoscha Krettek wrote
>> >>Also, do you have a timestamp extractor in place that takes the
>> timestamp
>> from your data and sets it as the internal timestamp field.
>>
>> Yes there is, it is from the BoundedOutOfOrdernessGenerator example:
>>
>> public static class BoundedOutOfOrdernessGenerator implements
>> AssignerWithPeriodicWatermarks> {
>> private static final long serialVersionUID = 1L;
>> private final long maxOutOfOrderness =
>> Time.days(2).toMilliseconds();
>> private long currentMaxTimestamp;
>>
>> @Override
>> public long extractTimestamp(Tuple3
>> element, long previousElementTimestamp) {
>> long timestamp = element.f0.getMillis();
>> currentMaxTimestamp = Math.max(timestamp,
>> currentMaxTimestamp);
>> return timestamp;
>> }
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>> return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>> }
>> }
>>
>> Thanks,
>> Chris
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-windows-with-large-number-of-partitions-tp6521p6562.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-windows-with-large-number-of-partitions-tp6521p6601.html
> To unsubscribe from Multiple windows with large number of partitions, click
> here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-windows-with-large-number-of-partitions-tp6521p6626.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Measuring latency in a DataStream

2016-05-02 Thread Igor Berman
1. Why are you doing a join instead of something like
System.currentTimeMillis()? At the end you have a tuple of your data with a
timestamp anyway... so why not just wrap your data in a Tuple2 with the
additional info of the creation timestamp?

2. Are you sure that the consumer/producer machines' clocks are in sync?
You can use NTP for this.
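
To make point 1 concrete, here is a rough Java sketch of the "wrap it in a
Tuple2 with the creation timestamp" idea (the String payload type and the
method names are only placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class LatencyWrapping {

    // Attach the creation timestamp to every record right after the source.
    public static DataStream<Tuple2<String, Long>> stamp(DataStream<String> events) {
        return events.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) {
                return new Tuple2<>(value, System.currentTimeMillis());
            }
        });
    }

    // Compute per-record latency where the record is finally processed.
    public static DataStream<Long> latency(DataStream<Tuple2<String, Long>> stamped) {
        return stamped.map(new MapFunction<Tuple2<String, Long>, Long>() {
            @Override
            public Long map(Tuple2<String, Long> value) {
                // Assumes the producing and consuming machines' clocks are in sync (point 2).
                return System.currentTimeMillis() - value.f1;
            }
        });
    }
}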

On 2 May 2016 at 20:02, Robert Schmidtke  wrote:

> Hi everyone,
>
> I have implemented a way to measure latency in a DataStream (I hope): I'm
> consuming a Kafka topic and I'm union'ing the resulting stream with a
> custom source that emits a (machine-local) timestamp every 1000ms (using
> currentTimeMillis). On the consuming end I'm distinguishing between the
> Kafka events and the timestamps. When encountering a timestamp, I take the
> difference of the processing machine's local time and the timestamp found
> in the stream, expecting a positive difference (with the processing
> machine's timestamp being larger than the timestamp found in the stream).
> However, the opposite is the case. Now I am wondering about when events are
> actually processed.
>
> Union the Stream from Kafka+my custom source, batching them in 10s windows
> (which is what I do), I expect 10 timestamps with ascending values and a
> rough gap of 1000ms in the stream:
>
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>
> On the receiving end I again take the currentTimeMillis in my fold
> function, expecting the resulting value to be larger (most of the time)
> than the timestamps encountered in the stream:
>
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>
> The system clocks are in sync up to 1ms.
>
> Maybe I am not clear about when certain timestamps are created (i.e. when
> the UDFs are invoked) or how windows are processed. Any advice is greatly
> appreciated, also alternative approaches to calculating latency.
>
> I'm on Flink 0.10.2 by the way.
>
> Thanks in advance for the help!
>
> Robert
>
> --
> My GPG Key ID: 336E2680
>


Scala compilation error

2016-05-02 Thread Srikanth
Hello,

I'm fac

val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
new SimpleStringSchema(), properties))
val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
BidderRawLogs(b)).keyBy(b => b.strategyId)

val metaStrategy: KeyedStream[(Int, String), Int] =
env.readTextFile("path").name("Strategy")
 .map((1, _) ).keyBy(_._1)

val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
(Int, BidderRawLogs, (Int, String))] =
 new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
staticTypeInfo)
val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
{}.getTypeInfo()

val funName = "test"
val joinedStream = bidderStream.connect(metaStrategy)
.transform(funName, joinOperator, outTypeInfo)


Re: Scala compilation error

2016-05-02 Thread Srikanth
Sorry for the previous incomplete email. Didn't realize I hit send!

I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in API signature between the Scala
and Java APIs. I was referring to the javadoc. Is there a scaladoc?

Java API has
public  SingleOutputStreamOperator transform(
String functionName,
TypeInformation outTypeInfo,
TwoInputStreamOperator operator)

Scala API has
def transform[R: TypeInformation](
  functionName: String,
  operator: TwoInputStreamOperator[IN1, IN2, R])

Srikanth

On Mon, May 2, 2016 at 7:18 PM, Srikanth  wrote:

> Hello,
>
> I'm fac
>
> val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
> new SimpleStringSchema(), properties))
> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
> BidderRawLogs(b)).keyBy(b => b.strategyId)
>
> val metaStrategy: KeyedStream[(Int, String), Int] =
> env.readTextFile("path").name("Strategy")
>  .map((1, _) ).keyBy(_._1)
>
> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
> (Int, BidderRawLogs, (Int, String))] =
>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
> staticTypeInfo)
> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
> {}.getTypeInfo()
>
> val funName = "test"
> val joinedStream = bidderStream.connect(metaStrategy)
> .transform(funName, joinOperator, outTypeInfo)
>
>


Re: How to perform this join operation?

2016-05-02 Thread Elias Levy
Thanks for the suggestion.  I ended up implementing it a different way.

What is needed is a mechanism to give each stream a different window
assigner, and then let Flink perform the join normally given the assigned
windows.

Specifically, for my use case what I need is a sliding window for one
stream and a trailing window for the other stream.  A trailing window is
just a TimeWindow where the window end time is the event time, rounded up
or down by some amount, and the window start time is its end time minus some
given parameter.

For instance:

class TrailingEventTimeWindows(asize: Long, around: Long) extends
WindowAssigner[Object, TimeWindow] {
  val size  = asize
  val round = around

  override def assignWindows(element: Object, timestamp: Long):
Collection[TimeWindow] = {
if (timestamp > java.lang.Long.MIN_VALUE) {
  val end = (timestamp - (timestamp % round)) + round
  Collections.singletonList(new TimeWindow(end - size, end))
} else {
  // Long.MIN_VALUE is currently assigned when no timestamp is present
  throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no
timestamp marker). " +
  "Is the time characteristic set to 'ProcessingTime', or did you
forget to call " +
  "'DataStream.assignTimestampsAndWatermarks(...)'?")
}
  }

  def getSize: Long = size

  override def getDefaultTrigger(env: JStreamExecutionEnvironment):
Trigger[Object, TimeWindow] = EventTimeTrigger.create()

  override def toString: String = s"TrailingEventTimeWindows($size)"

  override def getWindowSerializer(executionConfig: ExecutionConfig):
TypeSerializer[TimeWindow] = new TimeWindow.Serializer()
}

object TrailingEventTimeWindows {
  def of(size: Time, round: Time) = new
TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds())
}



If the Flink API were different, then I could do something like this to
join the streams:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// (time, key,  id)
val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
10), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) )
// (time, file)
val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
(4000, 100) )

val windowedKeyedChanges = changes
  .assignAscendingTimestamps( _._1 )
  .keyBy(1)
  .window(TrailingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))

val windowedKeyedEvents =
  events.assignAscendingTimestamps( _._1 )
.keyBy(2)
.timeWindow(Time.seconds(5), Time.seconds(1))

val results = windowedKeyedEvents.join(windowedKeyedChanges)
  .apply { }


Alas, the Flink API makes this more complicated.  Instead of allowing you
to join two keyed, windowed streams, you join two unkeyed, unwindowed
streams and tell it how to key them and join them using
join().where().equalTo().window().  Since that construct only takes a
single WindowAssigner, I created a window assigner that uses a different
assigner for each stream being joined:

class DualWindowAssigner[T1 <: Object, T2 <: Object](assigner1:
WindowAssigner[Object, TimeWindow], assigner2: WindowAssigner[Object,
TimeWindow]) extends WindowAssigner[Object, TimeWindow] {
  val windowAssigner1 = assigner1
  val windowAssigner2 = assigner2

  override def assignWindows(element: Object, timestamp: Long):
Collection[TimeWindow] = {
val e = element.asInstanceOf[TaggedUnion[T1,T2]]
if (e.isOne) {
  windowAssigner1.assignWindows(e.getOne, timestamp)
} else {
  windowAssigner2.assignWindows(e.getTwo, timestamp)
}
  }

  override def getDefaultTrigger(env: JStreamExecutionEnvironment):
Trigger[Object, TimeWindow] = EventTimeTrigger.create()

  override def toString: String = s"DualWindowAssigner"

  override def getWindowSerializer(executionConfig: ExecutionConfig):
TypeSerializer[TimeWindow] = new TimeWindow.Serializer()
}


Then I can do:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// (time, key,  id)
val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
10), (1500, 300, 20), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30),
(7000, 100, 40) )
// (time, key)
val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
(4000, 100) )

val eventsWithTime = events.assignAscendingTimestamps( _._1 )
val changesWithTime = changes.assignAscendingTimestamps( _._1 )

val results = eventsWithTime.join(changesWithTime)
  .where( _._2 ).equalTo( _._2 )
  .window(new DualWindowAssigner[Tuple3[Int,Int,Int],Tuple2[Int,Int]](
SlidingEventTimeWindows.of( Time.seconds(4), Time.seconds(1)),
TrailingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))
  ))
  .apply { (x1, x2) => (x1, x2) }

results.print()


This works as Flink will consider two TimeWindows the sam
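
(A minimal check of that assumption, for what it's worth: TimeWindow's
equals/hashCode are defined over its start and end timestamps, so windows
produced by different assigners but with identical bounds compare equal.)

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// two windows from different assigners but with the same bounds
val w1 = new TimeWindow(0L, 4000L)
val w2 = new TimeWindow(0L, 4000L)
assert(w1 == w2 && w1.hashCode == w2.hashCode)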

TimeWindow overload?

2016-05-02 Thread Elias Levy
Looking over the code, I see that Flink creates new TimeWindow objects each
time the WindowAssigner assigns windows to an element.  I have not yet
tested this, but I am wondering if this can become problematic if you have a
very long sliding window with a small slide, such as a 24 hour window with a
1 minute slide.  It seems this would create 1,440 TimeWindow objects per
event.  Even at low event rates this would seem to result in an explosion of
TimeWindow objects: at 1,000 events per second, you'd be creating 1,440,000
TimeWindow objects per second.  After 24 hours you'd have nearly 125 billion
TimeWindow objects that would just begin to be purged.
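
A quick back-of-the-envelope restatement of that arithmetic, just to make
the numbers explicit (Scala, nothing Flink-specific):

val windowSizeMs = 24L * 60 * 60 * 1000        // 24 hour window
val slideMs      = 60L * 1000                  // 1 minute slide
val windowsPerEvent  = windowSizeMs / slideMs              // 1,440
val windowsPerSecond = windowsPerEvent * 1000L             // 1,440,000 at 1,000 events/s
val windowsPerDay    = windowsPerSecond * 24 * 60 * 60     // ~124 billion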

Does this analysis seem right?

I suppose that means you should not use long sliding windows with small
slides.


Insufficient number of network buffers

2016-05-02 Thread Tarandeep Singh
Hi,

I have written ETL jobs in Flink (DataSet API). When I execute them in the
IDE, they run and finish fine. When I try to run them on my cluster, I get
an "Insufficient number of network buffers" error.

I have 5 machines in my cluster with 4 cores each. Each TaskManager is given
3GB. I increased the number of buffers to 5000, but got the same error.
When I increased it further (say 7500), I get the exception listed below.
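
For reference, the settings above roughly correspond to these lines in
conf/flink-conf.yaml (the exact key for the heap is my assumption of how the
3GB was configured):

# 3GB per TaskManager
taskmanager.heap.mb: 3072
# the buffer count that still produced the "insufficient buffers" error
taskmanager.network.numberOfBuffers: 5000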

The DAG or execution plan is pretty big. What is the recommended way to run
your jobs when the DAG becomes huge? Shall I break it into parts by calling
execute() on the execution environment in between jobs?

Thanks,
Tarandeep

Exception I got after I tried to run with 7500 buffers:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Update task on instance
d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL:
akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
at
org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
at akka.dispatch.OnFailure.internal(Future.scala:228)
at akka.dispatch.OnFailure.internal(Future.scala:227)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
... 2 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999]]
after [1 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)