Hi Fabian,
Thanks for the information.
I have been reading about it and have implemented the same as part of a Flink job written in Java.
I am using proctime for both tables. Could you please verify the implementation of the temporal tables?
Here is the snippet:
----------------------------
public class StreamingJob {

    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka consumer configuration (shared by both sources)
        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
        kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Bad-IP stream: tab-separated records, the first field is the IP
        DataStream<String> badipStream = env.addSource(
                new FlinkKafkaConsumer<>("badips", new SimpleStringSchema(), kafkaConsumerProperties));

        DataStream<String> badipStreamM = badipStream
                .map(new MapFunction<String, String>() {
                    private static final long serialVersionUID = -686775202L;

                    @Override
                    public String map(String value) throws Exception {
                        try {
                            String[] v = value.split("\\t");
                            if (v.length > 1) {
                                return v[0];
                            } else {
                                return "0.0.0.0";
                            }
                        } catch (Exception e) {
                            System.err.println(e);
                            return "0.0.0.0";
                        }
                    }
                });

        // Register the bad-IP table with a processing-time attribute and
        // create the temporal table function keyed on bad_ip
        Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, r_proctime.proctime");
        tableEnv.registerTable("BadIP", badipTable);
        TemporalTableFunction badIPTT = badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
        tableEnv.registerFunction("BadIPTT", badIPTT);

        // High-volume event stream to be checked against the bad-IP table
        DataStream<ObjectNode> inKafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("tests", new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));

        DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
                .rebalance()
                .filter(value -> value != null)
                .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
                    private static final long serialVersionUID = -6867120202L;

                    @Override
                    public Tuple2<String, String> map(ObjectNode node) throws Exception {
                        try {
                            ObjectNode nodeValue = (ObjectNode) node.get("value");
                            return new Tuple2<>(nodeValue.get("source.ip").asText(),
                                    nodeValue.get("destination.ip").asText());
                        } catch (Exception e) {
                            System.err.println(e);
                            System.out.println(node);
                            return null;
                        }
                    }
                });

        Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, "sourceIp, destinationIp, k_proctime.proctime");
        tableEnv.registerTable("KafkaSource", kafkaSource);

        // Processing-time temporal table join: keep events whose source IP is in the bad-IP table
        Table resultKafkaMalicious = tableEnv.sqlQuery(
                "SELECT K.sourceIp, K.destinationIp " +
                "FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B " +
                "WHERE K.sourceIp = B.bad_ip");

        TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
                Types.STRING(),
                Types.STRING());
        DataStream<Tuple2<String, String>> outStreamMalicious =
                tableEnv.toAppendStream(resultKafkaMalicious, tupleType);

        // Kafka producer configuration for the sink
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

        // Serialize the matches back to JSON and write them to the output topic
        ObjectMapper mapper = new ObjectMapper();
        DataStream<String> sinkStreamMaliciousData = outStreamMalicious
                .map(new MapFunction<Tuple2<String, String>, String>() {
                    private static final long serialVersionUID = -6347120202L;

                    @Override
                    public String map(Tuple2<String, String> tuple) throws Exception {
                        try {
                            ObjectNode node = mapper.createObjectNode();
                            node.put("source.ip", tuple.f0);
                            node.put("destination.ip", tuple.f1);
                            return node.toString();
                        } catch (Exception e) {
                            System.err.println(e);
                            System.out.println(tuple);
                            return null;
                        }
                    }
                });

        sinkStreamMaliciousData.addSink(
                new FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(), kafkaProducerProperties));

        env.execute("Flink List Matching");
    }
}
-------------------------------------------------------
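In case it is useful: my understanding is that the same join could also be expressed with the Table API instead of SQL. A rough, untested sketch using the same table and function names as above:

// Untested sketch: Table API form of the processing-time temporal table join above
Table resultKafkaMalicious = kafkaSource
        .joinLateral("BadIPTT(k_proctime)", "sourceIp = bad_ip")
        .select("sourceIp, destinationIp");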
On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <[email protected]> wrote:
> Hi Nishant,
>
> You should model the query as a join with a time-versioned table [1].
> The bad-ips table would be the time-versioned table [2].
> Since it is a time-versioned table, it could even be updated with new IPs.
>
> This type of join will only keep the time-versioned table (the bad-ips) in
> state, and not the other (high-volume) table.
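> A processing-time temporal table join then looks roughly like the example in [1]
> (sketch, using the placeholder names from the docs):
>
>   SELECT o_amount * r_rate
>   FROM Orders, LATERAL TABLE (Rates(o_proctime))
>   WHERE r_currency = o_currency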
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>
> On Wed, Sep 18, 2019 at 14:34, Nishant Gupta <[email protected]> wrote:
>
>> Hi Fabian,
>>
>> Thanks for your reply.
>> I have a continuous Kafka stream coming in and a static table of bad IPs. I
>> wanted to segregate the records that have a bad IP.
>>
>> That is why I was joining them, but with that, the 60 GB of memory runs out.
>>
>> So I used the query below.
>> Can you please guide me in this regard?
>>
>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> The query that you wrote is not a time-windowed join.
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>>> (or event time) attribute of badips.
>>>
>>> What exactly are you trying to achieve with the query?
>>>
>>> Best, Fabian
>>>
>>> On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <[email protected]> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am running a query for Time Window Join as below
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> For a time-windowed join, Flink SQL should automatically clear older records.
>>>> Somehow the query does not clear the heap space and fails with an error after
>>>> some time.
>>>>
>>>> Can you please let me know what could be going wrong, or whether it is an issue?
>>>>
>>>> Environment file chunks:
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> tables:
>>>>   - name: sourceKafka
>>>>     type: source-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-flatten
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroup
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>               type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>               type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: ' source.ip '
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: sourceKafkaMalicious
>>>>     type: sink-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-mal
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroupmal
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>               type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>               type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: ' source.ip '
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: badips
>>>>     type: source-table
>>>>     #update-mode: append
>>>>     connector:
>>>>       type: filesystem
>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>     format:
>>>>       type: csv
>>>>       fields:
>>>>         - name: ip
>>>>           type: VARCHAR
>>>>       comment-prefix: "#"
>>>>     schema:
>>>>       - name: ip
>>>>         type: VARCHAR
>>>>
>>>> execution:
>>>>   planner: blink
>>>>   type: streaming
>>>>   time-characteristic: event-time
>>>>   periodic-watermarks-interval: 200
>>>>   result-mode: table
>>>>   max-table-result-rows: 1000000
>>>>   parallelism: 3
>>>>   max-parallelism: 128
>>>>   min-idle-state-retention: 0
>>>>   max-idle-state-retention: 0
>>>>   restart-strategy:
>>>>     type: fallback
>>>>
>>>> configuration:
>>>>   table.optimizer.join-reorder-enabled: true
>>>>   table.exec.spill-compression.enabled: true
>>>>   table.exec.spill-compression.block-size: 128kb
>>>> # Properties that describe the cluster to which table programs are submitted.
>>>>
>>>> deployment:
>>>>   response-timeout: 5000
>>>>
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>