dianfu commented on a change in pull request #12902:
URL: https://github.com/apache/flink/pull/12902#discussion_r484217079



##########
File path: flink-python/pyflink/table/utils.py
##########
@@ -76,3 +79,61 @@ def to_expression_jarray(exprs):
     """
     gateway = get_gateway()
     return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
+
+
+class CloseableIterator(object):

Review comment:
       Move this class to table_result?
   

##########
File path: flink-python/pyflink/table/utils.py
##########
@@ -76,3 +79,61 @@ def to_expression_jarray(exprs):
     """
     gateway = get_gateway()
     return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
+
+
+class CloseableIterator(object):
+    """

Review comment:
       The current implementation of this class is actually not closeable. Could you update it a bit as follows:
   - Add a close() method
   - Add __enter__ and __exit__ methods
   - Add documentation to TableResult.collect describing that close() should be called at the end. You could refer to the Java classes for more details.
   - Add an example to TableResult.collect showing how to use the returned result with the `with` statement, as in the sketch below.
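   A minimal sketch of what this could look like, assuming the wrapped Java iterator exposes a close() method (the iterator returned by the Java TableResult.collect is an AutoCloseable):

       class CloseableIterator(object):
           """
           Representing an Iterator that is also auto closeable.
           """
           def __init__(self, j_closeable_iterator, field_data_types):
               self._j_closeable_iterator = j_closeable_iterator
               self._j_field_data_types = field_data_types

           def close(self):
               # release the Java-side resources backing this iterator
               self._j_closeable_iterator.close()

           def __enter__(self):
               return self

           def __exit__(self, exc_type, exc_val, exc_tb):
               # close even if iteration raised
               self.close()

   The example in the TableResult.collect docs could then look like this (`table` is a placeholder Table here):

       with table.execute().collect() as results:
           for row in results:
               print(row)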
   

##########
File path: flink-python/pyflink/table/utils.py
##########
@@ -76,3 +79,61 @@ def to_expression_jarray(exprs):
     """
     gateway = get_gateway()
     return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
+
+
+class CloseableIterator(object):
+    """
+    Representing an Iterator that is also auto closeable.
+    """
+    def __init__(self, j_closeable_iterator, field_data_types):
+        self._j_closeable_iterator = j_closeable_iterator
+        self._j_field_data_types = field_data_types
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if not self._j_closeable_iterator.hasNext():
+            raise StopIteration("No more data.")
+        gateway = get_gateway()
+        pickle_bytes = gateway.jvm.PythonBridgeUtils. \
+            getPickledBytesFromRow(self._j_closeable_iterator.next(),
+                                   self._j_field_data_types)
+        pickle_bytes = list(pickle_bytes)
+        data_types = [_from_java_type(j_field_data_type)

Review comment:
       data_types could be generated in the constructor. There is no need to 
generate it for each element.
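   For example, a sketch of just the relevant part (the attribute name _data_types is an assumption, since the quoted hunk is truncated):

       def __init__(self, j_closeable_iterator, field_data_types):
           self._j_closeable_iterator = j_closeable_iterator
           self._j_field_data_types = field_data_types
           # computed once here instead of once per element in __next__
           self._data_types = [_from_java_type(j_field_data_type)
                               for j_field_data_type in self._j_field_data_types]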

##########
File path: flink-python/pyflink/table/tests/test_table_environment_api.py
##########
@@ -445,6 +446,46 @@ def test_to_retract_stream(self):
                     "(True, <Row(2, 'Hello')>)"]
         self.assertEqual(result, expected)
 
+    def test_collect_for_all_data_types(self):

Review comment:
       Could you make the tests run in both the blink planner and the legacy planner? One possible setup is sketched below.
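   One way is to put the cases in a mixin and run it under both planner test bases, roughly like this (the base class names are assumptions; check pyflink.testing.test_case_utils for the actual ones):

       class CollectTests(object):
           def test_collect_for_all_data_types(self):
               ...

       class StreamCollectTests(CollectTests, PyFlinkStreamTableTestCase):
           # legacy planner
           pass

       class BlinkStreamCollectTests(CollectTests, PyFlinkBlinkStreamTableTestCase):
           # blink planner
           pass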

##########
File path: 
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
                return objs;
        }
 
+       private static List<byte[]> getPickledBytesFromRow(Row row, LogicalType[] dataTypes) throws IOException {
+               List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+               Pickler pickler = new Pickler();
+               for (int i = 0; i < row.getArity(); i++) {
+                       Object fieldData = row.getField(i);
+                       if (fieldData == null) {
+                               pickledRowBytes.add(new byte[0]);
+                       } else {
+                               if (dataTypes[i] instanceof DateType) {
+                                       long time = ((Date) fieldData).toLocalDate().toEpochDay();
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof TimeType) {
+                                       long time = ((Time) fieldData).toLocalTime().toNanoOfDay();
+                                       time = time / 1000;
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof RowType) {
+                                       Row tmpRow = (Row) fieldData;
+                                       LogicalType[] tmpRowFieldTypes = new LogicalType[tmpRow.getArity()];
+                                       ((RowType) dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);

Review comment:
       What about changing it as follows:
   tmpRowFieldTypes = ((RowType) dataTypes[i]).getChildren().toArray(new LogicalType[0])
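   Passing a zero-length array lets toArray allocate a correctly typed and sized array itself, so there is no need to construct and fill a presized array in two separate steps.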

##########
File path: flink-python/pyflink/table/tests/test_table_environment_api.py
##########
@@ -445,6 +446,46 @@ def test_to_retract_stream(self):
                     "(True, <Row(2, 'Hello')>)"]
         self.assertEqual(result, expected)
 
+    def test_collect_for_all_data_types(self):
+
+        def collect_from_source(element_data, expected_output):
+            source = self.t_env.from_elements(element_data, ["a"])
+            result = source.execute().collect()
+            collected_result = []
+            for i in result:
+                collected_result.append(i)
+            self.assertEqual(expected_output, collected_result)
+
+        data_in_different_types = [True, 1, "a", u"a", datetime.date(1970, 1, 1),
+                                   datetime.time(0, 0, 0), 1.0, [1], (1,), {"a": 1}, bytearray(1)]
+        expected_results = [Row([True]), Row([1]), Row(['a']), Row(['a']),
+                            Row([datetime.date(1970, 1, 1)]), Row([datetime.time(0, 0)]),
+                            Row([1.0]), Row([[1]]), Row([Row([1])]), Row([{'a': 1}]),
+                            Row([bytearray(b'\x00')])]
+        zipped_datas = zip(data_in_different_types, expected_results)
+        for data, expected_data in zipped_datas:
+            element_data = [Row(data) for _ in range(2)]
+            expected_output = [expected_data for _ in range(2)]
+            collect_from_source(element_data, expected_output)
+
+    def test_collect_with_retract(self):
+        element_data = [(1, 2, 'a'),
+                        (3, 4, 'b'),
+                        (5, 6, 'a'),
+                        (7, 8, 'b')]
+
+        source = self.t_env.from_elements(element_data, ["a", "b", "c"])
+        result = self.t_env.execute_sql("SELECT SUM(a), c FROM %s group by c" % source).collect()
+        collected_result = []
+        for i in result:
+            collected_result.append(i)
+        # The result contains one delete row and an insert row in retract operation for each key.

Review comment:
       It seems that we should also consider the retraction flag, i.e. the rowKind field?

##########
File path: 
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
                return objs;
        }
 
+       private static List<byte[]> getPickledBytesFromRow(Row row, LogicalType[] dataTypes) throws IOException {

Review comment:
       Should we call the initialize() method?

##########
File path: 
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
                return objs;
        }
 
+       private static List<byte[]> getPickledBytesFromRow(Row row, LogicalType[] dataTypes) throws IOException {
+               List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+               Pickler pickler = new Pickler();
+               for (int i = 0; i < row.getArity(); i++) {
+                       Object fieldData = row.getField(i);
+                       if (fieldData == null) {

Review comment:
       Could you add a test case for this, i.e. a None field? A possible sketch is below.
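   Something along these lines, following the style of the tests above (the test name, the explicit schema, and the expected wrapping are assumptions; an explicit schema is needed because the type of an all-None column cannot be inferred):

       # assumes: from pyflink.table import DataTypes, Row
       def test_collect_with_null_field(self):
           element_data = [Row(None) for _ in range(2)]
           schema = DataTypes.ROW([DataTypes.FIELD("a", DataTypes.STRING())])
           source = self.t_env.from_elements(element_data, schema)
           result = source.execute().collect()
           collected_result = [i for i in result]
           self.assertEqual([Row([None]) for _ in range(2)], collected_result)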

##########
File path: 
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
                return objs;
        }
 
+       private static List<byte[]> getPickledBytesFromRow(Row row, LogicalType[] dataTypes) throws IOException {
+               List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+               Pickler pickler = new Pickler();
+               for (int i = 0; i < row.getArity(); i++) {
+                       Object fieldData = row.getField(i);
+                       if (fieldData == null) {
+                               pickledRowBytes.add(new byte[0]);
+                       } else {
+                               if (dataTypes[i] instanceof DateType) {
+                                       long time = ((Date) fieldData).toLocalDate().toEpochDay();
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof TimeType) {
+                                       long time = ((Time) fieldData).toLocalTime().toNanoOfDay();
+                                       time = time / 1000;
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof RowType) {
+                                       Row tmpRow = (Row) fieldData;
+                                       LogicalType[] tmpRowFieldTypes = new LogicalType[tmpRow.getArity()];
+                                       ((RowType) dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);
+                                       List<byte[]> rowFieldBytes = getPickledBytesFromRow(tmpRow, tmpRowFieldTypes);
+                                       pickledRowBytes.add(pickler.dumps(rowFieldBytes));
+                               } else {
+                                       pickledRowBytes.add(pickler.dumps(row.getField(i)));

Review comment:
       It seems that the implementation doesn't consider composite types such as ArrayType, MapType, etc., e.g. the case where the element of an array is a Row. A test like the sketch below would exercise that path.
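   For example, a case like the following would exercise an ARRAY whose element type is a ROW (a sketch; the schema construction and from_elements behavior are assumptions):

       # assumes: from pyflink.table import DataTypes, Row
       schema = DataTypes.ROW([DataTypes.FIELD(
           "a", DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("b", DataTypes.BIGINT())])))])
       source = self.t_env.from_elements([([Row(1), Row(2)],)], schema)
       result = source.execute().collect()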

##########
File path: 
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
                return objs;
        }
 
+       private static List<byte[]> getPickledBytesFromRow(Row row, LogicalType[] dataTypes) throws IOException {
+               List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+               Pickler pickler = new Pickler();
+               for (int i = 0; i < row.getArity(); i++) {
+                       Object fieldData = row.getField(i);
+                       if (fieldData == null) {
+                               pickledRowBytes.add(new byte[0]);
+                       } else {
+                               if (dataTypes[i] instanceof DateType) {
+                                       long time = ((Date) fieldData).toLocalDate().toEpochDay();
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof TimeType) {
+                                       long time = ((Time) fieldData).toLocalTime().toNanoOfDay();
+                                       time = time / 1000;
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof RowType) {
+                                       Row tmpRow = (Row) fieldData;
+                                       LogicalType[] tmpRowFieldTypes = new LogicalType[tmpRow.getArity()];
+                                       ((RowType) dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);
+                                       List<byte[]> rowFieldBytes = getPickledBytesFromRow(tmpRow, tmpRowFieldTypes);
+                                       pickledRowBytes.add(pickler.dumps(rowFieldBytes));
+                               } else {

Review comment:
       Are all types supported here? If not, it would be great to throw a meaningful exception for unsupported types.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

