Re: Question regarding parallelism

2015-10-22 Thread Stephan Ewen
Hi!

The bottom of this page also has an illustration of task to task slots.
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html

There are two optimizations involved:

(1) Chaining:

Here sources, mappers, filters are chained together. This is pretty
classic, most systems do something like this to reduce thread communication
overhead. You can always decide that you do not want two operators to be in
teh same chain by calling "startNewChain()"

(2) Slot sharing

This is an optimization to not have small tasks occupy entire slots (or
JVMs), and make it easier to reason how many slots are needed, which is
basically max-parallelism many. One slot can hold by default one of each
operators, but not two of the same kind (such as two instances of the same
source).

You can always say that you do not want to share a slot by calling
"startNewResourceGroup()".

Hope that helps!

Greetings,
Stephan




On Wed, Oct 21, 2015 at 11:34 PM, Ufuk Celebi  wrote:

> Hey Jerry,
>
> On Wed, Oct 21, 2015 at 11:11 PM, Jerry Peng 
> wrote:
>>
>> When I submit the job, the number of task slots that gets used
>> (displayed on the UI) is only 20.  Why is that? The total number of
>> tasks listed on the ui is 55.
>
>
> Do you mean the number of task slots is 55 (you just wrote tasks)?
>
> Each task slot runs a pipeline of parallel sub tasks. In your case the
> number of used task slots corresponds to the maximum parallelism of the
> job, which is 20. You can have a look at [1]. There is a figure giving an
> example.
>
>
>> And also why does the
>> filter->project->flatmap get compress into one operator with a
>> parallelism of 20?  Can I not set the individual operators (i.e.
>> filter and project) to have an individual parallelism of 20?
>>
>
> This is an optimisation, which drastically reduces the overhead for the
> data exchange between operators. It skips serialisation and results in a
> simple chain of local method calls. This is possible, because all operators
> just forward their data. You can disable it via
> env.disableOperatorChaining().
>
>
> Does this help?
>
> – Ufuk
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/internals/job_scheduling.html
>


Reading multiple datasets with one read operation

2015-10-22 Thread Pieter Hameete
Good morning!

I have the following usecase:

My program reads nested data (in this specific case XML) based on
projections (path expressions) of this data. Often multiple paths are
projected onto the same input. I would like each path to result in its own
dataset.

Is it possible to generate more than 1 dataset using a readFile operation
to prevent reading the input twice?

I have thought about a workaround where the InputFormat would return
Tuple2s and the first field is the name of the dataset to which a record
belongs. This would however require me to filter the read data once for
each dataset or to do a groupReduce which is some overhead i'm looking to
prevent.

Is there a better (less overhead) workaround for doing this? Or is there
some mechanism in Flink that would allow me to do this?

Cheers!

- Pieter


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Till Rohrmann
Hi Pieter,

at the moment there is no support to partition a `DataSet` into multiple
sub sets with one pass over it. If you really want to have distinct data
sets for each path, then you have to filter, afaik.

Cheers,
Till

On Thu, Oct 22, 2015 at 11:38 AM, Pieter Hameete  wrote:

> Good morning!
>
> I have the following usecase:
>
> My program reads nested data (in this specific case XML) based on
> projections (path expressions) of this data. Often multiple paths are
> projected onto the same input. I would like each path to result in its own
> dataset.
>
> Is it possible to generate more than 1 dataset using a readFile operation
> to prevent reading the input twice?
>
> I have thought about a workaround where the InputFormat would return
> Tuple2s and the first field is the name of the dataset to which a record
> belongs. This would however require me to filter the read data once for
> each dataset or to do a groupReduce which is some overhead i'm looking to
> prevent.
>
> Is there a better (less overhead) workaround for doing this? Or is there
> some mechanism in Flink that would allow me to do this?
>
> Cheers!
>
> - Pieter
>


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Gábor Gévay
Hello!

> I have thought about a workaround where the InputFormat would return
> Tuple2s and the first field is the name of the dataset to which a record
> belongs. This would however require me to filter the read data once for
> each dataset or to do a groupReduce which is some overhead i'm
> looking to prevent.

I think that those two filters might not have that much overhead,
because of several optimizations Flink does under the hood:
- The dataset of Tuple2s won't be materialized, but instead will be
streamed directly to the two filter operators.
- The input format and the two filters will probably end up on the
same machine, because of chaining, so there won't be
serialization/deserialization between them.

Best,
Gabor



2015-10-22 11:38 GMT+02:00 Pieter Hameete :
> Good morning!
>
> I have the following usecase:
>
> My program reads nested data (in this specific case XML) based on
> projections (path expressions) of this data. Often multiple paths are
> projected onto the same input. I would like each path to result in its own
> dataset.
>
> Is it possible to generate more than 1 dataset using a readFile operation to
> prevent reading the input twice?
>
> I have thought about a workaround where the InputFormat would return Tuple2s
> and the first field is the name of the dataset to which a record belongs.
> This would however require me to filter the read data once for each dataset
> or to do a groupReduce which is some overhead i'm looking to prevent.
>
> Is there a better (less overhead) workaround for doing this? Or is there
> some mechanism in Flink that would allow me to do this?
>
> Cheers!
>
> - Pieter


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Till Rohrmann
I fear that the filter operations are not chained because there are at
least two of them which have the same DataSet as input. However, it's true
that the intermediate results are not materialized.

It is also correct that the filter operators are deployed colocated to the
data sources. Thus, there is no network traffic. However, the data will
still be serialized/deserialized between the not-chained operators (also if
they reside on the same machine).



On Thu, Oct 22, 2015 at 11:49 AM, Gábor Gévay  wrote:

