Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3166#discussion_r101397995
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
---
@@ -98,4 +106,134 @@ object CommonTestData {
this(null, null)
}
}
+
+ def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment
+
+ def getFilterableTableSource(
+ fieldNames: Array[String] = Array[String](
+ "name", "id", "amount", "price"),
+ fieldTypes: Array[TypeInformation[_]] = Array(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO)) = new
TestFilterableTableSource(fieldNames, fieldTypes)
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+ override private[flink] def writeToSink[T](table: Table, sink:
TableSink[T]): Unit = ???
+
+ override protected def checkValidTableName(name: String): Unit = ???
+
+ override protected def getBuiltInRuleSet: RuleSet = ???
+
+ override def sql(query: String): Table = ???
+
+ override def registerTableSource(name: String, tableSource:
TableSource[_]): Unit = ???
+}
+
+class TestFilterableTableSource(
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]])
+ extends BatchTableSource[Row]
+ with StreamTableSource[Row]
+ with FilterableTableSource
+ with DefinedFieldNames {
+
+ private var filterPredicate: Option[Expression] = None
+
+ /** Returns the data of the table as a [[DataSet]]. */
+ override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+ execEnv.fromCollection[Row](
+ generateDynamicCollection(33, fieldNames, filterPredicate).asJava,
getReturnType)
+ }
+
+ /** Returns the data of the table as a [[DataStream]]. */
+ def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row]
= {
+ execEnv.fromCollection[Row](
+ generateDynamicCollection(33, fieldNames, filterPredicate).asJava,
getReturnType)
+ }
+
+ private def generateDynamicCollection(
+ num: Int,
+ fieldNames: Array[String],
+ predicate: Option[Expression]): Seq[Row] = {
+
+ if (predicate.isEmpty) {
+ throw new RuntimeException("filter expression was not set")
+ }
+
+ val literal = predicate.get.children.last
+ .asInstanceOf[Literal]
+ .value.asInstanceOf[Int]
+
+ def shouldCreateRow(value: Int): Boolean = {
+ value > literal
+ }
+
+ def createRow(row: Row, name: String, pos: Int, value: Int): Unit = {
--- End diff --
With a hard-coded schema, this method would not be necessary
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---