Hi Fabian,

Thanks for your reply.
I have a continuous stream of records coming from Kafka and a static table
of bad IPs (badips). I want to separate out the records that have a bad IP.

That is why I was joining them. But with that join, the 60 GB of memory
runs out.

So I used the query below.
Can you please guide me in this regard?
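
To make the goal concrete, here is a minimal sketch of the behaviour I am after, written in plain Python rather than Flink SQL. The function names (load_bad_ips, split_records) and the sample IPs are hypothetical and only for illustration. The bad IPs are held once in memory, and each incoming record is checked and passed on without being buffered.

```python
def load_bad_ips(lines):
    """Build an in-memory set of bad IPs, skipping blank lines and '#' comments."""
    return {
        line.strip()
        for line in lines
        if line.strip() and not line.lstrip().startswith("#")
    }


def split_records(records, bad_ips):
    """Yield (record, is_malicious) one record at a time; nothing is buffered."""
    for record in records:
        yield record, record.get("source.ip") in bad_ips


# Hypothetical sample data standing in for badips.csv and the Kafka stream.
bad_ips = load_bad_ips(["# comment line", "10.0.0.1", "", "10.0.0.2"])
stream = [
    {"source.ip": "10.0.0.1", "source.port": "80"},
    {"source.ip": "192.168.1.5", "source.port": "443"},
]
malicious = [rec for rec, is_bad in split_records(stream, bad_ips) if is_bad]
```

In this sketch only the bad-IP set stays in memory, so memory use depends on the size of badips and not on how long the stream has been running.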

On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> The query that you wrote is not a time-windowed join.
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
> (or event time) attribute of badips.
>
> What exactly are you trying to achieve with the query?
>
> Best, Fabian
>
> On Wed, 18 Sep 2019 at 14:02, Nishant Gupta <
> nishantgupta1...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am running the following query as a time-windowed join:
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> With a time-windowed join, Flink SQL should automatically clear older
>> records. Somehow the query does not free the heap space and fails with an
>> error after some time.
>>
>> Can you please let me know what could be going wrong, or whether this is
>> an issue?
>>
>> Excerpts from the environment file:
>>
>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>> tables:
>>   - name: sourceKafka
>>     type: source-table
>>     update-mode: append
>>     connector:
>>       type: kafka
>>       version: "universal"
>>       topic: test-data-flatten
>>       properties:
>>         - key: zookeeper.connect
>>           value: x.x.x.x:2181
>>         - key: bootstrap.servers
>>           value: x.x.x.x:9092
>>         - key: group.id
>>           value: testgroup
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'source.ip': {
>>                type: 'string'
>>             },
>>             'source.port': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>>     schema:
>>       - name: 'source.ip'
>>         type: VARCHAR
>>       - name: 'source.port'
>>         type: VARCHAR
>>
>>   - name: sourceKafkaMalicious
>>     type: sink-table
>>     update-mode: append
>>     connector:
>>       type: kafka
>>       version: "universal"
>>       topic: test-data-mal
>>       properties:
>>         - key: zookeeper.connect
>>           value: x.x.x.x:2181
>>         - key: bootstrap.servers
>>           value: x.x.x.x:9092
>>         - key: group.id
>>           value: testgroupmal
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'source.ip': {
>>                type: 'string'
>>             },
>>             'source.port': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>>     schema:
>>       - name: 'source.ip'
>>         type: VARCHAR
>>       - name: 'source.port'
>>         type: VARCHAR
>>
>>   - name: badips
>>     type: source-table
>>     #update-mode: append
>>     connector:
>>       type: filesystem
>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>     format:
>>       type: csv
>>       fields:
>>         - name: ip
>>           type: VARCHAR
>>       comment-prefix: "#"
>>     schema:
>>       - name: ip
>>         type: VARCHAR
>>
>> execution:
>>   planner: blink
>>   type: streaming
>>   time-characteristic: event-time
>>   periodic-watermarks-interval: 200
>>   result-mode: table
>>   max-table-result-rows: 1000000
>>   parallelism: 3
>>   max-parallelism: 128
>>   min-idle-state-retention: 0
>>   max-idle-state-retention: 0
>>   restart-strategy:
>>     type: fallback
>>
>> configuration:
>>   table.optimizer.join-reorder-enabled: true
>>   table.exec.spill-compression.enabled: true
>>   table.exec.spill-compression.block-size: 128kb
>> # Properties that describe the cluster to which table programs are
>> # submitted to.
>>
>> deployment:
>>   response-timeout: 5000
>>
>>
>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>
>
