Hi Team,

I am trying to Join [kafka stream] and [badip stream grouped with badip]

Can someone please help me out with verifying what is wrong in
highlighted query. Am I writing the time window join query wrong with this
use case.? Or it is a bug and i should report this
what is the work around, if it is a bug.

*Table1*
Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM,
"sourceip,field1,field2, k_proctime.proctime")
tableEnv.registerTable("KafkaSource", kafkaSource);

*Table2*
Table  badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip,
b_proctime.proctime");
tableEnv.registerTable("BadIP", badipTable);

Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS
mb_proctime, bad_ip FROM BadIP GROUP BY bad_ip HAVING MIN(b_proctime) >
CURRENT_TIMESTAMP - INTERVAL '2' DAY ");
tableEnv.registerTable("LatestBadIP", latestBadIps);

*Table3 - Join*
*Success for below query*
Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM KafkaSource
AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip");

*Failure for below query*
Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM KafkaSource
AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip AND LB.mb_proctime
BETWEEN K.k_proctime - INTERVAL '4' HOUR AND K.k_proctime + INTERVAL '10'
MINUTE");

*Error:*
14:25:25,230 INFO  org.apache.flink.runtime.taskmanager.Task
      - InnerJoin(where: (AND(=(sourceip, bad_ip), >=(mb_proctime,
-(PROCTIME(k_proctime), 14400000:INTERVAL HOUR)), <=(mb_proctime,
+(PROCTIME(k_proctime), 600000:INTERVAL MINUTE)))), join: (tlsversion,
tlscipher, tlscurve, tlsserver_name, tlsresumed, tlslast_alert,
tlsnext_protocol, tlsestablished, tlsclient_cert_chain_fuids, tlssubject,
tlsissuer, tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion,
sourceip, sourceport, sourcegeolower, sourcegeoupper,
sourcegeocountry_iso_code, sourcegeocountry_name, sourcegeoregion_name,
sourcegeocity_name, sourcegeolocationlat, sourcegeolocationlon,
sourcegeozipcode, sourcegeotimezone, sourcegeoisp, sourcegeodomain,
sourcegeonetspeed, sourcegeoiddcode, sourcegeoareacode,
sourcegeoweatherstation_code, sourcegeoweatherstation_name, sourcegeomcc,
sourcegeomnc, sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype,
destinationip, destinationport, destinationgeolower, destinationgeoupper,
destinationgeocountry_iso_code, destinationgeocountry_name,
destinationgeoregion_name, destinationgeocity_name,
destinationgeolocationlat, destinationgeolocationlon,
destinationgeozipcode, destinationgeotimezone, destinationgeoisp,
destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode,
destinationgeoareacode, destinationgeoweatherstation_code,
destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc,
destinationgeomobilebrand, destinationgeoelevation,
destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype,
eventaction, organizationid, timestamp_received, clientmac, transactionid,
timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt,
dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery,
dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags,
dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, k_proctime,
mb_proctime, bad_ip)) -> select: (tlsversion, tlscipher, tlscurve,
tlsserver_name, tlsresumed, tlslast_alert, tlsnext_protocol,
tlsestablished, tlsclient_cert_chain_fuids, tlssubject, tlsissuer,
tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, sourceip,
sourceport, sourcegeolower, sourcegeoupper, sourcegeocountry_iso_code,
sourcegeocountry_name, sourcegeoregion_name, sourcegeocity_name,
sourcegeolocationlat, sourcegeolocationlon, sourcegeozipcode,
sourcegeotimezone, sourcegeoisp, sourcegeodomain, sourcegeonetspeed,
sourcegeoiddcode, sourcegeoareacode, sourcegeoweatherstation_code,
sourcegeoweatherstation_name, sourcegeomcc, sourcegeomnc,
sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype,
destinationip, destinationport, destinationgeolower, destinationgeoupper,
destinationgeocountry_iso_code, destinationgeocountry_name,
destinationgeoregion_name, destinationgeocity_name,
destinationgeolocationlat, destinationgeolocationlon,
destinationgeozipcode, destinationgeotimezone, destinationgeoisp,
destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode,
destinationgeoareacode, destinationgeoweatherstation_code,
destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc,
destinationgeomobilebrand, destinationgeoelevation,
destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype,
eventaction, organizationid, timestamp_received, clientmac, transactionid,
timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt,
dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery,
dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags,
dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol,
PROCTIME(k_proctime) AS k_proctime) -> to: Tuple2 -> Map -> Sink: Unnamed
(4/8) (94e70bf5bb5b89ac5ae933c73c4b0353) switched from RUNNING to FAILED.
org.apache.flink.api.common.InvalidProgramException: *Table program cannot
be compiled. This is a bug. Please file an issue.*
at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at
org.apache.flink.table.runtime.join.NonWindowJoin.compile(NonWindowJoin.scala:46)
at
org.apache.flink.table.runtime.join.NonWindowJoin.open(NonWindowJoin.scala:75)
at
org.apache.flink.table.runtime.join.NonWindowInnerJoin.open(NonWindowInnerJoin.scala:53)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator.open(LegacyKeyedCoProcessOperator.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: org.codehaus.commons.compiler.CompileException: Line 957, Column
26: Unknown variable or type "ctx"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6773)
at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6385)
at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Package.accept(Java.java:4237)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
at
org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7019)
at org.codehaus.janino.UnitCompiler.access$15700(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6430)
at
org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6403)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
at
org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
at
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
... 11 more

Reply via email to