Hi,

Job1 is a simple ETL job and keeps very little state (only the Kafka 
offsets), so it should work well. 
Job2 is an unbounded join, which stores the data from both input streams in 
the state of the join operator. 
Since the input is unbounded and arrives at 100GB per day as you described, 
the state will keep growing. If you are using the memory state backend (which 
is the default one), the job will eventually run out of memory (OOM). 

Here are my answers:
>  1. How long does the data reside in my table once I read it? I consume
>  100GB per day, should have been a retention policy right? If so, where do I
>  configure and how?

The data is stored in state. If you are using the SQL CLI, you can specify the 
retention policy by setting the “min-idle-state-retention” and 
“max-idle-state-retention” keys under “execution” in the environment file [1]. 
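
As a rough sketch, the relevant part of the environment file could look like 
the fragment below. The key names are the ones already present in your 
environment file; the values (12 and 24 hours, in milliseconds) are only 
assumed for illustration and should be chosen to match how long a key may stay 
idle before its join state can be dropped.

```yaml
execution:
  # minimum idle state retention in ms
  # (illustrative value, assumed here: 12 hours)
  min-idle-state-retention: 43200000
  # maximum idle state retention in ms
  # (illustrative value, assumed here: 24 hours)
  max-idle-state-retention: 86400000
```

With both values left at 0, as in your current file, idle state is never 
cleaned up.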

>  2. Are retention policies specific to tables?

No. It affects all stateful non-windowed operations (e.g. GroupBy, Join).

>  3. I have a data set updates once a day. How about using UPSERT mode?
>  If so, how could I delete the existing data set to load the new?

Flink SQL doesn’t support loading a periodically changing data set yet. Maybe 
you can achieve this by implementing a custom source and operators in the 
DataStream API.

Best,
Jark


[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#environment-files


> On Sep 13, 2019, at 15:43, srikanth flink <flink.d...@gmail.com> wrote:
> 
> Hi there,
> 
> I came across Flink and FlinkSQL and using FlinkSQL for stream processing.
> Flink runs as 3 node cluster with embedded Zookeeper, given heap 80GB on
> each. I came across few issues and would like to get some clarification.
> 
>   - Job1: Using Flink(java) to read and flatten my JSON and write to Kafka
>   topic.
> 
> 
>   - Job2: Environment file configured to read from 2 different Kafka
>   topics. I get to join both the tables and are working. The query runs for a
>   while (say an hour) and then fails with *error*.
> 
> Questions:
> 
>   1. How long does the data reside in my table once I read it? I consume
>   100GB per day, should have been a retention policy right? If so, where do I
>   configure and how?
>   2. Are retention policies specific to tables?
>   3. I have a data set updates once a day. How about using UPSERT mode?
>   If so, how could I delete the existing data set to load the new?
> 
> 
> *Query*: SELECT s.* from sourceKafka AS s INNER JOIN badIp AS b ON
> s.`source.ip`=b.ip;
> *Error*: org.apache.flink.util.FlinkException: The assigned slot
> e57d1c0556b4a197eb44d7d9e83e1a47_6 was removed. at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlot(SlotManagerImpl.java:958)
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlots(SlotManagerImpl.java:928)
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalUnregisterTaskManager(SlotManagerImpl.java:1149)
> 
> *Environment File*:
> #==============================================================================
> # Tables
> #==============================================================================
> 
> # Define tables here such as sources, sinks, views, or temporal tables.
> tables:  # empty list
> # A typical table source definition looks like:
>  - name: sourceKafka
>    type: source-table
>    update-mode: append
>    connector:
>      type: kafka
>      version: "universal"     # required: valid connector versions are
>                      #   "0.8", "0.9", "0.10", "0.11", and "universal"
>      topic: recon-data-flatten          # required: topic name from which
> the table is read
> 
>      properties:         # optional: connector specific properties
>        - key: zookeeper.connect
>          value: 1.2.4.1:2181
>        - key: bootstrap.servers
>          value: 1.2.4.1:9092
>        - key: group.id
>          value: reconDataGroup
>    format:
>      type: json
>      fail-on-missing-field: false
>      json-schema: >
>        {
>          type: 'object',
>          properties: {
>            'source.ip': {
>              type: 'string'
>            },
>            'source.port': {
>              type: 'string'
>            },
>            'destination.ip': {
>              type: 'string'
>            },
>            'destination.port': {
>              type: 'string'
>            }
>          }
>        }
>      derive-schema: false
> 
>    schema:
>      - name: 'source.ip'
>        type: VARCHAR
>      - name: 'source.port'
>        type: VARCHAR
>      - name: 'destination.ip'
>        type: VARCHAR
>      - name: 'destination.port'
>        type: VARCHAR
> 
>  - name: badips
>    type: source-table
>    #update-mode: append
>    connector:
>      type: filesystem
>      path: "/home/ipsum/levels/badips.csv"
>    format:
>      type: csv
>      fields:
>        - name: ip
>          type: VARCHAR
>      comment-prefix: "#"
>    schema:
>      - name: ip
>        type: VARCHAR
> 
> #==============================================================================
> # Execution properties
> #==============================================================================
> 
> # Properties that change the fundamental execution behavior of a table
> program.
> 
> execution:
>  # select the implementation responsible for planning table programs
>  # possible values are 'old' (used by default) or 'blink'
>  planner: blink
>  # 'batch' or 'streaming' execution
>  type: streaming
>  # allow 'event-time' or only 'processing-time' in sources
>  time-characteristic: event-time
>  # interval in ms for emitting periodic watermarks
>  periodic-watermarks-interval: 200
>  # 'changelog' or 'table' presentation of results
>  result-mode: table
>  # maximum number of maintained rows in 'table' presentation of results
>  max-table-result-rows: 1000000
>  # parallelism of the program
>  parallelism: 3
>  # maximum parallelism
>  max-parallelism: 128
>  # minimum idle state retention in ms
>  min-idle-state-retention: 0
>  # maximum idle state retention in ms
>  max-idle-state-retention: 0
>  # current catalog ('default_catalog' by default)
>  #  current-catalog: default_catalog
>  # current database of the current catalog (default database of the
> catalog by default)
>  #current-database: default_database
>  # controls how table programs are restarted in case of a failures
>  restart-strategy:
>    # strategy type
>    # possible values are "fixed-delay", "failure-rate", "none", or
> "fallback" (default)
>    type: fallback
>    #attempts: 3
>    #delay: 5000
> 
> #==============================================================================
> # Configuration options
> #==============================================================================
> 
> # A full list of options and their default values can be found
> # on the dedicated "Configuration" page.
> configuration:
>  table.optimizer.join-reorder-enabled: true
>  table.exec.spill-compression.enabled: true
>  table.exec.spill-compression.block-size: 128kb
> 
> #==============================================================================
> # Deployment properties
> #==============================================================================
> 
> # Properties that describe the cluster to which table programs are
> submitted to.
> 
> deployment:
>  # general cluster communication timeout in ms
>  response-timeout: 5000