> Hello!
>
> > I have thought about a workaround where the InputFormat would return
> > Tuple2s and the first field is the name of the dataset to which a record
> > belongs. This would however require me to filter the read data once for
> > each dataset or to do a groupReduce which is some overhead i'm
> > looking to prevent.
>
> I think that those two filters might not have that much overhead,
> because of several optimizations Flink does under the hood:
> - The dataset of Tuple2s won't be materialized, but instead will be
> streamed directly to the two filter operators.
> - The input format and the two filters will probably end up on the
> same machine, because of chaining, so there won't be
> serialization/deserialization between them.
>
> Best,
> Gabor
>
>
>
> 2015-10-22 11:38 GMT+02:00 Pieter Hameete :
> > Good morning!
> >
> > I have the following usecase:
> >
> > My program reads nested data (in this specific case XML) based on
> > projections (path expressions) of this data. Often multiple paths are
> > projected onto the same input. I would like each path to result in its
> own
> > dataset.
> >
> > Is it possible to generate more than 1 dataset using a readFile
> operation to
> > prevent reading the input twice?
> >
> > I have thought about a workaround where the InputFormat would return
> Tuple2s
> > and the first field is the name of the dataset to which a record belongs.
> > This would however require me to filter the read data once for each
> dataset
> > or to do a groupReduce which is some overhead i'm looking to prevent.
> >
> > Is there a better (less overhead) workaround for doing this? Or is there
> > some mechanism in Flink that would allow me to do this?
> >
> > Cheers!
> >
> > - Pieter
>


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Fabian Hueske
It might even be materialized (to disk) if both derived data sets are
joined.

2015-10-22 12:01 GMT+02:00 Till Rohrmann :

> I fear that the filter operations are not chained because there are at
> least two of them which have the same DataSet as input. However, it's true
> that the intermediate results are not materialized.
>
> It is also correct that the filter operators are deployed colocated to the
> data sources. Thus, there is no network traffic. However, the data will
> still be serialized/deserialized between the not-chained operators (also if
> they reside on the same machine).
>
>
>
> On Thu, Oct 22, 2015 at 11:49 AM, Gábor Gévay  wrote:
>
>> Hello!
>>
>> > I have thought about a workaround where the InputFormat would return
>> > Tuple2s and the first field is the name of the dataset to which a record
>> > belongs. This would however require me to filter the read data once for
>> > each dataset or to do a groupReduce which is some overhead i'm
>> > looking to prevent.
>>
>> I think that those two filters might not have that much overhead,
>> because of several optimizations Flink does under the hood:
>> - The dataset of Tuple2s won't be materialized, but instead will be
>> streamed directly to the two filter operators.
>> - The input format and the two filters will probably end up on the
>> same machine, because of chaining, so there won't be
>> serialization/deserialization between them.
>>
>> Best,
>> Gabor
>>
>>
>>
>> 2015-10-22 11:38 GMT+02:00 Pieter Hameete :
>> > Good morning!
>> >
>> > I have the following usecase:
>> >
>> > My program reads nested data (in this specific case XML) based on
>> > projections (path expressions) of this data. Often multiple paths are
>> > projected onto the same input. I would like each path to result in its
>> own
>> > dataset.
>> >
>> > Is it possible to generate more than 1 dataset using a readFile
>> operation to
>> > prevent reading the input twice?
>> >
>> > I have thought about a workaround where the InputFormat would return
>> Tuple2s
>> > and the first field is the name of the dataset to which a record
>> belongs.
>> > This would however require me to filter the read data once for each
>> dataset
>> > or to do a groupReduce which is some overhead i'm looking to prevent.
>> >
>> > Is there a better (less overhead) workaround for doing this? Or is there
>> > some mechanism in Flink that would allow me to do this?
>> >
>> > Cheers!
>> >
>> > - Pieter
>>
>
>


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Pieter Hameete
Thanks for your responses!

The derived datasets would indeed be grouped after the filter operations.
Why would this cause them to be materialized to disk? And if I understand
correctly the the data source will not chain to more than one filter,
causing (de)serialization to transfer the records from the data source to
the 2 or more filters on the same worker.

I would guess that in the end this approach would still be faster than
reading the entire input multiple times (we are talking 100GB+ on max 32
workers) but I would have to run some experiments to confirm that.



2015-10-22 12:06 GMT+02:00 Fabian Hueske :

> It might even be materialized (to disk) if both derived data sets are
> joined.
>
> 2015-10-22 12:01 GMT+02:00 Till Rohrmann :
>
>> I fear that the filter operations are not chained because there are at
>> least two of them which have the same DataSet as input. However, it's true
>> that the intermediate results are not materialized.
>>
>> It is also correct that the filter operators are deployed colocated to
>> the data sources. Thus, there is no network traffic. However, the data will
>> still be serialized/deserialized between the not-chained operators (also if
>> they reside on the same machine).
>>
>>
>>
>> On Thu, Oct 22, 2015 at 11:49 AM, Gábor Gévay  wrote:
>>
>>> Hello!
>>>
>>> > I have thought about a workaround where the InputFormat would return
>>> > Tuple2s and the first field is the name of the dataset to which a
>>> record
>>> > belongs. This would however require me to filter the read data once for
>>> > each dataset or to do a groupReduce which is some overhead i'm
>>> > looking to prevent.
>>>
>>> I think that those two filters might not have that much overhead,
>>> because of several optimizations Flink does under the hood:
>>> - The dataset of Tuple2s won't be materialized, but instead will be
>>> streamed directly to the two filter operators.
>>> - The input format and the two filters will probably end up on the
>>> same machine, because of chaining, so there won't be
>>> serialization/deserialization between them.
>>>
>>> Best,
>>> Gabor
>>>
>>>
>>>
>>> 2015-10-22 11:38 GMT+02:00 Pieter Hameete :
>>> > Good morning!
>>> >
>>> > I have the following usecase:
>>> >
>>> > My program reads nested data (in this specific case XML) based on
>>> > projections (path expressions) of this data. Often multiple paths are
>>> > projected onto the same input. I would like each path to result in its
>>> own
>>> > dataset.
>>> >
>>> > Is it possible to generate more than 1 dataset using a readFile
>>> operation to
>>> > prevent reading the input twice?
>>> >
>>> > I have thought about a workaround where the InputFormat would return
>>> Tuple2s
>>> > and the first field is the name of the dataset to which a record
>>> belongs.
>>> > This would however require me to filter the read data once for each
>>> dataset
>>> > or to do a groupReduce which is some overhead i'm looking to prevent.
>>> >
>>> > Is there a better (less overhead) workaround for doing this? Or is
>>> there
>>> > some mechanism in Flink that would allow me to do this?
>>> >
>>> > Cheers!
>>> >
>>> > - Pieter
>>>
>>
>>
>


