Re: Proposal: YARN session per-job Kerberos authentication

2016-03-24 Thread Robert Metzger
Hi Stefano,

I think the proposed feature is not limited to YARN sessions. With the code
in place, standalone clusters would also be able to authenticate file
system access as the user who submitted the job.

I would recommend doing some prototyping and coming up with a design
document first. The change has quite a few implications.
Some things that come to my mind:
- Filesystem implementations are currently instantiated once. I think we
would need to securely instantiate filesystems per user (imagine multiple
users on one Flink cluster). That's why the YARN session user currently owns
the file system / HBase access. See the sketch below.
- There is currently no over-the-wire encryption in Flink (it's on my TODO
list), so how do you transfer the security tokens? (YARN is currently doing
that for us, so we don't need to worry about it there.)
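
A rough sketch of what per-user filesystem instantiation could look like,
assuming Hadoop's UserGroupInformation and assuming the submitting user's
credentials have already been transferred to the cluster (which is exactly the
open token-transfer question above):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class PerUserFileSystems {

    // Illustrative only: obtain a FileSystem bound to the submitting user
    // instead of the session user. newInstance() bypasses the JVM-wide
    // FileSystem cache, so each user gets an isolated instance.
    public static FileSystem openAsUser(String user, java.net.URI uri, Configuration conf)
            throws Exception {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
        return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () ->
                FileSystem.newInstance(uri, conf));
    }
}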

There are probably more implications you'll find while implementing this.


On Wed, Mar 23, 2016 at 7:06 PM, Maximilian Michels  wrote:

> Hi Stefano,
>
> Sounds great. Please go ahead! Note that Flink already provides the
> proposed feature for per-job Yarn clusters. However, it is a valuable
> addition to realize this feature for the Yarn session.
>
> The only blocker that I can think of is probably this PR which changes
> a lot of the Yarn classes: https://github.com/apache/flink/pull/1741
> There are also changes planned for the client side to decouple the
> Yarn support from the job submission process and make it easier to
> integrate other frameworks (like Mesos). I don't think that will block
> your contribution, since a lot of the logic is probably going to be
> contained in separate classes which can be integrated even as that code
> changes. Let's just stay in sync.
>
> If you like, you could start off by opening an issue and submitting a
> short design document.
>
> Cheers,
> Max
>
> On Wed, Mar 23, 2016 at 3:55 PM, Stefano Baghino
>  wrote:
> > Hello everybody,
> >
> > some of us at Radicalbit have spent the last few weeks experimenting to
> > better understand how well Flink works in secure cluster environments,
> > and with Kerberos in particular.
> >
> > We’ve found a possible area of improvement and would like to work on it
> as
> > part of our effort to contribute to Flink in the open: after a few tests
> > and a short exchange on the user mailing list
> > <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kerberos-on-YARN-delegation-or-proxying-tp5315p5318.html
> >
> > we’ve come to realize that currently a long-running session on YARN acts
> on
> > behalf of the user that originally ran the session, not the one who’s
> > submitting the job.
> >
> > We think it would be a nice improvement to be able to have a single Flink
> > session that keeps running with several users submitting their jobs with
> > their own credentials.
> >
> > We’d like to develop, test and document this improvement.
> >
> > Do you think this is feasible? Are there any blockers we should be aware
> of
> > before undertaking this task? Would this be something of interest for the
> > community? Are there any other ongoing efforts that aim toward this?
> >
> > We’d love to have the feedback of the community on this, thank you in
> > advance to anyone who’s willing to share their insight and opinion with
> us.
> >
> > --
> > BR,
> > Stefano Baghino
> >
> > Software Engineer @ Radicalbit
>


Apache Flink: aligning watermark among parallel tasks

2016-03-24 Thread Ozan DENİZ
We are using a periodic event-time window with watermarks. We currently have 4
parallel tasks in our Flink app.

During streaming, the watermarks of all 4 parallel tasks must have advanced
(i.e. be close to each other) for the window to trigger.

For example:

  Task 1 watermark value = 8
  Task 2 watermark value = 1
  Task 3 watermark value = 8
  Task 4 watermark value = 8

Task 2 is waiting for new log records to advance its watermark. However, the
trigger condition can already be met on the other tasks, and we would like to
fire the window without waiting for Task 2's update.
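
As I understand it, the window operator effectively tracks the minimum
watermark over its inputs, roughly like this (a conceptual sketch in Java, not
Flink internals):

// The operator's event-time clock is the minimum watermark over all input
// channels, so the single lagging task above holds the whole window back.
long[] taskWatermarks = {8, 1, 8, 8};
long operatorWatermark = Long.MAX_VALUE;
for (long wm : taskWatermarks) {
    operatorWatermark = Math.min(operatorWatermark, wm);
}
// operatorWatermark == 1, so an event-time window ending at e.g. 5 cannot fire yet.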




Is there any mechanism to align all the parallel tasks' watermarks, or to fire
the window event without waiting for the other tasks?

[jira] [Created] (FLINK-3665) Range partitioning lacks support to define sort orders

2016-03-24 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3665:


 Summary: Range partitioning lacks support to define sort orders
 Key: FLINK-3665
 URL: https://issues.apache.org/jira/browse/FLINK-3665
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API
Affects Versions: 1.0.0
Reporter: Fabian Hueske
 Fix For: 1.1.0


{{DataSet.partitionByRange()}} does not allow specifying the sort order of the
fields. This is fine if range partitioning is only used to mitigate skewed
partitioning.
However, it is not sufficient if range partitioning is used to sort a data set
in parallel.

Since {{DataSet.partitionByRange()}} is {{@Public}} API and cannot be easily
changed, I propose to add a method {{withOrders(Order... orders)}} to
{{PartitionOperator}}. The method should throw an exception if the partitioning
method of the {{PartitionOperator}} is not range partitioning.
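
A hypothetical usage example (the {{withOrders()}} call does not exist yet; the
rest is existing DataSet API):

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, String>> data = env.fromElements(
    new Tuple2<>(3, "c"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));

data.partitionByRange(0)           // existing API: range partition on field 0
    .withOrders(Order.DESCENDING)  // proposed: sort order of the partition key field
    .print();
{code}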



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3666) Remove Nephele references

2016-03-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-3666:
---

 Summary: Remove Nephele references
 Key: FLINK-3666
 URL: https://issues.apache.org/jira/browse/FLINK-3666
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Chesnay Schepler
Priority: Trivial


There are still a few references to Nephele which should be removed:

{code}
flink\docs\setup\local_setup.md:
   79  $ tail log/flink-*-jobmanager-*.log
   80  INFO ... - Initializing memory manager with 409 megabytes of memory
   81: INFO ... - Trying to load 
org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler as scheduler
   82  INFO ... - Setting up web info server, using web-root directory ...
   83: INFO ... - Web info server will display information about nephele 
job-manager on localhost, port 8081.
   84  INFO ... - Starting web info server for JobManager on port 8081
   85  ~~~
   ..
  118  $ cd flink
  119  $ bin/start-local.sh
  120: Starting Nephele job manager
  121  ~~~
{code}

{code}
flink\flink-runtime\src\main\java\org\apache\flink\runtime\operators\TaskContext.java:
   70:  AbstractInvokable getOwningNepheleTask();
{code}

{code}
flink\flink-runtime\src\main\java\org\apache\flink\runtime\operators\BatchTask.java:
 1149* @param message The main message for the log.
 1150* @param taskName The name of the task.
 1151:   * @param parent The nephele task that contains the code producing the 
message.
 1152*
 1153* @return The string for logging.
 
 1254*/
 1255   @SuppressWarnings("unchecked")
 1256:  public static  Collector initOutputs(AbstractInvokable 
nepheleTask, ClassLoader cl, TaskConfig config,
 1257   
List> chainedTasksTarget,
 1258   
List> eventualOutputs,
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3667) Generalize client<->cluster communication

2016-03-24 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3667:
-

 Summary: Generalize client<->cluster communication
 Key: FLINK-3667
 URL: https://issues.apache.org/jira/browse/FLINK-3667
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Reporter: Maximilian Michels
Assignee: Maximilian Michels


Here are some notes I took when inspecting the client<->cluster classes with 
regard to future integration of other resource management frameworks in 
addition to Yarn (e.g. Mesos).

{noformat}

1 Cluster Client Abstraction


1.1 Status Quo
──

1.1.1 FlinkYarnClient
╌

  • Holds the cluster configuration (Flink-specific and Yarn-specific)
  • Contains the deploy() method to deploy the cluster
  • Creates the Hadoop Yarn client
  • Receives the initial job manager address
  • Bootstraps the FlinkYarnCluster


1.1.2 FlinkYarnCluster
╌╌

  • Wrapper around the Hadoop Yarn client
  • Queries cluster for status updates
  • Lifecycle methods to start and shut down the cluster
  • Flink specific features like shutdown after job completion


1.1.3 ApplicationClient
╌╌╌

  • Acts as a middle-man for asynchronous cluster communication
  • Designed to communicate with Yarn, not used in Standalone mode


1.1.4 CliFrontend
╌

  • Deeply integrated with FlinkYarnClient and FlinkYarnCluster
  • Constantly distinguishes between Yarn and Standalone mode
  • Would be nice to have a general abstraction in place


1.1.5 Client


  • Job submission and Job related actions, agnostic of resource framework


1.2 Proposal


1.2.1 ClusterConfig (before: AbstractFlinkYarnClient)
╌

  • Extensible cluster-agnostic config
  • May be extended by specific cluster, e.g. YarnClusterConfig


1.2.2 ClusterClient (before: AbstractFlinkYarnClient)
╌

  • Deals with cluster (RM) specific communication
  • Exposes framework agnostic information
  • YarnClusterClient, MesosClusterClient, StandaloneClusterClient


1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster)
╌

  • Basic interface to communicate with a running cluster
  • Receives the ClusterClient for cluster-specific communication
  • Should not have to care about the specific implementations of the
client


1.2.4 ApplicationClient
╌╌╌

  • Can be changed to work cluster-agnostic (first steps already in
FLINK-3543)


1.2.5 CliFrontend
╌

  • CliFrontend never has to differentiate between cluster types after it
has determined which cluster class to load.
  • Base class handles framework-agnostic command line arguments
  • Pluggables for Yarn and Mesos handle the specific commands


{noformat}

I would like to create/refactor the affected classes to set us up for a more 
flexible client side resource management abstraction.
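
A rough sketch of the proposed interfaces (names and signatures are purely
illustrative, not a final API):

{code}
public class ClusterConfig {
    // extensible, cluster-agnostic configuration; YarnClusterConfig etc. may extend it
}

public interface ClusterClient {
    // deploys a cluster from a framework-agnostic config and returns a handle to it
    FlinkCluster deployCluster(ClusterConfig config) throws Exception;
}

public interface FlinkCluster {
    // address of the leading JobManager, for job submission
    java.net.InetSocketAddress getJobManagerAddress();

    // framework-agnostic status reporting
    String getClusterStatus();

    // lifecycle
    void shutdown();
}

// Framework-specific implementations:
// YarnClusterClient, MesosClusterClient, StandaloneClusterClient, ...
{code}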




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Streaming KV store abstraction

2016-03-24 Thread Nam-Luc Tran
>Sorry for the late answer, I completely missed this email. (Thanks Robert
for pointing out).
No problem ;)

>Now that you have everything set up, in flatMap1 (for events) you would
query the state : state.value() and enrich your data
>in flatMap2 you would update the state: state.update(newState)

In this example, how are the states in the enrichments stream (enrichments
= DataStream>) and the value state declared inside
YourCoFlatMap linked?

>in flatMap2 you would update the state: state.update(newState)
Wouldn't that only update the state declared in YourCoFlatMap, and not the
state in the enrichments stream?
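
For reference, here is roughly how I read the suggested CoFlatMap (a minimal
sketch in the Java API; String keys and values are only assumed for
illustration):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class YourCoFlatMap extends RichCoFlatMapFunction<
        Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

    // partitioned (per-key) state holding the latest enrichment value
    private transient ValueState<String> enrichment;

    @Override
    public void open(Configuration parameters) {
        enrichment = getRuntimeContext().getState(
                new ValueStateDescriptor<>("enrichment", String.class, null));
    }

    // flatMap1: events -- read the enrichment stored for the current key
    @Override
    public void flatMap1(Tuple2<String, String> event,
            Collector<Tuple2<String, String>> out) throws Exception {
        out.collect(new Tuple2<>(event.f0, event.f1 + "/" + enrichment.value()));
    }

    // flatMap2: enrichments -- only update the per-key state
    @Override
    public void flatMap2(Tuple2<String, String> update,
            Collector<Tuple2<String, String>> out) throws Exception {
        enrichment.update(update.f1);
    }
}

If I understand correctly, events.connect(enrichments).keyBy(0, 0) is what ties
the two streams to the same key, so the state written in flatMap2 is the state
read in flatMap1 for that key; there is no separate state "in" the enrichments
stream itself.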

Cheers,


2016-03-23 15:38 GMT+01:00 Gyula Fóra :

> Hi!
>
> Sorry for the late answer, I completely missed this email. (Thanks Robert
> for pointing out).
>
> You won't be able to use that project as it was dependent on an earlier
> snapshot version that still had completely different state semantics.
> I don't think it is realistic that I will re-implement this any time soon,
> but I think you can easily do what you want in the following way:
>
> Let's say you have 2 streams, the first contains the enrichment data per
> key let's say enrichments = DataStream> .
> The second stream is the event stream that you want to enrich: events =
> DataStream>
>
> To apply the enrichments the easiest is to use a CoFlatMap with a
> partitioned value state inside:
>
> events.connect(enrichments).keyBy(0,0).flatMap(new YourCoFlatMap())
>
> In this case if you declare a value state inside YourCoFlatMap it will be
> kept per key. For example in the open method:
> state = getRuntimeContext().getState(new ValueStateDescriptor("stateName",
> type, defaultValue)).
>
> Now that you have everything set up, in flatMap1 (for events) you would
> query the state : state.value() and enrich your data
> in flatMap2 you would update the state: state.update(newState)
>
> Does this make sense to you? Or is the use case completely different?
>
> Cheers,
> Gyula
>
> Nam-Luc Tran  wrote (on Fri, 18 Mar 2016 at 18:25):
>
> > Hi Gyula,
> >
> > I'm currently looking after ways to enrich streams with external data.
> Have
> > you got any update on the topic in general or on StreamKV?
> >
> > I've checked out the code but it won't build, mainly because
> > StateCheckpointer has been removed since [FLINK-2808]. Any hint on a
> quick
> > replacement, before I dive in deeper?
> >
> > Cheers,
> >
> > 2015-09-15 20:29 GMT+02:00 Stephan Ewen :
> >
> > > I think that is actually a cool way to kick off an addition to the
> > > system. Gives you a lot of flexibility in releasing and testing...
> > >
> > > It helps, though, to upload maven artifacts for it!
> > >
> > > On Tue, Sep 15, 2015 at 7:18 PM, Gyula Fóra  wrote:
> > >
> > > > Hey All,
> > > >
> > > > We decided to make this a standalone library until it is stable
> enough
> > > and
> > > > then we can decide whether we want to keep it like that or include in
> > the
> > > > project:
> > > >
> > > > https://github.com/gyfora/StreamKV
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > Gianmarco De Francisci Morales  wrote (on Wed, 9 Sep 2015 at 20:25):
> > > >
> > > > > Yes, pretty clear. I guess semantically it's still a co-group, but
> > > > > implemented slightly differently.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > --
> > > > > Gianmarco
> > > > >
> > > > > On 9 September 2015 at 15:37, Gyula Fóra 
> > wrote:
> > > > >
> > > > > > Hey Gianmarco,
> > > > > >
> > > > > > So the implementation looks somewhat different:
> > > > > >
> > > > > > The update stream is received by a stateful KVStoreOperator which
> > > > stores
> > > > > > the K-V pairs as their partitioned state.
> > > > > >
> > > > > > The query for the 2 cities is assigned an ID, yes, and is split to
> > > > > > the 2 cities, and each of these is sent to the same KVStoreOperator
> > > > > > as the update stream. The output is practically the value for each
> > > > > > key (qid, city1, temp1), which is retrieved from the operator state,
> > > > > > and this output is merged in a next operator to form the KV[] output
> > > > > > on which the user can execute the difference if he wants.
> > > > > >
> > > > > > So actually no co-group is happening, although semantically it
> > > > > > might be similar. Instead I use stateful operators to be much more
> > > > > > efficient.
> > > > > >
> > > > > > Does this answer your question?
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > Gianmarco De Francisci Morales  wrote (on Wed, 9 Sep 2015 at 14:29):
> > > > > >
> > > > > > > Just a silly question.
> > > > > > > For the example you described, in a data flow model, you would
> do
> > > > > > something
> > > > > > > like this:
> > > > > > >
> > > > > > > Have query ids added to the city pairs (qid, city1, city2),
> > > > > > > then split the query stream on the two cities and co-group it
> > with
> > > 

Re: RollingSink

2016-03-24 Thread Aljoscha Krettek
Hi,
(sending from my other handle since the apache mail relay seems to be down for 
me)

I’m not aware of anyone having tested the RollingSink with anything besides 
“hdfs://“ and “file://“. That the file is empty is strange. Is something like 
revokeLease() necessary for your custom HCFS?

Cheers,
Aljoscha
> On 23 Mar 2016, at 17:53, Vijay Srinivasaraghavan 
>  wrote:
> 
> Hi Aljoscha,
> It was my bad that I copied some wrong class files during one of the steps.
> I have retried the same steps that I mentioned earlier, and with that I am
> able to see all the debug statements that I added to the RollingSink.
> I have found another interesting issue here. I am using an HCFS (Hadoop
> Compatible File System) implementation of the filesystem that we have built
> in-house (not stock HDFS). As part of the recovery process in the
> restoreState() method of the RollingSink class, we try to invoke the
> revokeLease() API, which is available only in DistributedFileSystem (or any
> class inheriting from it), whereas the HCFS contract class that we have
> implemented is the generic FileSystem. Since the code path will not invoke
> revokeLease() for our HCFS implementation class, I am seeing the part file
> with empty content even though the file is renamed from "in-progress" to the
> actual file name.
> Question: Do you know if the RollingSink implementation has been tested with
> any Hadoop Compatible File System like GlusterFS, etc.?
> Regards,
> Vijay
> 
> On Wednesday, March 23, 2016 7:42 AM, Aljoscha Krettek  
> wrote:
> 
> 
> Hmm, that’s strange. Could you maybe send one of the TaskManager logs?
> 
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 15:28, Vijay  wrote:
>> 
>> Yes, I have updated on all cluster nodes and restarted entire cluster. 
>> 
>> Do you see any problems with the steps that I followed?
>> 
>> Regards,
>> Vijay
>> 
>> Sent from my iPhone
>> 
>>> On Mar 23, 2016, at 7:18 AM, Aljoscha Krettek  wrote:
>>> 
>>> Hi,
>>> did you update the log4j.properties file on all nodes where the 
>>> TaskManagers run and did you restart the whole cluster?
>>> 
>>> Cheers,
>>> Aljoscha
 On 23 Mar 2016, at 15:02, Vijay  wrote:
 
 Hi Aljoscha,
 
 I am using a standalone Flink cluster (3 nodes). I am running the Flink job 
 by submitting/uploading the jar through the Flink UI.
 
 I have built flink from maven and modified the RollingSink code to add new 
 debug statements.
 
 I have also packaged the streaming filesystem connector package 
 (including the RollingSink changes) into the job jar file. The modified code 
 includes both System.out as well as logger statements.
 
 Updated log4j property file to DEBUG
 
 Regards,
 Vijay
 
 Sent from my iPhone
 
> On Mar 23, 2016, at 6:48 AM, Aljoscha Krettek  wrote:
> 
> Hi,
> what were the steps you took? By the way, are you running this on YARN 
> or in standalone mode? How are you starting the Flink job? Do you still 
> not see DEBUG entries in the log?
> 
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 14:32, Vijay  wrote:
>> 
>> I have changed the properties file but it did not help.
>> 
>> Regards,
>> Vijay
>> 
>> Sent from my iPhone
>> 
>>> On Mar 23, 2016, at 5:39 AM, Aljoscha Krettek  
>>> wrote:
>>> 
>>> Ok, then you should be able to change the log level to DEBUG in 
>>> conf/log4j.properties.
>>> 
 On 23 Mar 2016, at 12:41, Vijay  wrote:
 
 I think only the ERROR category gets displayed in the log file
 
 Regards,
 Vijay
 
 Sent from my iPhone
 
> On Mar 23, 2016, at 2:30 AM, Aljoscha Krettek  
> wrote:
> 
> Hi,
> are you seeing the regular log output from the RollingSink in the 
> TaskManager logs?
> 
> Cheers,
> Aljoscha
>> On 22 Mar 2016, at 20:03, Vijay Srinivasaraghavan 
>>  wrote:
>> 
>> I have tried both log4j logger as well as System.out.println option 
>> but none of these worked. 
>> 
>> From what I have seen so far, the filesystem streaming connector 
>> classes are not packaged in the grand jar 
>> (flink-dist_2.10-1.1-SNAPSHOT.jar) that is copied under the 
>> /build-target/lib location as part of the Flink maven build 
>> step.
>> 
>> So, I manually copied (overwrote) the compiled class files from the 
>> org.apache.flink.streaming.connectors.fs package to my "Flink 
>> job" distribution jar (otherwise it was using the standard jars that are 
>> defined as mvn dependencies in Artifactory) and then uploaded the jar 
>> to the Job Manager.
>> 
>> Am I missing something? How do I enable logging for the RollingSink 
>> class?
>> 
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>provided</scope>
>> </dependency>

Re: RollingSink

2016-03-24 Thread Aljoscha Krettek
Hi,
I’m not aware of anyone having tested the RollingSink with anything besides
“hdfs://“ and “file://“. That the file is empty is strange. Is something
like revokeLease() necessary for your custom HCFS?

Cheers,
Aljoscha

On Wed, 23 Mar 2016 at 17:53 Vijay Srinivasaraghavan
 wrote:

> Hi Aljoscha,
> It was my bad that I copied some wrong class files during one of the steps.
> I have retried the same steps that I mentioned earlier, and with that I am
> able to see all the debug statements that I added to the RollingSink.
> I have found another interesting issue here. I am using an HCFS (Hadoop
> Compatible File System) implementation of the filesystem that we have built
> in-house (not stock HDFS). As part of the recovery process in the
> restoreState() method of the RollingSink class, we try to invoke the
> revokeLease() API, which is available only in DistributedFileSystem (or any
> class inheriting from it), whereas the HCFS contract class that we have
> implemented is the generic FileSystem. Since the code path will not invoke
> revokeLease() for our HCFS implementation class, I am seeing the part file
> with empty content even though the file is renamed from "in-progress" to the
> actual file name.
> Question: Do you know if the RollingSink implementation has been tested with
> any Hadoop Compatible File System like GlusterFS, etc.?
> Regards,
> Vijay
>
> On Wednesday, March 23, 2016 7:42 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
>
>
>  Hmm, that’s strange. Could you maybe send one of the TaskManager logs?
>
> Cheers,
> Aljoscha
> > On 23 Mar 2016, at 15:28, Vijay  wrote:
> >
> > Yes, I have updated on all cluster nodes and restarted entire cluster.
> >
> > Do you see any problems with the steps that I followed?
> >
> > Regards,
> > Vijay
> >
> > Sent from my iPhone
> >
> >> On Mar 23, 2016, at 7:18 AM, Aljoscha Krettek 
> wrote:
> >>
> >> Hi,
> >> did you update the log4j.properties file on all nodes where the
> TaskManagers run and did you restart the whole cluster?
> >>
> >> Cheers,
> >> Aljoscha
> >>> On 23 Mar 2016, at 15:02, Vijay  wrote:
> >>>
> >>> Hi Aljoscha,
> >>>
> >>> I am using a standalone Flink cluster (3 nodes). I am running the Flink
> >>> job by submitting/uploading the jar through the Flink UI.
> >>>
> >>> I have built flink from maven and modified the RollingSink code to add
> new debug statements.
> >>>
> >>> I have also packaged the streaming filesystem connector package
> >>> (including the RollingSink changes) into the job jar file. The modified
> >>> code includes both System.out as well as logger statements.
> >>>
> >>> Updated log4j property file to DEBUG
> >>>
> >>> Regards,
> >>> Vijay
> >>>
> >>> Sent from my iPhone
> >>>
>  On Mar 23, 2016, at 6:48 AM, Aljoscha Krettek 
> wrote:
> 
>  Hi,
>  what were the steps you took? By the way, are you running this on
> YARN or in standalone mode? How are you starting the Flink job? Do you
> still not see DEBUG entries in the log?
> 
>  Cheers,
>  Aljoscha
> > On 23 Mar 2016, at 14:32, Vijay  wrote:
> >
> > I have changed the properties file but it did not help.
> >
> > Regards,
> > Vijay
> >
> > Sent from my iPhone
> >
> >> On Mar 23, 2016, at 5:39 AM, Aljoscha Krettek 
> wrote:
> >>
> >> Ok, then you should be able to change the log level to DEBUG in
> conf/log4j.properties.
> >>
> >>> On 23 Mar 2016, at 12:41, Vijay  wrote:
> >>>
> >>> I think only the ERROR category gets displayed in the log file
> >>>
> >>> Regards,
> >>> Vijay
> >>>
> >>> Sent from my iPhone
> >>>
>  On Mar 23, 2016, at 2:30 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
> 
>  Hi,
>  are you seeing the regular log output from the RollingSink in the
> TaskManager logs?
> 
>  Cheers,
>  Aljoscha
> > On 22 Mar 2016, at 20:03, Vijay Srinivasaraghavan <
> vijikar...@yahoo.com> wrote:
> >
> > I have tried both log4j logger as well as System.out.println
> option but none of these worked.
> >
> > From what I have seen so far, the filesystem streaming connector classes
> > are not packaged in the grand jar (flink-dist_2.10-1.1-SNAPSHOT.jar) that
> > is copied under the /build-target/lib location as part of the Flink maven
> > build step.
> >
> > So, I manually copied (overwrote) the compiled class files from the
> > org.apache.flink.streaming.connectors.fs package to my "Flink job"
> > distribution jar (otherwise it was using the standard jars that are
> > defined as mvn dependencies in Artifactory) and then uploaded the jar to
> > the Job Manager.
> >
> > Am I missing something? How do I enable logging for the
> RollingSink class?
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-filesystem_2.11</artifactId>
> >   <version>${flink.version}</version>
> >   <scope>provided</scope>
> > </dependency>
> >
> >
> > On Tuesday, March 22, 2016 3:04 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
> >>

Native iterations in PyFlink

2016-03-24 Thread Shannon Quinn

Hi all,

I'm looking at Flink for highly iterative ALS-like distributed 
computations, and the concept of native iteration support was very 
attractive. However, I notice that the Python API is missing this item. 
I'd absolutely be interested in adding that component if someone could 
point me in the right direction. Thanks!


Shannon


Re: Native iterations in PyFlink

2016-03-24 Thread Chesnay Schepler

Hello Shannon,

 you've picked yourself quite a feature there.

The following classes will be relevant:

 * Python
 o DataSet
 o OperationInfo
 o Environment (_send_operation method)
 o Constants._Identifier
 * Java
 o PythonPlanBinder
 o PythonOperationInfo

A (Python)OperationInfo is a generic container for all arguments that 
are required to define an operation.
OperationInfos store information about their parent/children sets, and thus 
form a doubly-linked tree structure.
These objects are transferred 1:1 to Java (with the exception of a few 
internal fields).
The contained arguments are sent in Environment._send_operation, and 
received in the PythonOperationInfo constructor.


The DataSet class resembles the Java DataSet class. Every operation 
generates a OperationInfo; you'll mostly deal with API design, and how 
to store the relevant information inside a OperationInfo.


The PythonPlanBinder effectively iterates over all OperationInfos, and 
reconstructs the final Java program from them. Operations are
defined in the order they were defined in the Python API, and all 
created DataSets are stored in a Map of DataSetID -> DataSet.


For a batch iteration you will need to do the following:

 * add a iterate() method to the DataSet class
 o this method generates a new OperationInfo containing
   (_Identifier.ITERATION, ID of the dataset that it was applied
   on, iteration count)
 o should return an IterativeDataSet (new class)
 * a new IterativeDataSet class that extends the DataSet, offering a
   new closeWith methods
 o generate a new OperationInfo containing
   (_Identifier.ITERATION_CLOSE, ID of the dataSet it was applied
   on, ID's of the resultSet [ID of the terminationCriterion])
 * within PythonPlanBinder you'll have to add 2 new methods:
 o   createIterationOperation()
 + fetch DataSet to apply iterate on (sets.get(info.parentID))
 + apply dataSet.iterate(info.getIterationCount)
 + store resulting set in the map
 o   createIterationCloseOperation()
 + fetch the IterativeDataSet to apply closeWith on
 + fetch resultSet/terminationCriterion dataSet
 # you'll have to account for both closeWith(resultSet) and
   closeWith(resultSet, terminationCriterion)!
 + apply closeWith, store resulting set in the map

I never looked too deeply into iterations myself, so I'm not sure if 
there will be issues at runtime. But for the API the above steps should 
point you in the right direction. Delta-iterations should follow a 
similar pattern.
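
To make the Java side a bit more concrete, here is a rough sketch of the two
PythonPlanBinder methods (field names such as parentID, setID, otherID and
terminationCriterionID are placeholders, not necessarily the real
PythonOperationInfo fields):

// Illustrative sketch only -- not the actual PythonPlanBinder code. Assumes a
// map "sets" from dataset IDs to DataSets, as described above.
@SuppressWarnings({"rawtypes", "unchecked"})
private void createIterationOperation(PythonOperationInfo info) {
    DataSet parent = (DataSet) sets.get(info.parentID);       // set iterate() was applied on
    IterativeDataSet iteration = parent.iterate(info.iterationCount);
    sets.put(info.setID, iteration);                          // store the resulting set
}

@SuppressWarnings({"rawtypes", "unchecked"})
private void createIterationCloseOperation(PythonOperationInfo info) {
    IterativeDataSet iteration = (IterativeDataSet) sets.get(info.parentID);
    DataSet result = (DataSet) sets.get(info.otherID);        // the result set
    // account for both closeWith(resultSet) and closeWith(resultSet, terminationCriterion)
    DataSet closed = (info.terminationCriterionID == null)
            ? iteration.closeWith(result)
            : iteration.closeWith(result, (DataSet) sets.get(info.terminationCriterionID));
    sets.put(info.setID, closed);
}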


Feel free to mail me directly if you need further help.

On 24.03.2016 21:20, Shannon Quinn wrote:

Hi all,

I'm looking at Flink for highly iterative ALS-like distributed 
computations, and the concept of native iteration support was very 
attractive. However, I notice that the Python API is missing this 
item. I'd absolutely be interested in adding that component if someone 
could point me in the right direction. Thanks!


Shannon





Re: RichMapPartitionFunction - problems with collect

2016-03-24 Thread Chesnay Schepler
Haven't looked too deeply into this, but this sounds like object reuse is 
enabled, at which point buffering values effectively causes you to store 
the same value multiple times.


can you try disabling objectReuse using 
env.getConfig().disableObjectReuse() ?
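
For reference, a minimal way to do that (Java shown; the Scala environment
exposes the same getConfig()):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// object reuse is opt-in, but this makes the setting explicit for the job
env.getConfig().disableObjectReuse();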


On 22.03.2016 16:53, Sergio Ramírez wrote:

Hi all,

I've been having some problems with RichMapPartitionFunction. First, I 
tried to convert the iterable into an array, unsuccessfully. Then I 
used some buffers to store the values per column. I am trying to 
transpose the local matrix of LabeledVectors that I have in each 
partition.


None of these solutions has worked. For example, for partition 7 and 
feature 10, the vector is empty, whereas for the same partition and 
feature 11, the vector contains 200 elements. And this changes on each 
execution, with different partitions and features.


I think there is a problem with calling the collect method outside of the 
loop over the iterable.


new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  def mapPartition(it: java.lang.Iterable[LabeledVector],
      out: Collector[((Int, Int), Array[Byte])]): Unit = {

    val index = getRuntimeContext().getIndexOfThisSubtask()
    val mat = for (i <- 0 until nFeatures)
      yield new scala.collection.mutable.ListBuffer[Byte]

    for (reg <- it.asScala) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
  }
}

Regards





Kerberos for Streaming & Kafka

2016-03-24 Thread Eron Wright
Hi,
Given the other thread about per-job Kerberos identity, now's a good time to 
discuss some problems with the current delegation-token approach, since the 
answer could bear on the per-job enhancement.
Two problems:
- Delegation tokens expire. For a continuous streaming job to survive, the
  original keytab is needed to re-authenticate. Spark Streaming solved this
  problem with `--keytab` on spark-submit (see AMDelegationTokenRenewer.scala).
- Kafka doesn't support delegation tokens yet (see KIP-48 and KAFKA-1696).
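
For reference, a minimal sketch of keytab-based (re)login with Hadoop's
UserGroupInformation -- the mechanism that `--keytab` enables in Spark; the
principal and keytab path are placeholders:

import org.apache.hadoop.security.UserGroupInformation;

public class KeytabRelogin {
    public static void main(String[] args) throws Exception {
        // initial login from the keytab when the job/AM starts
        UserGroupInformation.loginUserFromKeytab("flink@EXAMPLE.COM",
                "/etc/security/keytabs/flink.keytab");
        // called periodically from the long-running job so the TGT never expires
        UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
    }
}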
Thoughts?  Thanks!
- Eron Wright