Re: Time Window Flink SQL join

2019-09-21 Thread Nishant Gupta
Hi Fabian, I am facing an issue if I run multiple queries like this: Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip"); Table resultKafkaSafe = tableEnv.sqlQuery( "SELECT K

Re: Time Window Flink SQL join

2019-09-20 Thread Nishant Gupta
The use case is similar, but I am not able to check the heap space issue because the data size is small. I thought of checking with you in the meantime. Thanks for looking into it; I really appreciate it. I have marked the usage of temporal tables in bold red for ease of reference. On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske

Re: Time Window Flink SQL join

2019-09-20 Thread Fabian Hueske
Hi, This looks OK at first sight. Is it doing what you expect? Fabian On Fri, 20 Sept 2019 at 16:29, Nishant Gupta <nishantgupta1...@gmail.com> wrote: > Hi Fabian, > > Thanks for the information. > I have been reading about it and doing the same as part of a Flink job > written in J

Re: Time Window Flink SQL join

2019-09-20 Thread Nishant Gupta
Hi Fabian, Thanks for the information. I have been reading about it and doing the same as part of a Flink job written in Java. I am using proctime for both tables. Could you please verify the implementation of the temporal tables? Here is the snippet: public class

Re: Time Window Flink SQL join

2019-09-18 Thread Fabian Hueske
Hi Nishant, You should model the query as a join with a time-versioned table [1]. The bad-ips table would be the time-versioned table [2]. Since it is a time-versioned table, it could even be updated with new IPs. This type of join will only keep the time-versioned table (the bad-ips in stat
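
A minimal sketch of the idea in the message above, assuming processing-time semantics: only the versioned bad-IP table is held in state, and each incoming record is looked up against the current version and then dropped. This is plain Java, not the Flink API; the class `TemporalJoinSketch` and its method names are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration (NOT the Flink API) of a processing-time join
// against a versioned table. All names here are hypothetical.
class TemporalJoinSketch {
    // Current version of the bad-IP table, keyed by IP.
    // This map is the only state the join keeps.
    private final Map<String, String> badIps = new HashMap<>();

    // An update to the versioned table replaces the previous version of the row.
    void upsertBadIp(String ip, String reason) {
        badIps.put(ip, reason);
    }

    // A row can also be retracted from the versioned table.
    void removeBadIp(String ip) {
        badIps.remove(ip);
    }

    // A stream record is checked against the table version that is current
    // at the moment it arrives. The record itself is never stored.
    boolean isMalicious(String sourceIp) {
        return badIps.containsKey(sourceIp);
    }

    // Size of the retained state: it depends on the number of bad IPs,
    // not on the number of stream records processed.
    int stateSize() {
        return badIps.size();
    }
}
```

In this sketch the state grows with the number of bad IPs only, whereas a join that has to retain the stream records as well grows with the stream itself.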

Re: Time Window Flink SQL join

2019-09-18 Thread Nishant Gupta
Hi Fabian, Thanks for your reply. I have a continuous Kafka stream coming in and a static table of bad IPs. I wanted to segregate the records that have a bad IP, so I was joining the two. But with that join, 60 GB of memory runs out, so I used the query below. Can you please guide me in this regard? On Wed, 18 S

Re: Time Window Flink SQL join

2019-09-18 Thread Fabian Hueske
Hi, The query that you wrote is not a time-windowed join. INSERT INTO sourceKafkaMalicious SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP; The

Time Window Flink SQL join

2019-09-18 Thread Nishant Gupta
Hi Team, I am running a query for a time-windowed join, as below: INSERT INTO sourceKafkaMalicious SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP; T