Multiple keys in reduceGroup ?

2015-10-22 Thread LINZ, Arnaud
Hello,

Trying to understand why my code was giving strange results, I’ve ended up 
adding “useless” controls in my code and came with what seems to me a bug. I 
group my dataset according to a key, but in the reduceGroup function I am 
passed values with different keys.

My code has the following pattern (mix of java & pseudo-code in []) :

inputDataSet [of InputRecord]
.joinWithTiny(referencesDataSet [of Reference])
.where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])

.groupBy([PrimaryKeySelector : Tuple2 -> 
value.f0.getPrimaryKey()])
.sortGroup([DateKeySelector], Order.ASCENDING)
.reduceGroup(new ReduceFunction() {
@Override
   public void reduce(Iterable< Tuple2> values,  
Collector out) throws Exception {
 // Issue : all values do not share the same key
  final List> listValues = new 
ArrayList>();
 for (final Tuple2value : values) { 
listValues.add(value); }

final long primkey = listValues.get(0).f0.getPrimaryKey();
   for (int i = 1; i < listValues.size(); i++) {
if (listValues.get(i).f0.getPrimaryKey() != primkey) {
  throw new IllegalStateException(primkey + " != " + 
listValues.get(i).f0.getPrimaryKey());
==> This exception is fired !
   }
}
}
}) ;

I use the current 0.10 snapshot. The issue appears in local cluster mode unit 
tests as well as in yarn mode (however it’s ok when I test it with very few 
elements).

The sortGroup is not the cause of the problem, as I do get the same error 
without it.

Have I misunderstood the grouping concept or is it really an awful bug?

Best regards,
Arnaud






L'intégrité de ce message n'étant pas assurée sur internet, la société 
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Fabian Hueske
In principle, a data set the branches needs only to be materialized if both
branches are pipelined until they are merged (i.e., in a hybrid-hash join).
Otherwise, the data flow might deadlock due to pipelining.

If you group both data sets before they are joined, the pipeline is broken
due to the blocking sort. Therefore, the branching data set should not be
materialized.

2015-10-22 12:18 GMT+02:00 Pieter Hameete :

> Thanks for your responses!
>
> The derived datasets would indeed be grouped after the filter operations.
> Why would this cause them to be materialized to disk? And if I understand
> correctly the the data source will not chain to more than one filter,
> causing (de)serialization to transfer the records from the data source to
> the 2 or more filters on the same worker.
>
> I would guess that in the end this approach would still be faster than
> reading the entire input multiple times (we are talking 100GB+ on max 32
> workers) but I would have to run some experiments to confirm that.
>
>
>
> 2015-10-22 12:06 GMT+02:00 Fabian Hueske :
>
>> It might even be materialized (to disk) if both derived data sets are
>> joined.
>>
>> 2015-10-22 12:01 GMT+02:00 Till Rohrmann :
>>
>>> I fear that the filter operations are not chained because there are at
>>> least two of them which have the same DataSet as input. However, it's true
>>> that the intermediate results are not materialized.
>>>
>>> It is also correct that the filter operators are deployed colocated to
>>> the data sources. Thus, there is no network traffic. However, the data will
>>> still be serialized/deserialized between the not-chained operators (also if
>>> they reside on the same machine).
>>>
>>>
>>>
>>> On Thu, Oct 22, 2015 at 11:49 AM, Gábor Gévay  wrote:
>>>
 Hello!

 > I have thought about a workaround where the InputFormat would return
 > Tuple2s and the first field is the name of the dataset to which a
 record
 > belongs. This would however require me to filter the read data once
 for
 > each dataset or to do a groupReduce which is some overhead i'm
 > looking to prevent.

 I think that those two filters might not have that much overhead,
 because of several optimizations Flink does under the hood:
 - The dataset of Tuple2s won't be materialized, but instead will be
 streamed directly to the two filter operators.
 - The input format and the two filters will probably end up on the
 same machine, because of chaining, so there won't be
 serialization/deserialization between them.

 Best,
 Gabor



 2015-10-22 11:38 GMT+02:00 Pieter Hameete :
 > Good morning!
 >
 > I have the following usecase:
 >
 > My program reads nested data (in this specific case XML) based on
 > projections (path expressions) of this data. Often multiple paths are
 > projected onto the same input. I would like each path to result in
 its own
 > dataset.
 >
 > Is it possible to generate more than 1 dataset using a readFile
 operation to
 > prevent reading the input twice?
 >
 > I have thought about a workaround where the InputFormat would return
 Tuple2s
 > and the first field is the name of the dataset to which a record
 belongs.
 > This would however require me to filter the read data once for each
 dataset
 > or to do a groupReduce which is some overhead i'm looking to prevent.
 >
 > Is there a better (less overhead) workaround for doing this? Or is
 there
 > some mechanism in Flink that would allow me to do this?
 >
 > Cheers!
 >
 > - Pieter

>>>
>>>
>>
>


Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Stephan Ewen
Hi!

You are checking for equality / inequality with "!=" - can you check with
"equals()" ?

The key objects will most certainly be different in each record (as they
are deserialized individually), but they should be equal.

Stephan


On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> Trying to understand why my code was giving strange results, I’ve ended up
> adding “useless” controls in my code and came with what seems to me a bug.
> I group my dataset according to a key, but in the reduceGroup function I am
> passed values with different keys.
>
>
>
> My code has the following pattern (mix of java & pseudo-code in []) :
>
>
>
> inputDataSet *[of InputRecord]*
>
> .joinWithTiny(referencesDataSet *[of Reference]*)
>
> .where(*[InputRecord SecondaryKeySelector]*).equalTo(*[Reference
> KeySelector]*)
>
> .groupBy(*[PrimaryKeySelector : Tuple2 -> value.*
> *f0**.getPrimaryKey()]*)
>
> .sortGroup(*[DateKeySelector]*, Order.*ASCENDING*)
>
> .reduceGroup(*new* ReduceFunction() {
>
> @Override
>
>*public* *void* reduce(Iterable< Tuple2>
> values,  Collector out) *throws* Exception {
>
>  // Issue : all values do not share the same key
>
>   *final* List> listValues = *new*
> ArrayList>();
>
>  *for* (*final* Tuple2value : values)
> { listValues.add(value); }
>
>
>
> *final* *long* primkey = listValues.get(0).f0.getPrimaryKey();
>
>*for* (*int* i = 1; i < listValues.size(); i++) {
>
> *if* (listValues.get(i).f0.getPrimaryKey() != primkey) {
>
>   *throw* *new* IllegalStateException(primkey + " != "
> + listValues.get(i).f0.getPrimaryKey());
>
> è This exception is fired !
>
>}
>
> }
>
> }
>
> }) ;
>
>
>
> I use the current 0.10 snapshot. The issue appears in local cluster mode
> unit tests as well as in yarn mode (however it’s ok when I test it with
> very few elements).
>
>
>
> The sortGroup is not the cause of the problem, as I do get the same error
> without it.
>
>
>
> Have I misunderstood the grouping concept or is it really an awful bug?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
>
>
> --
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Aljoscha Krettek
Hi,
but he’s comparing it to a primitive long, so shouldn’t the Long key be unboxed 
and the comparison still be valid?

