Hi Guaowei,
Here is what the code for my pipeline looks like.
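import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// RichCoFlatMapFunction (not CoFlatMapFunction) is needed for open() and keyed state.
class CoFlatMapFunc extends RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput> {
  // Keyed state: holds the latest FileInput for the current key (ID).
  private transient ValueState<FileInput> cache;

  @Override
  public void open(Configuration parameters) {
    // Initialize the cache.
    cache = getRuntimeContext().getState(new ValueStateDescriptor<>("cache", FileInput.class));
  }

  // Read an element from the file and update the cache.
  @Override
  public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector) throws Exception {
    cache.update(fileInput);
  }

  // Read an element from Kafka, look up the cache, and emit the output record.
  @Override
  public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector) throws Exception {
    collector.collect(new KafkaOutput(kafkaInput, cache.value()));
  }
}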
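To make the per-key cache semantics concrete, here is a plain-Java sketch (no Flink dependency) that mimics what CoFlatMapFunc does once both inputs are keyed by ID: a HashMap from key to the latest FileInput stands in for ValueState, and each Kafka record is joined with whatever is cached for its key at that moment. The class name KeyedCacheSketch, the simplified record types, and the String keys are hypothetical; Flink's state backend and its scheduling of the two inputs are not modeled. Because Flink does not guarantee that the file input is fully consumed before Kafka records arrive, the sketch includes a Kafka record processed before any file record for its key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the keyed CoFlatMapFunc (hypothetical, no Flink involved).
public class KeyedCacheSketch {
    // Hypothetical simplified records: each carries an id that plays the role of the key.
    record FileInput(String id, String payload) {}
    record KafkaInput(String id, String payload) {}
    record KafkaOutput(KafkaInput kafkaInput, FileInput fileInput) {}

    // One cached value per key, mimicking ValueState<FileInput> scoped by keyBy.
    private final Map<String, FileInput> cache = new HashMap<>();

    // Mirrors flatMap1: overwrite the cached value for this element's key; emits nothing.
    void flatMap1(FileInput fileInput, List<KafkaOutput> out) {
        cache.put(fileInput.id(), fileInput);
    }

    // Mirrors flatMap2: emit the Kafka record joined with the currently cached value,
    // which is null when no file record for this key has been seen yet.
    void flatMap2(KafkaInput kafkaInput, List<KafkaOutput> out) {
        out.add(new KafkaOutput(kafkaInput, cache.get(kafkaInput.id())));
    }

    public static void main(String[] args) {
        KeyedCacheSketch op = new KeyedCacheSketch();
        List<KafkaOutput> out = new ArrayList<>();
        op.flatMap2(new KafkaInput("a", "k1"), out); // arrives before any file record for "a"
        op.flatMap1(new FileInput("a", "f1"), out);
        op.flatMap2(new KafkaInput("a", "k2"), out); // now sees f1
        op.flatMap2(new KafkaInput("b", "k3"), out); // key "b" has nothing cached
        for (KafkaOutput o : out) {
            System.out.println(o.kafkaInput().payload() + " -> "
                + (o.fileInput() == null ? "null" : o.fileInput().payload()));
        }
    }
}
```

Running main prints one line per Kafka record; only the record that arrives after its key's file record is enriched.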
// Old pipeline that works fine.
class OldFlinkPipeline {
  public SingleOutputStreamOperator<KafkaOutput> generateOutput(StreamExecutionEnvironment env) {
    DataStream<KafkaInput> kafkaStream = env
        .addSource(new KafkaSourceFunction());
    return kafkaStream
        .map(kafkaInput -> new KafkaOutput(kafkaInput, null /* fileInput */));
  }
}
// New pipeline that consumes more than 4x the resources.
class NewFlinkPipeline {
  public SingleOutputStreamOperator<KafkaOutput> generateOutput(StreamExecutionEnvironment env) {
    KeyedStream<KafkaInput, ID> kafkaStream = env
        .addSource(new KafkaSourceFunction())
        .keyBy(kafkaInput -> kafkaInput.getId());
    KeyedStream<FileInput, ID> fileStream = env
        .readTextFile("file.txt")
        // readTextFile emits Strings, so each line must be parsed first
        // (parseFileInput is a hypothetical helper).
        .map(line -> parseFileInput(line))
        .keyBy(fileInput -> fileInput.getId());
    return fileStream
        .connect(kafkaStream)
        .coFlatMap(new CoFlatMapFunc());
  }
}
Please do let me know if this is the recommended way to connect a bounded
stream with an unbounded stream, or if I am doing something obviously
expensive here.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/