Hi,
按照现在通用的设计应该是不行的。要么用两个comsumer读取后union;要么魔改下comsumer的代码,在真正数据拉取时用不同的aksk去读。
--
Best!
Xuyang
在 2024-01-09 14:49:35,"somebody someone" <[email protected]> 写道:
>问题:目前使用Flink版本1.12
>需要接入01和02两个topic,属于同一集群,但是数据方给的两个topic的
>jass的用户名username和密码password不一样,其他认证信息都一样,不想用两个Consumer去分别读取,
>怎么用同一个source 方式对接这种配置文件不一样的。
>
>这个上面也有人提出过,也没有想要的。
>https://stackoverflow.com/questions/38989443/flink-how-to-read-from-multiple-kafka-cluster-using-same-streamexecutionenviron
>
>
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers",
>"101.xxx.156.xxx:9097");
> properties.setProperty("group.id", "test_zj");
> properties.setProperty("security.protocol",
>"SASL_SSL");
> properties.setProperty("sasl.mechanism", "PLAIN");
> properties.setProperty("sasl.jaas.config",
>"org.apache.kafka.common.security.plain.PlainLoginModule required
>username=\"xxx-9dc9-xxxxxxx\" password=\"xxxx-4CdNkCo$5b=xxx";");
> properties.setProperty("ssl.truststore.location",
>"jks/client.truststore.jks");
> properties.setProperty("ssl.truststore.password",
>"dmxxx");
>
>
> FlinkKafkaConsumer<String>
>stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(
>
>Arrays.asList("Topic01","topic02"),
> new
>SimpleStringSchema(),
> properties
> );
>
>
> DataStreamSource<String> source =
>env.addSource(stringFlinkKafkaConsumer);
> source.print();
> env.execute();
>
>
>
>
>
>
>
>
>
>
>somebody someone
>[email protected]
>
>
>
>