My question is whether you enabled object-reuse-mode on the 
ExecutionEnvironment?

Cheers,
Aljoscha
> On 22 Oct 2015, at 12:31, Stephan Ewen  wrote:
> 
> Hi!
> 
> You are checking for equality / inequality with "!=" - can you check with 
> "equals()" ?
> 
> The key objects will most certainly be different in each record (as they are 
> deserialized individually), but they should be equal.
> 
> Stephan
> 
> 
> On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud  
> wrote:
> Hello,
> 
>  
> 
> Trying to understand why my code was giving strange results, I’ve ended up 
> adding “useless” controls in my code and came with what seems to me a bug. I 
> group my dataset according to a key, but in the reduceGroup function I am 
> passed values with different keys.
> 
>  
> 
> My code has the following pattern (mix of java & pseudo-code in []) :
> 
>  
> 
> inputDataSet [of InputRecord]
> 
> .joinWithTiny(referencesDataSet [of Reference])
> 
> .where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])
> 
> 
> .groupBy([PrimaryKeySelector : Tuple2 -> 
> value.f0.getPrimaryKey()])
> 
> .sortGroup([DateKeySelector], Order.ASCENDING)
> 
> .reduceGroup(new ReduceFunction() {
> 
> @Override
> 
>public void reduce(Iterable< Tuple2> values,  
> Collector out) throws Exception {
> 
>  // Issue : all values do not share the same key
> 
>   final List> listValues = new 
> ArrayList>();
> 
>  for (final Tuple2value : values) { 
> listValues.add(value); }
> 
>
> 
> final long primkey = listValues.get(0).f0.getPrimaryKey();
> 
>for (int i = 1; i < listValues.size(); i++) {
> 
> if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> 
>   throw new IllegalStateException(primkey + " != " + 
> listValues.get(i).f0.getPrimaryKey());
> 
> è This exception is fired !
> 
>}
> 
> }
> 
> }
> 
> }) ;
> 
>  
> 
> I use the current 0.10 snapshot. The issue appears in local cluster mode unit 
> tests as well as in yarn mode (however it’s ok when I test it with very few 
> elements).
> 
>  
> 
> The sortGroup is not the cause of the problem, as I do get the same error 
> without it.
> 
>  
> 
> Have I misunderstood the grouping concept or is it really an awful bug?
> 
>  
> 
> Best regards,
> 
> Arnaud
> 
>  
> 
>  
> 
>  
> 
> 
> 
> L'intégrité de ce message n'étant pas assurée sur internet, la société 
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
> n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
> l'expéditeur.
> 
> The integrity of this message cannot be guaranteed on the Internet. The 
> company that sent this message cannot therefore be held liable for its 
> content nor attachments. Any unauthorized use or dissemination is prohibited. 
> If you are not the intended recipient of this message, then please delete it 
> and notify the sender.
> 



Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Till Rohrmann
If not, could you provide us with the program and test data to reproduce
the error?

Cheers,
Till

On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek 
wrote:

> Hi,
> but he’s comparing it to a primitive long, so shouldn’t the Long key be
> unboxed and the comparison still be valid?
>
> My question is whether you enabled object-reuse-mode on the
> ExecutionEnvironment?
>
> Cheers,
> Aljoscha
> > On 22 Oct 2015, at 12:31, Stephan Ewen  wrote:
> >
> > Hi!
> >
> > You are checking for equality / inequality with "!=" - can you check
> with "equals()" ?
> >
> > The key objects will most certainly be different in each record (as they
> are deserialized individually), but they should be equal.
> >
> > Stephan
> >
> >
> > On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
> wrote:
> > Hello,
> >
> >
> >
> > Trying to understand why my code was giving strange results, I’ve ended
> up adding “useless” controls in my code and came with what seems to me a
> bug. I group my dataset according to a key, but in the reduceGroup function
> I am passed values with different keys.
> >
> >
> >
> > My code has the following pattern (mix of java & pseudo-code in []) :
> >
> >
> >
> > inputDataSet [of InputRecord]
> >
> > .joinWithTiny(referencesDataSet [of Reference])
> >
> > .where([InputRecord SecondaryKeySelector]).equalTo([Reference
> KeySelector])
> >
> >
> > .groupBy([PrimaryKeySelector : Tuple2 ->
> value.f0.getPrimaryKey()])
> >
> > .sortGroup([DateKeySelector], Order.ASCENDING)
> >
> > .reduceGroup(new ReduceFunction() {
> >
> > @Override
> >
> >public void reduce(Iterable< Tuple2>
> values,  Collector out) throws Exception {
> >
> >  // Issue : all values do not share the same key
> >
> >   final List> listValues = new
> ArrayList>();
> >
> >  for (final Tuple2value : values) {
> listValues.add(value); }
> >
> >
> >
> > final long primkey = listValues.get(0).f0.getPrimaryKey();
> >
> >for (int i = 1; i < listValues.size(); i++) {
> >
> > if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> >
> >   throw new IllegalStateException(primkey + " != " +
> listValues.get(i).f0.getPrimaryKey());
> >
> > è This exception is fired !
> >
> >}
> >
> > }
> >
> > }
> >
> > }) ;
> >
> >
> >
> > I use the current 0.10 snapshot. The issue appears in local cluster mode
> unit tests as well as in yarn mode (however it’s ok when I test it with
> very few elements).
> >
> >
> >
> > The sortGroup is not the cause of the problem, as I do get the same
> error without it.
> >
> >
> >
> > Have I misunderstood the grouping concept or is it really an awful bug?
> >
> >
> >
> > Best regards,
> >
> > Arnaud
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
> >
> > The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
> >
>
>


