Hi Team,

I am running the following time-windowed join query:

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

Since this is a time-windowed join, I expected Flink SQL to clear older
records automatically. However, the query does not seem to free heap space,
and it fails with an error after some time.
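
For comparison, my understanding is that a time-windowed join bounds a time
attribute of one table against a time attribute of the other table, rather
than against CURRENT_TIMESTAMP. A minimal sketch of that shape is below. The
table otherStream and its columns ip and event_time are hypothetical and do
not exist in my setup, and both time columns are assumed to be declared as
event-time attributes:

```sql
-- Sketch only: otherStream, o.ip and o.event_time are hypothetical names.
-- Both s.timestamp_received and o.event_time are assumed to be
-- event-time attributes of their tables.
SELECT s.*
FROM sourceKafka AS s, otherStream AS o
WHERE s.`source.ip` = o.ip
  AND s.timestamp_received BETWEEN o.event_time - INTERVAL '15' MINUTE
                               AND o.event_time;
```

My query differs from this shape because badips is a static CSV file with no
time attribute, so I am not sure whether it is planned as a time-windowed
join at all.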

Can you please let me know what could be going wrong, or whether this is a
known issue?

Environment file excerpts:
--------------------------------------------------------------------------------------------------------------------------------------------------------------
tables:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-flatten
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
               type: 'string'
            },
            'source.port': {
               type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: sourceKafkaMalicious
    type: sink-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-mal
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroupmal
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
               type: 'string'
            },
            'source.port': {
               type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/cyanadmin/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  restart-strategy:
    type: fallback

configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb
# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000

--------------------------------------------------------------------------------------------------------------------------------------------------------------
