dianfu commented on a change in pull request #8057: [FLINK-12028][table] Add 
`addColumns`,`renameColumns`, `dropColumns` …
URL: https://github.com/apache/flink/pull/8057#discussion_r269853922
 
 

 ##########
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##########
 @@ -451,6 +450,117 @@ class TableImpl(
     }
     tableName
   }
+
+  override def addColumns(fields: String): Table = {
+    addColumns(ExpressionParser.parseExpressionList(fields): _*);
+  }
+
+  override def addColumns(fields: Expression*): Table = {
+    addColumns(false, fields: _*)
+  }
+
+  override def addColumns(replaceIfExist: Boolean, fields: String): Table = {
+    addColumns(replaceIfExist, ExpressionParser.parseExpressionList(fields): 
_*)
+  }
+
+  override def addColumns(replaceIfExist: Boolean, fields: Expression*): Table 
= {
+
+    val aggNames = extractAggregationsAndProperties(
+      fields.map(expressionBridge.bridge), tableEnv)._1
+
+    if(aggNames.nonEmpty){
+      throw new TableException(
+        s"The added field expression cannot be an aggregations, find 
[${aggNames.head}].")
+    }
+
+    val childFields = logicalPlan.output.map(a => 
UnresolvedFieldReference(a.name))
+
+    if (replaceIfExist) {
+
+      val finalFields = new ListBuffer[Expression]()
+      val addFields = fields.map(expressionBridge.bridge)
+      childFields.foreach(e => finalFields.append(e))
+
+      // replace field if exist.
+      addFields.foreach {
+        case e@Alias(_, name, _) =>
+          val index = finalFields.indexWhere(p => p match {
+            case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(name)
+            case a: Alias => a.name.equalsIgnoreCase(name)
+            case _ => false
+          })
+          if (index >= 0) {
+            finalFields(index) = e
+          } else {
+            finalFields.append(e)
+          }
+        case e =>
+          throw new TableException(
+            s"Should add an alias to the [$e], if replaceIfExist is true.")
+      }
+      select(finalFields: _*)
+    } else {
+      select(childFields ++ fields:_*)
+    }
+  }
+
+  override def renameColumns(fields: String): Table = {
+    renameColumns(ExpressionParser.parseExpressionList(fields): _*)
+  }
+
+  override def renameColumns(fields: Expression*): Table = {
+
+    val childFields = logicalPlan.output.map(a => 
UnresolvedFieldReference(a.name))
+    val finalFields = childFields.map(e => e.asInstanceOf[Expression]).toArray
+
+    // Rename existing fields
+    fields.map(expressionBridge.bridge).foreach {
+      case e@Alias(child: UnresolvedFieldReference, _, _) =>
+        val index = finalFields.indexWhere(p => p match {
+          case u: UnresolvedFieldReference => 
u.name.equalsIgnoreCase(child.name)
+          case _ => false
+        })
+        if (index >= 0) {
+          finalFields(index) = e
+        } else {
+          throw new TableException(s"Rename field [${child.name}] does not 
exist in source table.")
+        }
+      case e =>
+        throw new TableException(
+          s"Unexpected field expression type [$e]. " +
+            s"Renaming must add an alias to the original field, e.g., a as 
a1.")
+    }
+    select(finalFields: _*)
+  }
+
+  override def dropColumns(fields: String): Table = {
+    dropColumns(ExpressionParser.parseExpressionList(fields): _*)
+  }
+
+  override def dropColumns(fields: Expression*): Table = {
+    val childFields = logicalPlan.output.map(a => 
UnresolvedFieldReference(a.name))
+    val dropFields = fields.map(expressionBridge.bridge)
+
+    val finalFields = childFields.toBuffer
+
+    // Remove the fields which should be delete in the final list
+    dropFields.foreach {
+      case UnresolvedFieldReference(name) =>
+        val index = finalFields.indexWhere(p => p match {
+          case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(name)
 
 Review comment:
   equalsIgnoreCase -> equals

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to