RE: Multiple keys in reduceGroup ?

2015-10-22 Thread LINZ, Arnaud
Hi,

I was using primitive types, and EnableObjectReuse was turned on.  My next move 
was to turn it off, and it did solved the problem.
It also increased execution time by 10%, but it’s hard to say if this overhead 
is due to the copy or to the change of behavior of the reduceGroup algorithm 
once it get the right data.

Since I never modify my objects, why object reuse isn’t working ?

Best regards,
Arnaud


De : Till Rohrmann [mailto:trohrm...@apache.org]
Envoyé : jeudi 22 octobre 2015 12:36
À : user@flink.apache.org
Objet : Re: Multiple keys in reduceGroup ?

If not, could you provide us with the program and test data to reproduce the 
error?

Cheers,
Till

On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek 
mailto:aljos...@apache.org>> wrote:
Hi,
but he’s comparing it to a primitive long, so shouldn’t the Long key be unboxed 
and the comparison still be valid?

My question is whether you enabled object-reuse-mode on the 
ExecutionEnvironment?

Cheers,
Aljoscha
> On 22 Oct 2015, at 12:31, Stephan Ewen 
> mailto:se...@apache.org>> wrote:
>
> Hi!
>
> You are checking for equality / inequality with "!=" - can you check with 
> "equals()" ?
>
> The key objects will most certainly be different in each record (as they are 
> deserialized individually), but they should be equal.
>
> Stephan
>
>
> On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
> mailto:al...@bouyguestelecom.fr>> wrote:
> Hello,
>
>
>
> Trying to understand why my code was giving strange results, I’ve ended up 
> adding “useless” controls in my code and came with what seems to me a bug. I 
> group my dataset according to a key, but in the reduceGroup function I am 
> passed values with different keys.
>
>
>
> My code has the following pattern (mix of java & pseudo-code in []) :
>
>
>
> inputDataSet [of InputRecord]
>
> .joinWithTiny(referencesDataSet [of Reference])
>
> .where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])
>
>
> .groupBy([PrimaryKeySelector : Tuple2 -> 
> value.f0.getPrimaryKey()])
>
> .sortGroup([DateKeySelector], Order.ASCENDING)
>
> .reduceGroup(new ReduceFunction() {
>
> @Override
>
>public void reduce(Iterable< Tuple2> values,  
> Collector out) throws Exception {
>
>  // Issue : all values do not share the same key
>
>   final List> listValues = new 
> ArrayList>();
>
>  for (final Tuple2value : values) { 
> listValues.add(value); }
>
>
>
> final long primkey = listValues.get(0).f0.getPrimaryKey();
>
>for (int i = 1; i < listValues.size(); i++) {
>
> if (listValues.get(i).f0.getPrimaryKey() != primkey) {
>
>   throw new IllegalStateException(primkey + " != " + 
> listValues.get(i).f0.getPrimaryKey());
>
> è This exception is fired !
>
>}
>
> }
>
> }
>
> }) ;
>
>
>
> I use the current 0.10 snapshot. The issue appears in local cluster mode unit 
> tests as well as in yarn mode (however it’s ok when I test it with very few 
> elements).
>
>
>
> The sortGroup is not the cause of the problem, as I do get the same error 
> without it.
>
>
>
> Have I misunderstood the grouping concept or is it really an awful bug?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
>
>
>
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société 
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
> n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
> l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The 
> company that sent this message cannot therefore be held liable for its 
> content nor attachments. Any unauthorized use or dissemination is prohibited. 
> If you are not the intended recipient of this message, then please delete it 
> and notify the sender.
>



Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Stephan Ewen
With object reuse activated, Flink heavily reuses objects. Each call to the
Iterator in the reduceGroup function gives back one of the same two
objects, with has been filled with different contents.

Your list of all values will effectively only contain two different objects.

Further more, the look-ahead, which determines that a new key starts, will
also reuse one of these objects, which is why some elements in your list
have their contents already overwritten with the look-ahead key.

The contract for object reuse mode is the following: An object is only
valid until you request a new value from the iterator. After that, the
object's contents may have changed due to reuse.

This effectively means accumulating objects in a list with object reuse
mode requires you to manually copy them into the list.



On Thu, Oct 22, 2015 at 1:30 PM, LINZ, Arnaud 
wrote:

