Hi Fabian,

Thanks for your reply. I have a continuous Kafka stream coming in and a static table of bad IPs, and I want to segregate the records that have a bad IP.
That is why I was joining them, but the join runs out of memory even with 60 GB, so I used the query below. Can you please guide me in this regard?

On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> The query that you wrote is not a time-windowed join.
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>   sourceKafka.`source.ip` = badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>
> The problem is the use of CURRENT_TIMESTAMP instead of a processing-time
> (or event-time) attribute of badips.
>
> What exactly are you trying to achieve with the query?
>
> Best, Fabian
>
> On Wed., 18 Sept. 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am running a query for a time-windowed join as below:
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>   sourceKafka.`source.ip` = badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> For a time-windowed join, Flink SQL should automatically clear older records. Somehow the query does not clear the heap space and fails with an error after some time.
>>
>> Can you please let me know what could go wrong, or is it an issue?
>>
>> Environment file chunks:
>>
>> ----------------------------------------------------------------------
>> tables:
>>   - name: sourceKafka
>>     type: source-table
>>     update-mode: append
>>     connector:
>>       type: kafka
>>       version: "universal"
>>       topic: test-data-flatten
>>       properties:
>>         - key: zookeeper.connect
>>           value: x.x.x.x:2181
>>         - key: bootstrap.servers
>>           value: x.x.x.x:9092
>>         - key: group.id
>>           value: testgroup
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'source.ip': { type: 'string' },
>>             'source.port': { type: 'string' }
>>           }
>>         }
>>       derive-schema: false
>>     schema:
>>       - name: 'source.ip'
>>         type: VARCHAR
>>       - name: 'source.port'
>>         type: VARCHAR
>>
>>   - name: sourceKafkaMalicious
>>     type: sink-table
>>     update-mode: append
>>     connector:
>>       type: kafka
>>       version: "universal"
>>       topic: test-data-mal
>>       properties:
>>         - key: zookeeper.connect
>>           value: x.x.x.x:2181
>>         - key: bootstrap.servers
>>           value: x.x.x.x:9092
>>         - key: group.id
>>           value: testgroupmal
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'source.ip': { type: 'string' },
>>             'source.port': { type: 'string' }
>>           }
>>         }
>>       derive-schema: false
>>     schema:
>>       - name: 'source.ip'
>>         type: VARCHAR
>>       - name: 'source.port'
>>         type: VARCHAR
>>
>>   - name: badips
>>     type: source-table
>>     #update-mode: append
>>     connector:
>>       type: filesystem
>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>     format:
>>       type: csv
>>       fields:
>>         - name: ip
>>           type: VARCHAR
>>       comment-prefix: "#"
>>     schema:
>>       - name: ip
>>         type: VARCHAR
>>
>> execution:
>>   planner: blink
>>   type: streaming
>>   time-characteristic: event-time
>>   periodic-watermarks-interval: 200
>>   result-mode: table
>>   max-table-result-rows: 1000000
>>   parallelism: 3
>>   max-parallelism: 128
>>   min-idle-state-retention: 0
>>   max-idle-state-retention: 0
>>   restart-strategy:
>>     type: fallback
>>
>> configuration:
>>   table.optimizer.join-reorder-enabled: true
>>   table.exec.spill-compression.enabled: true
>>   table.exec.spill-compression.block-size: 128kb
>>
>> # Properties that describe the cluster to which table programs are submitted.
>> deployment:
>>   response-timeout: 5000
>>
>> ----------------------------------------------------------------------
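[Editor's note] As Fabian points out, the query above is not an interval (time-windowed) join, because CURRENT_TIMESTAMP is not a time attribute of either input. An interval join, whose state Flink can expire automatically, requires bounds relating time attributes of *both* inputs. A minimal sketch of that shape (the table and `ts` column names here are purely illustrative, not from this thread; note the static `badips` CSV table has no time attribute, so it cannot take part in an interval join as defined):

```sql
-- Sketch of an interval join: the WHERE clause bounds the event-time
-- attributes of BOTH inputs, so Flink can expire state for each side.
-- `orders`, `shipments`, and their `ts` columns are hypothetical.
SELECT o.*
FROM orders AS o
INNER JOIN shipments AS s
  ON o.order_id = s.order_id
WHERE o.ts BETWEEN s.ts - INTERVAL '15' MINUTE AND s.ts;
```

Because the query in the thread has no such two-sided time bound, it is planned as a regular streaming join, which keeps all rows of both inputs in state indefinitely.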
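[Editor's note] With `min-idle-state-retention: 0` and `max-idle-state-retention: 0` in the environment file above, idle state retention is disabled, so the regular join's state grows without bound and can eventually exhaust the heap. One possible mitigation is enabling idle state retention in the `execution` section; the values below are illustrative only (the keys take milliseconds in the SQL-client environment file, as far as I know), and note that eviction applies to both join inputs, so evicted `badips` rows would stop matching until the table is re-read:

```yaml
execution:
  # Illustrative sketch: evict state not accessed for 12-24 hours
  # instead of keeping it forever (values in milliseconds).
  min-idle-state-retention: 43200000
  max-idle-state-retention: 86400000
```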