Hi Nishant, On a brief look. I think this is a problem with your 2nd query:
> > *Table2*... > Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS > mb_proctime, bad_ip FROM BadIP ***GROUP BY bad_ip***HAVING > MIN(b_proctime) > CURRENT_TIMESTAMP - INTERVAL '2' DAY "); > tableEnv.registerTable("LatestBadIP", latestBadIps); This SQL statement states that the table is a ending and thus your final table generates a nonWindowJoin. If I understood you correctly, you were trying to emit some sort of bad IP address within a specific time window until it is last seen 2 days ago? What I am assuming you were trying to do is something similar to the OverWindowAggregate[1]. Similar to: "SELECT MAX(b_proctime) OVER ( PARTITION BY bad_ip RANGE BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT ROW ) FROM BadIP" Thanks, Rong [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#aggregations On Mon, Sep 30, 2019 at 2:17 AM Nishant Gupta <nishantgupta1...@gmail.com> wrote: > Hi Team, > > I am trying to Join [kafka stream] and [badip stream grouped with badip] > > Can someone please help me out with verifying what is wrong in > highlighted query. Am I writing the time window join query wrong with this > use case.? Or it is a bug and i should report this > what is the work around, if it is a bug. > > *Table1* > Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, > "sourceip,field1,field2, k_proctime.proctime") > tableEnv.registerTable("KafkaSource", kafkaSource); > > *Table2* > Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, > b_proctime.proctime"); > tableEnv.registerTable("BadIP", badipTable); > > Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS > mb_proctime, bad_ip FROM BadIP GROUP BY bad_ip HAVING MIN(b_proctime) > > CURRENT_TIMESTAMP - INTERVAL '2' DAY "); > tableEnv.registerTable("LatestBadIP", latestBadIps); > > *Table3 - Join* > *Success for below query* > Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM > KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip"); > > *Failure for below query* > Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM > KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip AND > LB.mb_proctime BETWEEN K.k_proctime - INTERVAL '4' HOUR AND K.k_proctime + > INTERVAL '10' MINUTE"); > > *Error:* > 14:25:25,230 INFO org.apache.flink.runtime.taskmanager.Task > - InnerJoin(where: (AND(=(sourceip, bad_ip), >=(mb_proctime, > -(PROCTIME(k_proctime), 14400000:INTERVAL HOUR)), <=(mb_proctime, > +(PROCTIME(k_proctime), 600000:INTERVAL MINUTE)))), join: (tlsversion, > tlscipher, tlscurve, tlsserver_name, tlsresumed, tlslast_alert, > tlsnext_protocol, tlsestablished, tlsclient_cert_chain_fuids, tlssubject, > tlsissuer, tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, > sourceip, sourceport, sourcegeolower, sourcegeoupper, > sourcegeocountry_iso_code, sourcegeocountry_name, sourcegeoregion_name, > sourcegeocity_name, sourcegeolocationlat, sourcegeolocationlon, > sourcegeozipcode, sourcegeotimezone, sourcegeoisp, sourcegeodomain, > sourcegeonetspeed, sourcegeoiddcode, sourcegeoareacode, > sourcegeoweatherstation_code, sourcegeoweatherstation_name, sourcegeomcc, > sourcegeomnc, sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype, > destinationip, destinationport, destinationgeolower, destinationgeoupper, > destinationgeocountry_iso_code, destinationgeocountry_name, > destinationgeoregion_name, destinationgeocity_name, > destinationgeolocationlat, destinationgeolocationlon, > destinationgeozipcode, destinationgeotimezone, destinationgeoisp, > destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode, > destinationgeoareacode, destinationgeoweatherstation_code, > destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc, > destinationgeomobilebrand, destinationgeoelevation, > destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype, > eventaction, organizationid, timestamp_received, clientmac, transactionid, > timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt, > dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery, > dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags, > dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, k_proctime, > mb_proctime, bad_ip)) -> select: (tlsversion, tlscipher, tlscurve, > tlsserver_name, tlsresumed, tlslast_alert, tlsnext_protocol, > tlsestablished, tlsclient_cert_chain_fuids, tlssubject, tlsissuer, > tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, sourceip, > sourceport, sourcegeolower, sourcegeoupper, sourcegeocountry_iso_code, > sourcegeocountry_name, sourcegeoregion_name, sourcegeocity_name, > sourcegeolocationlat, sourcegeolocationlon, sourcegeozipcode, > sourcegeotimezone, sourcegeoisp, sourcegeodomain, sourcegeonetspeed, > sourcegeoiddcode, sourcegeoareacode, sourcegeoweatherstation_code, > sourcegeoweatherstation_name, sourcegeomcc, sourcegeomnc, > sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype, > destinationip, destinationport, destinationgeolower, destinationgeoupper, > destinationgeocountry_iso_code, destinationgeocountry_name, > destinationgeoregion_name, destinationgeocity_name, > destinationgeolocationlat, destinationgeolocationlon, > destinationgeozipcode, destinationgeotimezone, destinationgeoisp, > destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode, > destinationgeoareacode, destinationgeoweatherstation_code, > destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc, > destinationgeomobilebrand, destinationgeoelevation, > destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype, > eventaction, organizationid, timestamp_received, clientmac, transactionid, > timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt, > dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery, > dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags, > dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, > PROCTIME(k_proctime) AS k_proctime) -> to: Tuple2 -> Map -> Sink: Unnamed > (4/8) (94e70bf5bb5b89ac5ae933c73c4b0353) switched from RUNNING to FAILED. > org.apache.flink.api.common.InvalidProgramException: *Table program > cannot be compiled. This is a bug. Please file an issue.* > at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.join.NonWindowJoin.compile(NonWindowJoin.scala:46) > at > org.apache.flink.table.runtime.join.NonWindowJoin.open(NonWindowJoin.scala:75) > at > org.apache.flink.table.runtime.join.NonWindowInnerJoin.open(NonWindowInnerJoin.scala:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator.open(LegacyKeyedCoProcessOperator.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.base/java.lang.Thread.run(Thread.java:844) > Caused by: org.codehaus.commons.compiler.CompileException: Line 957, > Column 26: Unknown variable or type "ctx" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6773) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6385) > at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6382) > at org.codehaus.janino.Java$Package.accept(Java.java:4237) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768) > at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410) > at > org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407) > at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213) > at > org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) > at > org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) > at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) > at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) > at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) > at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7019) > at org.codehaus.janino.UnitCompiler.access$15700(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6430) > at > org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6403) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) > at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) > at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580) > at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503) > at > org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487) > at > org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33) > ... 11 more >