Hi Rex,

I tried a similar example [1] but could not reproduce the issue. Which version of Flink are you using?
Best,
Yun

[1] The example code:

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);
EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

DataStream<Tuple2<Integer, String>> source =
    bsEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, String>>() {
        @Override
        public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
            sourceContext.collect(new Tuple2<>(0, "test"));
        }

        @Override
        public void cancel() {
        }
    });

Table table = bsTableEnv.fromDataStream(source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
    .as("new_id", "new_name");
bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery(
    "select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");
Table ret = table.join(handled)
    .where($("id").isEqual($("new_id")))
    .select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
    }
});
System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());

------------------------------------------------------------------
Sender: Rex Fenley <r...@remind101.com>
Date: 2020/12/04 14:18:21
Recipient: Yun Gao <yungao...@aliyun.com>
Cc: user <user@flink.apache.org>; Brad Davis <brad.da...@remind101.com>
Subject: Re: Duplicate operators generated by plan

cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <r...@remind101.com> wrote:

Yes, exactly the same input operators go into both joins. The code for the joins from the part of the plan I showed is below. The orgUsersTable is later filtered into one table and aggregated, and filtered into another table and aggregated. The planner seems to duplicate orgUsersTable into 2 operators even though I only create it once.

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function so we have to splat out our role array's contents
  // and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
  // toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  return table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}

On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:

Hi Rex,

Could you also attach an example of these SQL / Table programs? And one thing to confirm: do the operators with the same names also have the same inputs?

Best,
Yun

------------------Original Mail ------------------
Sender: Rex Fenley <r...@remind101.com>
Send Date: Fri Dec 4 02:55:41 2020
Recipients: user <user@flink.apache.org>
Subject: Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan is creating the exact same join operator multiple times simply because the subsequent operators filter on different boolean values. This is a massive duplication of storage and work. The filter operators that follow each remove only a small set of elements, too.

E.g., two separate operators that are identical:

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
-> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
-> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Both operators process exactly the same data.
The first one points to

GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0])
-> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to

GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0])
-> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

These two aggregates consume intersecting, though slightly different, sets of data, but I don't see why that should split the single upstream join into two. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell Flink not to duplicate operators where it doesn't need to?

Thanks!

--
Rex Fenley | Software Engineer - Mobile and Backend
Remind.com | BLOG | FOLLOW US | LIKE US
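For reference, below is a minimal sketch of how the Blink planner's sub-plan and source reuse options can be set explicitly via the TableConfig. The option keys come from Flink's OptimizerConfigOptions and both default to true, so this mainly rules the reuse configuration out as a cause rather than offering a fix; whether these options apply to the duplicated joins discussed above is an assumption, not something confirmed in this thread.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings =
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// Ask the planner to reuse identical sub-plans and sources instead of duplicating them.
// Both keys already default to true; setting them explicitly only makes the intent visible.
val conf = tableEnv.getConfig.getConfiguration
conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true)
conf.setBoolean("table.optimizer.reuse-source-enabled", true)

If the duplication persists with reuse enabled, comparing the output of explain() before and after the downstream filtered aggregations are added should at least show at which point the shared join is being expanded into multiple operators.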