Hi, 可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考: https://flink.apache.org/community.html
Best, Weihua On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <chenliangv...@qq.com.invalid> wrote: > 有收到我的问题吗 > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > > <chenliangv...@qq.com.INVALID>; > 发送时间: 2023年2月21日(星期二) 上午9:37 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: 广播流与非广播流 数据先后问题 > > > > 各位大佬好 > 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction > 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt > 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错 > 以下是processElement 的最小工作单元代码示例 和 main 方法的使用: > @Override > public void processElement(FileEventOuterClass.FileEvent value, > BroadcastProcessFunction<FileEventOuterClass.FileEvent, > List<SensitiveRule&gt;, > FileEventOuterClass.FileEvent&gt;.ReadOnlyContext ctx, > Collector<FileEventOuterClass.FileEvent&gt; out) { > try { > ReadOnlyBroadcastState<Void, > List<SensitiveRule&gt;&gt; broadcastState = > ctx.getBroadcastState(ruleDescriptor); > > List<SensitiveRule&gt; > sensitiveRules = broadcastState.get(null); > if > (CollectionUtils.isEmpty(sensitiveRules)) { > return; > } > .... > } catch (Exception e) { > > log.error("SensitiveDataClassify err:", e); > } > } > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > MapStateDescriptor<Void, > List<SensitiveRule&gt;&gt; ruleDescriptor = > new > MapStateDescriptor<&gt;("ruleBroadcastState", Types.VOID, new > ListTypeInfo<&gt;(SensitiveRule.class)); > > // 广播流 > BroadcastStream<List<SensitiveRule&gt;&gt; > broadcast = sensitiveRule.broadcast(ruleDescriptor); > > DataStreamSource<String&gt; localhost = > env.socketTextStream("localhost", 11451); > > SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; stream = > localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&gt;) > value -&gt; > FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build()); > > > SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; > streamOperator = stream.connect(broadcast).process(new > SensitiveDataClassify()); > streamOperator.print("qqq"); > env.execute(); > > }