> Hi,
>
>
>
> I was using primitive types, and EnableObjectReuse was turned on.  My next
> move was to turn it off, and it did solved the problem.
>
> It also increased execution time by 10%, but it’s hard to say if this
> overhead is due to the copy or to the change of behavior of the reduceGroup
> algorithm once it get the right data.
>
>
>
> Since I never modify my objects, why object reuse isn’t working ?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
> *De :* Till Rohrmann [mailto:trohrm...@apache.org]
> *Envoyé :* jeudi 22 octobre 2015 12:36
> *À :* user@flink.apache.org
> *Objet :* Re: Multiple keys in reduceGroup ?
>
>
>
> If not, could you provide us with the program and test data to reproduce
> the error?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek 
> wrote:
>
> Hi,
> but he’s comparing it to a primitive long, so shouldn’t the Long key be
> unboxed and the comparison still be valid?
>
> My question is whether you enabled object-reuse-mode on the
> ExecutionEnvironment?
>
> Cheers,
> Aljoscha
>
> > On 22 Oct 2015, at 12:31, Stephan Ewen  wrote:
> >
> > Hi!
> >
> > You are checking for equality / inequality with "!=" - can you check
> with "equals()" ?
> >
> > The key objects will most certainly be different in each record (as they
> are deserialized individually), but they should be equal.
> >
> > Stephan
> >
> >
> > On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
> wrote:
> > Hello,
> >
> >
> >
> > Trying to understand why my code was giving strange results, I’ve ended
> up adding “useless” controls in my code and came with what seems to me a
> bug. I group my dataset according to a key, but in the reduceGroup function
> I am passed values with different keys.
> >
> >
> >
> > My code has the following pattern (mix of java & pseudo-code in []) :
> >
> >
> >
> > inputDataSet [of InputRecord]
> >
> > .joinWithTiny(referencesDataSet [of Reference])
> >
> > .where([InputRecord SecondaryKeySelector]).equalTo([Reference
> KeySelector])
> >
> >
> > .groupBy([PrimaryKeySelector : Tuple2 ->
> value.f0.getPrimaryKey()])
> >
> > .sortGroup([DateKeySelector], Order.ASCENDING)
> >
> > .reduceGroup(new ReduceFunction() {
> >
> > @Override
> >
> >public void reduce(Iterable< Tuple2>
> values,  Collector out) throws Exception {
> >
> >  // Issue : all values do not share the same key
> >
> >   final List> listValues = new
> ArrayList>();
> >
> >  for (final Tuple2value : values) {
> listValues.add(value); }
> >
> >
> >
> > final long primkey = listValues.get(0).f0.getPrimaryKey();
> >
> >for (int i = 1; i < listValues.size(); i++) {
> >
> > if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> >
> >   throw new IllegalStateException(primkey + " != " +
> listValues.get(i).f0.getPrimaryKey());
> >
> > è This exception is fired !
> >
> >}
> >
> > }
> >
> > }
> >
> > }) ;
> >
> >
> >
> > I use the current 0.10 snapshot. The issue appears in local cluster mode
> unit tests as well as in yarn mode (however it’s ok when I test it with
> very few elements).
> >
> >
> >
> > The sortGroup is not the cause of the problem, as I do get the same
> error without it.
> >
> >
> >
> > Have I misunderstood the grouping concept or is it really an awful bug?
> >
> >
> >
> > Best regards,
> >
> > Arnaud
> >
> >
> >
> >
> >
> >
> >
> >
> >
>
> > L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
> >
> > The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
> >
>
>
>


Re: Multiple keys in reduceGroup ?

2015-10-22 Thread Till Rohrmann
You don’t modify the objects, however, the ReusingKeyGroupedIterator, which
is the iterator you have in your reduce function, does. Internally it uses
two objects, in your case of type Tuple2, to
deserialize the input records. These two objects are alternately returned
when you call next on the iterator. Since you only store references to
these two objects in your ArrayList, you will see any changes made to these
two objects.

However, this only explains why the values of your elements change and not
the key. To understand why you observe different keys in your group you
have to know that the ReusingKeyGroupedIterator does a look ahead to see
whether the next element has the same key value. The look ahead is stored
in one of the two objects. When the iterator detects that the next element
has a new key, then it will finish the iterator. However, you’ll will see
the key value of the next group in half of your elements.

If you want to accumulate input data while using reuse object mode you
should copy the input elements.
​

On Thu, Oct 22, 2015 at 1:30 PM, LINZ, Arnaud 
wrote:

> Hi,
>
>
>
> I was using primitive types, and EnableObjectReuse was turned on.  My next
> move was to turn it off, and it did solved the problem.
>
> It also increased execution time by 10%, but it’s hard to say if this
> overhead is due to the copy or to the change of behavior of the reduceGroup
> algorithm once it get the right data.
>
>
>
> Since I never modify my objects, why object reuse isn’t working ?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
> *De :* Till Rohrmann [mailto:trohrm...@apache.org]
> *Envoyé :* jeudi 22 octobre 2015 12:36
> *À :* user@flink.apache.org
> *Objet :* Re: Multiple keys in reduceGroup ?
>
>
>
> If not, could you provide us with the program and test data to reproduce
> the error?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek 
> wrote:
>
> Hi,
> but he’s comparing it to a primitive long, so shouldn’t the Long key be
> unboxed and the comparison still be valid?
>
> My question is whether you enabled object-reuse-mode on the
> ExecutionEnvironment?
>
> Cheers,
> Aljoscha
>
> > On 22 Oct 2015, at 12:31, Stephan Ewen  wrote:
> >
> > Hi!
> >
> > You are checking for equality / inequality with "!=" - can you check
> with "equals()" ?
> >
> > The key objects will most certainly be different in each record (as they
> are deserialized individually), but they should be equal.
> >
> > Stephan
> >
> >
> > On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
> wrote:
> > Hello,
> >
> >
> >
> > Trying to understand why my code was giving strange results, I’ve ended
> up adding “useless” controls in my code and came with what seems to me a
> bug. I group my dataset according to a key, but in the reduceGroup function
> I am passed values with different keys.
> >
> >
> >
> > My code has the following pattern (mix of java & pseudo-code in []) :
> >
> >
> >
> > inputDataSet [of InputRecord]
> >
> > .joinWithTiny(referencesDataSet [of Reference])
> >
> > .where([InputRecord SecondaryKeySelector]).equalTo([Reference
> KeySelector])
> >
> >
> > .groupBy([PrimaryKeySelector : Tuple2 ->
> value.f0.getPrimaryKey()])
> >
> > .sortGroup([DateKeySelector], Order.ASCENDING)
> >
> > .reduceGroup(new ReduceFunction() {
> >
> > @Override
> >
> >public void reduce(Iterable< Tuple2>
> values,  Collector out) throws Exception {
> >
> >  // Issue : all values do not share the same key
> >
> >   final List> listValues = new
> ArrayList>();
> >
> >  for (final Tuple2value : values) {
> listValues.add(value); }
> >
> >
> >
> > final long primkey = listValues.get(0).f0.getPrimaryKey();
> >
> >for (int i = 1; i < listValues.size(); i++) {
> >
> > if (listValues.get(i).f0.getPrimaryKey() != primkey) {
> >
> >   throw new IllegalStateException(primkey + " != " +
> listValues.get(i).f0.getPrimaryKey());
> >
> > è This exception is fired !
> >
> >}
> >
> > }
> >
> > }
> >
> > }) ;
> >
> >
> >
> > I use the current 0.10 snapshot. The issue appears in local cluster mode
> unit tests as well as in yarn mode (however it’s ok when I test it with
> very few elements).
> >
> >
> >
> > The sortGroup is not the cause of the problem, as I do get the same
> error without it.
> >
> >
> >
> > Have I misunderstood the grouping concept or is it really an awful bug?
> >
> >
> >
> > Best regards,
> >
> > Arnaud
> >
> >
> >
> >
> >
> >
> >
> >
> >
>
> > L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
> >
> > The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> c

