Yes, exactly the same input operators go into both joins. The orgUsersTable is later filtered into two separate tables, each of which is then aggregated, and the planner seems to duplicate orgUsersTable into two operators even though I only create it once. Below is a rough sketch of that downstream usage, followed by the chunk of code for the joins from the specific part of the plan I showed.
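To make the shape of the pipeline concrete, here is a minimal sketch of those two downstream branches (simplified: the filter columns and the "IDsAgg" registration name are approximated from the plan output, and the aggregate's composite result is collapsed into a single column, so this is not the exact production code):

import org.apache.flink.table.api._

// The same orgUsersTable is filtered on two different boolean columns and each
// branch is grouped and aggregated on its own. "IDsAgg" stands in for the
// registered aggregate that shows up as IDsAgg_RETRACT in the plan.
val adminOrgIds = orgUsersTable
  .filter($"org_user_is_admin" === true)
  .groupBy($"user_id")
  .select(
    $"user_id",
    call("IDsAgg", $"organization_id").as("admin_organization_ids")
  )

val teacherOrgIds = orgUsersTable
  .filter($"org_user_is_teacher" === true)
  .groupBy($"user_id")
  .select(
    $"user_id",
    call("IDsAgg", $"organization_id").as("teacher_organization_ids")
  )

Each branch filters out only a small portion of rows, so duplicating the entire upstream join for each branch is a lot of redundant state and work. The actual join code: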
// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function so we have to splat out our role array's
  // contents and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )

  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference, otherwise
  // flink will toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)

  return table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}

On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Rex,
>
> Could you also attach one example for these sql / table ? And one
> possible issue to confirm is that does the operators with the same names
> also have the same inputs ?
>
> Best,
> Yun
>
> ------------------ Original Mail ------------------
> Sender: Rex Fenley <r...@remind101.com>
> Send Date: Fri Dec 4 02:55:41 2020
> Recipients: user <user@flink.apache.org>
> Subject: Duplicate operators generated by plan
>
>> Hello,
>>
>> I'm running into an issue where my execution plan is creating the same
>> exact join operator multiple times simply because the subsequent operator
>> filters on a different boolean value. This is a massive duplication of
>> storage and work. The filtered operators which follow result in only a
>> small set of elements filtered out per set too.
>>
>> eg.
>> of two separate operators that are equal:
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>
>> These are entirely the same datasets being processed.
>>
>> The first one points to:
>>
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS admin_organization_ids])
>>
>> The second one points to:
>>
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS teacher_organization_ids])
>>
>> These are intersecting, though slightly different, sets of data, and I
>> don't see why that would make the one join from before split into two.
>> There's even a case where I'm seeing a join tripled.
>>
>> Is there a good reason why this should happen? Is there a way to tell
>> Flink not to duplicate operators where it doesn't need to?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley | Software Engineer - Mobile and Backend
>>
>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>