Hello Shannon,

You've picked yourself quite a feature there.

The following classes will be relevant:

 * Python
     o DataSet
     o OperationInfo
     o Environment (_send_operation method)
     o Constants._Identifier
 * Java
     o PythonPlanBinder
     o PythonOperationInfo

A (Python)OperationInfo is a generic container for all arguments that are required to define an operation. OperationInfos store information about their parent/children sets, and thus form a doubly linked tree structure. These objects are transferred 1:1 to Java (with the exception of a few internal fields). The contained arguments are sent in Environment._send_operation, and received in the PythonOperationInfo constructor.
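
To make the parent/children linking concrete, here is a minimal sketch. It is an illustrative stand-in, not the actual class: the constructor signature and the field names (identifier, parent, children, arguments) are assumptions made up for this example.

```python
# Illustrative stand-in, NOT the real OperationInfo: all field names are assumptions.
class OperationInfo:
    def __init__(self, identifier, parent=None, **arguments):
        self.identifier = identifier  # which operation this is, e.g. "map"
        self.parent = parent          # OperationInfo of the input set, if any
        self.children = []            # OperationInfos that consume this set
        self.arguments = arguments    # everything else the operation needs
        if parent is not None:
            parent.children.append(self)  # keep the back-link in sync


# Two linked operations: a source and a map applied to it.
source = OperationInfo("source", path="in.csv")
mapped = OperationInfo("map", parent=source, operator="my_mapper")
```

Each object knows both its input (parent) and its consumers (children), which is what makes the structure doubly linked.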

The DataSet class resembles the Java DataSet class. Every operation generates an OperationInfo; you'll mostly deal with API design, and with how to store the relevant information inside an OperationInfo.

The PythonPlanBinder effectively iterates over all OperationInfos, and reconstructs the final Java program from them. Operations are created in the same order in which they were defined in the Python API, and all created DataSets are stored in a Map of DataSetID -> DataSet.
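
The replay idea can be modelled in a few lines. This is a simplified Python model for illustration only: the real binder is Java, and the dictionary keys, the "source"/"map" identifiers and the replay() function are hypothetical names, not taken from the code base.

```python
# Simplified Python model of the replay loop. The real binder is Java;
# every name below is hypothetical and only illustrates the idea.
def replay(infos):
    sets = {}  # DataSetID -> reconstructed set (here just a description string)
    for info in infos:  # infos arrive in the order they were defined in Python
        if info["identifier"] == "source":
            sets[info["id"]] = "source(%s)" % info["path"]
        elif info["identifier"] == "map":
            parent = sets[info["parent_id"]]  # the parent was defined earlier
            sets[info["id"]] = "%s.map(%s)" % (parent, info["operator"])
        else:
            raise ValueError("unknown operation: %r" % info["identifier"])
    return sets


plan = [
    {"identifier": "source", "id": 0, "path": "in.csv"},
    {"identifier": "map", "id": 1, "parent_id": 0, "operator": "f"},
]
result = replay(plan)
```

Because operations are replayed in definition order, the parent of an operation is already in the map by the time the operation itself is handled.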

For a batch iteration you will need to do the following:

 * add an iterate() method to the DataSet class
     o this method generates a new OperationInfo containing
       (_Identifier.ITERATION, ID of the dataset that it was applied
       on, iteration count)
     o should return an IterativeDataSet (new class)
 * a new IterativeDataSet class that extends the DataSet, offering a
   new closeWith method
     o generate a new OperationInfo containing
       (_Identifier.ITERATION_CLOSE, ID of the dataSet it was applied
       on, ID of the resultSet [, ID of the terminationCriterion])
 * within PythonPlanBinder you'll have to add 2 new methods:
     o   createIterationOperation()
         + fetch DataSet to apply iterate on (sets.get(info.parentID))
         + apply dataSet.iterate(info.getIterationCount)
         + store resulting set in the map
     o   createIterationCloseOperation()
         + fetch the IterativeDataSet to apply closeWith on
         + fetch resultSet/terminationCriterion dataSet
             # you'll have to account for both closeWith(resultSet) and
               closeWith(resultSet, terminationCriterion)!
         + apply closeWith, store resulting set in the map
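
The Python half of the steps above could look roughly like the sketch below. Treat it as a sketch under assumptions, not as the implementation: Env, the dictionary-based operation records, the string identifiers and the snake_case name close_with are all placeholders I made up here; in the real code you would build an OperationInfo and use the _Identifier constants.

```python
import itertools

ITERATION = "iteration"              # placeholder for _Identifier.ITERATION
ITERATION_CLOSE = "iteration_close"  # placeholder for _Identifier.ITERATION_CLOSE


class Env:
    """Hypothetical stand-in that only collects operation records."""

    def __init__(self):
        self.operations = []
        self._ids = itertools.count()

    def new_id(self):
        return next(self._ids)


class DataSet:
    def __init__(self, env, set_id):
        self._env = env
        self._id = set_id

    def iterate(self, iteration_count):
        # Record (ITERATION, ID of the set it was applied on, iteration count).
        child_id = self._env.new_id()
        self._env.operations.append({
            "identifier": ITERATION,
            "id": child_id,
            "parent_id": self._id,
            "iteration_count": iteration_count,
        })
        return IterativeDataSet(self._env, child_id)


class IterativeDataSet(DataSet):
    def close_with(self, result_set, termination_criterion=None):
        # Record (ITERATION_CLOSE, ID of the iterative set, ID of the result
        # set, and the ID of the termination criterion only if one was given).
        child_id = self._env.new_id()
        info = {
            "identifier": ITERATION_CLOSE,
            "id": child_id,
            "parent_id": self._id,
            "result_id": result_set._id,
        }
        if termination_criterion is not None:
            info["termination_id"] = termination_criterion._id
        self._env.operations.append(info)
        return DataSet(self._env, child_id)


env = Env()
initial = DataSet(env, env.new_id())
loop = initial.iterate(10)
final = loop.close_with(loop)  # trivial loop body, just to show the calls
```

On the Java side, createIterationCloseOperation() could then check whether a termination criterion ID was sent and pick the matching closeWith overload.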

I never looked too deeply into iterations myself, so I'm not sure whether there will be issues at runtime. But for the API the above steps should point you in the right direction. Delta-iterations should follow a similar pattern.

Feel free to mail me directly if you need further help.

On 24.03.2016 21:20, Shannon Quinn wrote:
Hi all,

I'm looking at Flink for highly iterative ALS-like distributed computations, and the concept of native iteration support was very attractive. However, I notice that the Python API is missing this item. I'd absolutely be interested in adding that component if someone could point me in the right direction. Thanks!

Shannon