Running continuously on yarn with kerberos

2015-10-22 Thread Niels Basjes
Hi,

I want to write a long running (i.e. never stop it) streaming flink
application on a kerberos secured Hadoop/Yarn cluster. My application needs
to do things with files on HDFS and HBase tables on that cluster so having
the correct kerberos tickets is very important. The stream is to be
ingested from Kafka.

One of the things with Kerberos is that the tickets expire after a
predetermined time. My knowledge about kerberos is very limited so I hope
you guys can help me.

My question is actually quite simple: Is there an howto somewhere on how to
correctly run a long running flink application with kerberos that includes
a solution for the kerberos ticket timeout  ?

Thanks

Niels Basjes


RE: Multiple keys in reduceGroup ?

2015-10-22 Thread LINZ, Arnaud
Hi,

Thanks a lot for the explanation. I cannot even say that it wasn’t stated in 
the documentation, I’ve simply missed the iterator part :


“by default, user defined functions (like map() or reduce()) are getting new 
objects on each call (or through an iterator). So it is possible to keep 
references to the objects inside the function (for example in a List).
There is a switch at the ExectionConfig which allows users to enable the object 
reuse mode:
env.getExecutionConfig().enableObjectReuse()
For mutable types, Flink will reuse object instances. In practice that means 
that a map() function will always receive the same object instance (with its 
fields set to new values). The object reuse mode will lead to better 
performance because fewer objects are created, but the user has to manually 
take care of what they are doing with the object references.”
Greetings,
Arnaud

De : Till Rohrmann [mailto:trohrm...@apache.org]
Envoyé : jeudi 22 octobre 2015 13:45
À : user@flink.apache.org
Objet : Re: Multiple keys in reduceGroup ?


You don’t modify the objects, however, the ReusingKeyGroupedIterator, which is 
the iterator you have in your reduce function, does. Internally it uses two 
objects, in your case of type Tuple2, to deserialize 
the input records. These two objects are alternately returned when you call 
next on the iterator. Since you only store references to these two objects in 
your ArrayList, you will see any changes made to these two objects.

However, this only explains why the values of your elements change and not the 
key. To understand why you observe different keys in your group you have to 
know that the ReusingKeyGroupedIterator does a look ahead to see whether the 
next element has the same key value. The look ahead is stored in one of the two 
objects. When the iterator detects that the next element has a new key, then it 
will finish the iterator. However, you’ll will see the key value of the next 
group in half of your elements.

If you want to accumulate input data while using reuse object mode you should 
copy the input elements.
​

On Thu, Oct 22, 2015 at 1:30 PM, LINZ, Arnaud 
mailto:al...@bouyguestelecom.fr>> wrote:
Hi,

I was using primitive types, and EnableObjectReuse was turned on.  My next move 
was to turn it off, and it did solved the problem.
It also increased execution time by 10%, but it’s hard to say if this overhead 
is due to the copy or to the change of behavior of the reduceGroup algorithm 
once it get the right data.

Since I never modify my objects, why object reuse isn’t working ?

Best regards,
Arnaud


De : Till Rohrmann [mailto:trohrm...@apache.org]
Envoyé : jeudi 22 octobre 2015 12:36
À : user@flink.apache.org
Objet : Re: Multiple keys in reduceGroup ?

If not, could you provide us with the program and test data to reproduce the 
error?

Cheers,
Till

On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek 
mailto:aljos...@apache.org>> wrote:
Hi,
but he’s comparing it to a primitive long, so shouldn’t the Long key be unboxed 
and the comparison still be valid?

My question is whether you enabled object-reuse-mode on the 
ExecutionEnvironment?

