Hi Team, I am running the following query for a time-windowed join:
INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.*
FROM sourceKafka
INNER JOIN badips ON sourceKafka.`source.ip` = badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;

Since this is a time-windowed join, I expected Flink SQL to clear older records automatically. Somehow the query does not free the heap space and fails with an error after some time. Could you please let me know what could be going wrong, or whether this is an issue?

Environment file chunks
--------------------------------------------------------------------------------
tables:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-flatten
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': { type: 'string' },
            'source.port': { type: 'string' }
          }
        }
      derive-schema: false
    schema:
      - name: ' source.ip '
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: sourceKafkaMalicious
    type: sink-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-mal
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroupmal
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': { type: 'string' },
            'source.port': { type: 'string' }
          }
        }
      derive-schema: false
    schema:
      - name: ' source.ip '
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/cyanadmin/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  restart-strategy:
    type: fallback

configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.
deployment:
  response-timeout: 5000
--------------------------------------------------------------------------------
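
For comparison, the Flink SQL documentation describes a time-windowed (interval) join as one whose condition bounds a time attribute of one input by a time attribute of the other input, rather than by CURRENT_TIMESTAMP. A minimal sketch of that shape follows. It is only an illustration, not a drop-in fix for the setup above: `badipsStream` and its event-time attribute `seen_at` are hypothetical (the `badips` table here is a CSV file with no time attribute), and the sketch assumes `timestamp_received` is declared as an event-time attribute on `sourceKafka`.

```sql
-- Sketch only: badipsStream and seen_at are hypothetical names, not part of the environment file above.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s, badipsStream AS b
WHERE s.`source.ip` = b.ip                    -- equi-join predicate
  AND s.timestamp_received                    -- time attribute of the left input ...
      BETWEEN b.seen_at - INTERVAL '15' MINUTE
          AND b.seen_at;                      -- ... bounded by the right input's time attribute
```

If the planner instead treats the original query as a regular join, then as far as I understand state cleanup would depend on min-idle-state-retention and max-idle-state-retention in the environment file, where a value of 0 means state is never cleaned up.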