Hello Shannon,
you've picked yourself quite a feature there.
The following classes will be relevant:
* Python
    o DataSet
    o OperationInfo
    o Environment (_send_operation method)
    o Constants._Identifier
* Java
    o PythonPlanBinder
    o PythonOperationInfo
A (Python)OperationInfo is a generic container for all arguments that
are required to define an operation.
OperationInfos store information about their parent and child sets, and
thus form a doubly linked tree structure.
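A minimal sketch of what the parent/child links look like. The class below is a hypothetical simplification (the real OperationInfo holds many more fields, and the field names here are assumptions): every info points to the info it was derived from, and the parent keeps a list of its children, so the tree can be walked in both directions.

```python
class OperationInfo(object):
    """Hypothetical, stripped-down OperationInfo: only the tree links."""

    def __init__(self, identifier, parent=None):
        self.identifier = identifier  # which operation this info describes
        self.parent = parent          # info of the set this operation was applied on
        self.children = []            # infos of operations applied to this set
        if parent is not None:
            parent.children.append(self)  # back-link: parent -> child


source = OperationInfo("SOURCE")
mapped = OperationInfo("MAP", parent=source)
filtered = OperationInfo("FILTER", parent=mapped)
sink = OperationInfo("SINK", parent=mapped)

print([child.identifier for child in mapped.children])  # ['FILTER', 'SINK']
print(filtered.parent.parent.identifier)                # SOURCE
```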
OperationInfos are transferred 1:1 to Java (with the exception of a few
internal fields).
The contained arguments are sent in Environment._send_operation, and
received in the PythonOperationInfo constructor.
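Since the arguments are sent one after another and read back in the constructor, both sides presumably have to agree on the order of the fields. A sketch of that contract, with a deque standing in for the actual Python-to-Java connection; SENT_FIELDS, send_operation, ReceivedOperationInfo and the field names are all hypothetical.

```python
from collections import deque

# Hypothetical list of the fields that are sent; the real OperationInfo has more.
SENT_FIELDS = ["identifier", "parent_id", "set_id", "iteration_count"]


def send_operation(info, channel):
    """Python side (cf. Environment._send_operation): write the fields in a fixed
    order. Internal fields such as 'children' are not listed, so they are skipped."""
    for name in SENT_FIELDS:
        channel.append(info[name])


class ReceivedOperationInfo(object):
    """Stand-in for the Java PythonOperationInfo constructor: it reads the values
    back in exactly the order in which they were sent."""

    def __init__(self, channel):
        for name in SENT_FIELDS:
            setattr(self, name, channel.popleft())


channel = deque()  # stands in for the Python -> Java connection
info = {"identifier": "ITERATION", "parent_id": 3, "set_id": 7,
        "iteration_count": 10, "children": []}
send_operation(info, channel)
received = ReceivedOperationInfo(channel)
print(received.identifier, received.parent_id, received.iteration_count)  # ITERATION 3 10
```

Adding a new argument therefore means touching both ends: appending it on the Python side and reading it at the same position on the Java side.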
The DataSet class resembles the Java DataSet class. Every operation
generates an OperationInfo; you'll mostly deal with API design, and how
to store the relevant information inside an OperationInfo.
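To make "every operation generates an OperationInfo" concrete, here is a sketch in which DataSet methods execute nothing; they only record an info and return a new DataSet wrapping it. Environment.register, the _add helper and the info fields are hypothetical, and the real DataSet methods take different arguments.

```python
import itertools


class OperationInfo(object):
    """Hypothetical, simplified OperationInfo."""

    def __init__(self, identifier, parent=None, **args):
        self.identifier = identifier
        self.parent = parent
        self.children = []
        self.args = args  # operation-specific arguments, e.g. the user function
        self.id = None    # assigned when the info is registered
        if parent is not None:
            parent.children.append(self)


class Environment(object):
    def __init__(self):
        self._ids = itertools.count()
        self.infos = []  # every registered OperationInfo, in definition order

    def register(self, info):
        info.id = next(self._ids)
        self.infos.append(info)
        return info


class DataSet(object):
    def __init__(self, env, info):
        self._env = env
        self._info = info

    def _add(self, identifier, **args):
        # Nothing is executed here; the call is only recorded for the Java side.
        info = OperationInfo(identifier, parent=self._info, **args)
        return DataSet(self._env, self._env.register(info))

    def map(self, operator):
        return self._add("MAP", operator=operator)

    def filter(self, operator):
        return self._add("FILTER", operator=operator)


env = Environment()
source = DataSet(env, env.register(OperationInfo("SOURCE")))
result = source.map(lambda x: x * 2).filter(lambda x: x > 2)
print([(info.id, info.identifier) for info in env.infos])
# [(0, 'SOURCE'), (1, 'MAP'), (2, 'FILTER')]
```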
The PythonPlanBinder effectively iterates over all OperationInfos and
reconstructs the final Java program from them. Operations are
reconstructed in the same order in which they were defined in the Python
API, and all created DataSets are stored in a Map of DataSetID -> DataSet.
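The binder's control flow can be mimicked in a few lines of Python: replay the infos in definition order, look up the parent in the ID -> DataSet map, build the new set and store it under its own ID. Processing in definition order is what guarantees the parent is already in the map. This is a hypothetical mock: the real binder is Java and builds lazy Flink DataSets, whereas here plain lists stand in for them, and bind_plan/builders are invented names.

```python
def bind_plan(infos, builders):
    """Replay OperationInfos in definition order.

    Returns the map of DataSet ID -> constructed set.
    """
    sets = {}
    for info in infos:
        build = builders[info["identifier"]]
        # Parents were defined earlier, so they are already in the map.
        parent = sets.get(info.get("parent_id"))
        sets[info["set_id"]] = build(parent, info)
    return sets


# Plain Python lists stand in for Java DataSets in this mock.
builders = {
    "SOURCE": lambda parent, info: list(info["values"]),
    "MAP": lambda parent, info: [info["fn"](x) for x in parent],
    "FILTER": lambda parent, info: [x for x in parent if info["fn"](x)],
}

infos = [
    {"identifier": "SOURCE", "set_id": 0, "values": [1, 2, 3]},
    {"identifier": "MAP", "set_id": 1, "parent_id": 0, "fn": lambda x: x + 1},
    {"identifier": "FILTER", "set_id": 2, "parent_id": 1, "fn": lambda x: x % 2 == 0},
]
sets = bind_plan(infos, builders)
print(sets[2])  # [2, 4]
```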
For a batch iteration you will need to do the following:
* add an iterate() method to the DataSet class
    o this method generates a new OperationInfo containing
      (_Identifier.ITERATION, ID of the DataSet it was applied on,
      iteration count)
    o it should return an IterativeDataSet (new class)
* add a new IterativeDataSet class that extends DataSet, offering new
  closeWith methods
    o these generate a new OperationInfo containing
      (_Identifier.ITERATION_CLOSE, ID of the DataSet it was applied on,
      IDs of the resultSet [and the terminationCriterion])
* within PythonPlanBinder you'll have to add 2 new methods:
    o createIterationOperation()
        + fetch the DataSet to apply iterate on (sets.get(info.parentID))
        + apply dataSet.iterate(info.getIterationCount())
        + store the resulting set in the map
    o createIterationCloseOperation()
        + fetch the IterativeDataSet to apply closeWith on
        + fetch the resultSet/terminationCriterion DataSets
            # you'll have to account for both closeWith(resultSet) and
              closeWith(resultSet, terminationCriterion)!
        + apply closeWith, store the resulting set in the map
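On the Python side, the first two steps could look roughly like the sketch below. Only the constant names ITERATION/ITERATION_CLOSE come from the steps above; their values, Environment.record, the dict layout of the infos and the snake_case name close_with are assumptions.

```python
import itertools


class _Identifier(object):
    # Hypothetical values; only ITERATION / ITERATION_CLOSE are named above.
    SOURCE = "source"
    MAP = "map"
    ITERATION = "iteration"
    ITERATION_CLOSE = "iteration_close"


class Environment(object):
    def __init__(self):
        self._ids = itertools.count()
        self.infos = []  # recorded OperationInfos (plain dicts here), in definition order

    def record(self, **info):
        info["id"] = next(self._ids)
        self.infos.append(info)
        return info["id"]


class DataSet(object):
    def __init__(self, env, set_id):
        self._env = env
        self._id = set_id

    def map(self, operator):
        return DataSet(self._env, self._env.record(
            identifier=_Identifier.MAP, parent=self._id, operator=operator))

    def iterate(self, iteration_count):
        # (_Identifier.ITERATION, ID of the set it was applied on, iteration count)
        new_id = self._env.record(identifier=_Identifier.ITERATION,
                                  parent=self._id, iteration_count=iteration_count)
        return IterativeDataSet(self._env, new_id)


class IterativeDataSet(DataSet):
    def close_with(self, result_set, termination_criterion=None):
        # (_Identifier.ITERATION_CLOSE, ID of this set, ID of the result set
        #  [, ID of the termination criterion])
        info = dict(identifier=_Identifier.ITERATION_CLOSE, parent=self._id,
                    result=result_set._id)
        if termination_criterion is not None:
            info["termination_criterion"] = termination_criterion._id
        return DataSet(self._env, self._env.record(**info))


env = Environment()
initial = DataSet(env, env.record(identifier=_Identifier.SOURCE))
iteration = initial.iterate(10)
step = iteration.map(lambda x: x + 1)  # the step function works on the iterative set
final = iteration.close_with(step)
print([i["identifier"] for i in env.infos])
# ['source', 'iteration', 'map', 'iteration_close']
```

Making termination_criterion an optional argument lets one Python method cover both Java overloads; the binder then only has to check whether that ID was sent. Since map() returns a plain DataSet, only the IterativeDataSet itself can be closed.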
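The two binder methods are Java, but their dispatch logic can be sketched as a Python mock. Java's IterativeDataSet does provide both closeWith(DataSet) and closeWith(DataSet, DataSet); everything else here is hypothetical: JavaDataSetStub only records how a set was built, the info keys mirror an assumed layout, and an optional key signals the termination criterion (on the Java side a sentinel ID might be used instead).

```python
class JavaDataSetStub(object):
    """Hypothetical stand-in for a Java DataSet/IterativeDataSet.

    It only records how it was built, so the dispatch logic can be checked.
    """

    def __init__(self, label):
        self.label = label

    def iterate(self, max_iterations):
        return JavaDataSetStub("%s.iterate(%d)" % (self.label, max_iterations))

    def close_with(self, result, termination_criterion=None):
        if termination_criterion is None:
            return JavaDataSetStub("%s.closeWith(%s)" % (self.label, result.label))
        return JavaDataSetStub("%s.closeWith(%s, %s)" % (
            self.label, result.label, termination_criterion.label))


def create_iteration_operation(info, sets):
    data_set = sets[info["parent"]]  # the set iterate() was applied on
    sets[info["id"]] = data_set.iterate(info["iteration_count"])


def create_iteration_close_operation(info, sets):
    iterative = sets[info["parent"]]  # the IterativeDataSet to close
    result = sets[info["result"]]
    if "termination_criterion" in info:
        # closeWith(resultSet, terminationCriterion)
        closed = iterative.close_with(result, sets[info["termination_criterion"]])
    else:
        # closeWith(resultSet)
        closed = iterative.close_with(result)
    sets[info["id"]] = closed


sets = {0: JavaDataSetStub("src"), 2: JavaDataSetStub("step"), 4: JavaDataSetStub("crit")}
create_iteration_operation({"id": 1, "parent": 0, "iteration_count": 5}, sets)
# Both closeWith variants, demonstrated on the same stub for brevity.
create_iteration_close_operation({"id": 3, "parent": 1, "result": 2}, sets)
create_iteration_close_operation(
    {"id": 5, "parent": 1, "result": 2, "termination_criterion": 4}, sets)
print(sets[3].label)  # src.iterate(5).closeWith(step)
print(sets[5].label)  # src.iterate(5).closeWith(step, crit)
```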
I never looked too deeply into iterations myself, so I'm not sure whether
there will be issues at runtime. But for the API, the above steps should
point you in the right direction. Delta iterations should follow a
similar pattern.
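For delta iterations, the Java counterpart is initialSolutionSet.iterateDelta(initialWorkset, maxIterations, keyPositions...), which returns a DeltaIteration exposing getSolutionSet(), getWorkset() and closeWith(solutionSetDelta, newWorkset). A hypothetical Python mirror that records OperationInfos the same way as the bulk case might look like this; all identifiers, field names and the placeholder infos for the solution set and workset are assumptions, not an agreed design.

```python
import itertools


class _Identifier(object):
    # Hypothetical constants, modeled on ITERATION / ITERATION_CLOSE.
    SOURCE = "source"
    DELTA_ITERATION = "delta_iteration"
    DELTA_SOLUTION_SET = "delta_solution_set"
    DELTA_WORKSET = "delta_workset"
    DELTA_ITERATION_CLOSE = "delta_iteration_close"


class Environment(object):
    def __init__(self):
        self._ids = itertools.count()
        self.infos = []  # recorded OperationInfos (plain dicts), in definition order

    def record(self, **info):
        info["id"] = next(self._ids)
        self.infos.append(info)
        return info["id"]


class DataSet(object):
    def __init__(self, env, set_id):
        self._env = env
        self._id = set_id

    def iterate_delta(self, workset, max_iterations, key_positions):
        # Mirrors Java's initialSolutionSet.iterateDelta(initialWorkset, maxIterations, keys...)
        it_id = self._env.record(identifier=_Identifier.DELTA_ITERATION,
                                 parent=self._id, other=workset._id,
                                 max_iterations=max_iterations, keys=tuple(key_positions))
        return DeltaIteration(self._env, it_id)


class DeltaIteration(object):
    def __init__(self, env, it_id):
        self._env = env
        self._id = it_id
        # Placeholders playing the role of Java's getSolutionSet() / getWorkset().
        self.solution_set = DataSet(env, env.record(
            identifier=_Identifier.DELTA_SOLUTION_SET, parent=it_id))
        self.workset = DataSet(env, env.record(
            identifier=_Identifier.DELTA_WORKSET, parent=it_id))

    def close_with(self, solution_set_delta, new_workset):
        # Mirrors Java's DeltaIteration.closeWith(solutionSetDelta, newWorkset)
        return DataSet(self._env, self._env.record(
            identifier=_Identifier.DELTA_ITERATION_CLOSE, parent=self._id,
            delta=solution_set_delta._id, workset=new_workset._id))


env = Environment()
solution = DataSet(env, env.record(identifier=_Identifier.SOURCE))
work = DataSet(env, env.record(identifier=_Identifier.SOURCE))
iteration = solution.iterate_delta(work, 20, [0])
# A real step function would derive the delta and new workset from the placeholders.
result = iteration.close_with(iteration.solution_set, iteration.workset)
```

The binder would then need matching create methods that call iterateDelta, resolve the two placeholders via getSolutionSet()/getWorkset(), and finally call closeWith.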
Feel free to mail me directly if you need further help.
On 24.03.2016 21:20, Shannon Quinn wrote:
Hi all,
I'm looking at Flink for highly iterative ALS-like distributed
computations, and the concept of native iteration support was very
attractive. However, I notice that the Python API is missing this
item. I'd absolutely be interested in adding that component if someone
could point me in the right direction. Thanks!
Shannon