Cheers,
Aljoscha
> On 22 Oct 2015, at 12:31, Stephan Ewen 
> mailto:se...@apache.org>> wrote:
>
> Hi!
>
> You are checking for equality / inequality with "!=" - can you check with 
> "equals()" ?
>
> The key objects will most certainly be different in each record (as they are 
> deserialized individually), but they should be equal.
>
> Stephan
>
>
> On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud 
> mailto:al...@bouyguestelecom.fr>> wrote:
> Hello,
>
>
>
> Trying to understand why my code was giving strange results, I’ve ended up 
> adding “useless” controls in my code and came with what seems to me a bug. I 
> group my dataset according to a key, but in the reduceGroup function I am 
> passed values with different keys.
>
>
>
> My code has the following pattern (mix of java & pseudo-code in []) :
>
>
>
> inputDataSet [of InputRecord]
>
> .joinWithTiny(referencesDataSet [of Reference])
>
> .where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])
>
>
> .groupBy([PrimaryKeySelector : Tuple2 -> 
> value.f0.getPrimaryKey()])
>
> .sortGroup([DateKeySelector], Order.ASCENDING)
>
> .reduceGroup(new ReduceFunction() {
>
> @Override
>
>public void reduce(Iterable< Tuple2> values,  
> Collector out) throws Exception {
>
>  // Issue : all values do not share the same key
>
>   final List> listValues = new 
> ArrayList>();
>
>  for (final Tuple2value : values) { 
> listValues.add(value); }
>
>
>
> final long primkey = listValues.get(0).f0.getPrimaryKey();
>
>for (int i = 1; i < listValues.size(); i++) {
>
> if (listValues.get(i).f0.getPrimaryKey() != primkey) {
>
>   throw new IllegalStateException(primkey + " != " + 
> listValues.get(i).f0.getPrimaryKey());
>
> 

Re: Flink+avro integration

2015-10-22 Thread aawhitaker
Stephan Ewen wrote
> This is actually not a bug, or a POJO or Avro problem. It is simply a
> limitation in the functionality, as the exception message says:
> "Specifying
> fields by name is only supported on Case Classes (for now)."
> 
> Try this with a regular reduce function that selects the max and it should
> work fine...

Thanks. Just to be sure, this is also a limitation of the Java
implementation of Flink, right?





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-avro-integration-tp3162p3236.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Zeppelin Integration

2015-10-22 Thread Till Rohrmann
Hi Trevor,

that’s actually my bad since I only tested my branch against a remote
cluster. I fixed the problem (not properly starting the
LocalFlinkMiniCluster) so that you can now use Zeppelin also in local mode.
Just check out my branch again.

Cheers,
Till
​

On Wed, Oct 21, 2015 at 10:00 PM, Trevor Grant 
wrote:

> Hey Till,
>
> I cloned your branch of Zeplin and while it will compile, it fails tests
> on timeout, which consequently was the same issue I was having when trying
> to use Zeppelin.
>
> Ideas?
>
>
> ---
> Test set: org.apache.zeppelin.flink.FlinkInterpreterTest
>
> ---
> Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 100.347
> sec <<< FAILURE! - in org.apache.zeppelin.flink.FlinkInterpreterTest
> org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 100.347 sec
>  <<< ERROR!
> java.util.concurrent.TimeoutException: Futures timed out after [10
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.getLeaderIndex(FlinkMiniCluster.scala:171)
> at
> org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.getLeaderRPCPort(LocalFlinkMiniCluster.scala:132)
> at
> org.apache.zeppelin.flink.FlinkInterpreter.getPort(FlinkInterpreter.java:136)
> at
> org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:98)
> at
> org.apache.zeppelin.flink.FlinkInterpreterTest.setUp(FlinkInterpreterTest.java:42)
>
> org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 100.347 sec
>  <<< ERROR!
> java.lang.NullPointerException: null
> at
> org.apache.zeppelin.flink.FlinkInterpreter.close(FlinkInterpreter.java:221)
> at
> org.apache.zeppelin.flink.FlinkInterpreterTest.tearDown(FlinkInterpreterTest.java:48)
>
>
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
>
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>
>
> On Wed, Oct 21, 2015 at 11:57 AM, Till Rohrmann 
> wrote:
>
>> Hi Trevor,
>>
>> in order to use Zeppelin with a different Flink version in local mode,
>> meaning that Zeppelin starts a LocalFlinkMiniCluster when executing your
>> jobs, you have to build Zeppelin and change the flink.version property
>> in the zeppelin/flink/pom.xml file to the version you want to use.
>>
>> If you want to let Zeppelin submit jobs to a remote cluster, you should
>> build Zeppelin with the version of your cluster. That’s because internally
>> Zeppelin will use this version to construct a JobGraph which is then
>> submitted to the cluster. In order to configure the remote cluster, you
>> have to go the *Interpreter* page and scroll down to the *flink*
>> section. There you have to specify the address of your cluster under
>> *host* and the port under *port*. This should then be used to submit
>> jobs to the Flink cluster.
>>
>> I hope this answers your question.
>>
>> Btw: If you want to use Zeppelin with the latest Flink 0.10-SNAPSHOT
>> version, you should checkout my branch
>> https://github.com/tillrohrmann/incubator-zeppelin/tree/flink-0.10-SNAPSHOT
>> where I’ve made the necessary changes.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Wed, Oct 21, 2015 at 5:00 PM, Trevor Grant 
>> wrote:
>>
>>> I'm setting up some Flink/Spark/Zeppelin at work.  Spark+Zeppelin seems
>>> to be relatively well supported and configurable but the Flink is not so
>>> much.
>>>
>>> I want Zeppelin to run against my 0.10 build instead of the 0.6 build
>>> that ships with Zeppelin.  My best guess at the moment on how to accomplish
>>> this is to create a symbolic link from the /opt/zepplin/flink folder to
>>> /opt/flink-0.10, but this feels dirty and wrong.
>>>
>>> Does anyone out there have any experience connecting Zeppelin to a
>>> non-prepackaged Flink build?
>>>
>>> I feel like there is a great opporutnity for a HOWTO write up if non
>>> currently exists.
>>>
>>> I'm asking on the Zeppelin user mailing list too as soon as I am added.
>>>
>>> Thanks for any help
>>>
>>> tg
>>>
>>>
>>> Trevor Grant
>>> Data Scientist
>>> https://github.com/rawkintrevo
>>> http://stackexchange.com/users/3002022/rawkintrevo
>>>
>>> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>>>
>>>
>>
>


reading csv file from null value

2015-10-22 Thread Philip Lee
Hi,

I am trying to load the dataset with the part of null value by using
readCsvFile().

// e.g  _date|_click|_sales|_item|_web_page|_user

case class WebClick(_click_date: Long, _click_time: Long, _sales: Int,
_item: Int,_page: Int, _user: Int)

private def getWebClickDataSet(env: ExecutionEnvironment): DataSet[WebClick] = {

  env.readCsvFile[WebClick](
webClickPath,
fieldDelimiter = "|",
includedFields = Array(0, 1, 2, 3, 4, 5),
// lenient = true
  )
}


Well, I know there is an option to ignore malformed value, but I have to
read the dataset even though it has null value.

as it follows, dataset (third column is null) looks like
37794|24669||16705|23|54810
but I have to read null value as well because I have to use filter or where
function ( _sales == null )

Is there any detail suggestion to do it?

Thanks,
Philip







-- 

==

*Hae Joon Lee*


Now, in Germany,

M.S. Candidate, Interested in Distributed System, Iterative Processing

Dept. of Computer Science, Informatik in German, TUB

Technical University of Berlin


In Korea,

M.S. Candidate, Computer Architecture Laboratory

Dept. of Computer Science, KAIST


Rm# 4414 CS Dept. KAIST

373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)


Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea

==


Re: Flink+avro integration

2015-10-22 Thread Till Rohrmann
In the Java API, we only support the `max` operation for tuple types where
you reference the fields via indices.

Cheers,
Till

On Thu, Oct 22, 2015 at 4:04 PM, aawhitaker  wrote:

> Stephan Ewen wrote
> > This is actually not a bug, or a POJO or Avro problem. It is simply a
> > limitation in the functionality, as the exception message says:
> > "Specifying
> > fields by name is only supported on Case Classes (for now)."
> >
> > Try this with a regular reduce function that selects the max and it
> should
> > work fine...
>
> Thanks. Just to be sure, this is also a limitation of the Java
> implementation of Flink, right?
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-avro-integration-tp3162p3236.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>