[ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647955#comment-16647955
 ] 

ASF GitHub Bot commented on FLINK-10474:
----------------------------------------

fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't 
translate IN/NOT_IN to JOIN with VALUES
URL: https://github.com/apache/flink/pull/6792#discussion_r224778652
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ##########
 @@ -351,4 +351,43 @@ class CalcITCase extends AbstractTestBase {
       "{9=Comment#3}")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testIn(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+      .as('a, 'b, 'c)
+      .where("b = 1 || b = 3 || b = 4 || b = 5 || b = 6")
+
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hi", "4,3,Hello world, how are you?", "5,3,I am fine.", "6,3,Luke 
Skywalker",
+      "7,4,Comment#1", "8,4,Comment#2", "9,4,Comment#3", "10,4,Comment#4", 
"11,5,Comment#5",
+      "12,5,Comment#6", "13,5,Comment#7", "14,5,Comment#8", "15,5,Comment#9", 
"16,6,Comment#10",
+      "17,6,Comment#11", "18,6,Comment#12", "19,6,Comment#13", 
"20,6,Comment#14", "21,6,Comment#15")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNotIn(): Unit = {
 
 Review comment:
   Merge with existing

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Don't translate IN with Literals to JOIN with VALUES for streaming queries
> --------------------------------------------------------------------------
>
>                 Key: FLINK-10474
>                 URL: https://issues.apache.org/jira/browse/FLINK-10474
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.6.1, 1.7.0
>            Reporter: Fabian Hueske
>            Assignee: Hequn Cheng
>            Priority: Major
>              Labels: pull-request-available
>
> IN predicates with literals are translated to JOIN with VALUES if the number 
> of elements in the IN clause exceeds a certain threshold. This should not be 
> done, because a streaming join is very heavy and materializes both inputs 
> (which is fine for the VALUES) input but not for the other.
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but have a special join strategy if one input is 
> bound and final (non-updating)
> Option 1. should be easy to do, option 2. requires much more effort.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to