Version 1.11.2
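
When comparing operators in a printed plan, as asked further down the thread, it can help to group the operator descriptions mechanically rather than by eye. The following is a minimal sketch in plain Java and is not part of the job itself: `PlanDuplicates`, `normalize`, and `findDuplicates` are hypothetical helper names, and it assumes the operator descriptions have already been copied out of the plan as strings. It only shows that two descriptions are textually identical; whether their inputs are also identical still has to be checked in the plan.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Flink API: groups operator descriptions by text.
final class PlanDuplicates {

    /** Collapses whitespace so line-wrapped copies of the same operator compare equal. */
    static String normalize(String operator) {
        return operator.trim().replaceAll("\\s+", " ");
    }

    /** Returns each normalized description that occurs more than once, with its count. */
    static Map<String, Integer> findDuplicates(List<String> operators) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String op : operators) {
            counts.merge(normalize(op), 1, Integer::sum);
        }
        Map<String, Integer> duplicates = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                duplicates.put(e.getKey(), e.getValue());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Shortened operator descriptions in the style of the plan quoted below.
        List<String> ops = new ArrayList<>();
        ops.add("Join(joinType=[InnerJoin], where=[(id = id_splatted)])");
        ops.add("Join(joinType=[InnerJoin],\n  where=[(id = id_splatted)])");
        ops.add("GroupAggregate(groupBy=[user_id])");
        System.out.println(findDuplicates(ops));
    }
}
```

Normalizing whitespace first matters here because plan output pasted into mail gets re-wrapped, so the same operator can appear with different line breaks.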

On Sun, Dec 6, 2020 at 10:20 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Rex,
>
>    I tried a similar example [1] but could not reproduce the issue. Which
> version of Flink are you using now?
>
> Best,
>  Yun
>
>
>
>
> [1] The example code:
>
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.setRestartStrategy(RestartStrategies.noRestart());
> bsEnv.setParallelism(1);
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
> bsSettings);
>
> DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new 
> RichParallelSourceFunction<Tuple2<Integer, String>>() {
>
>    @Override
>    public void run(SourceContext<Tuple2<Integer, String>> sourceContext) 
> throws Exception {
>       sourceContext.collect(new Tuple2<>(0, "test"));
>    }
>
>    @Override
>    public void cancel() {
>
>    }
> });
>
> Table table = bsTableEnv.fromDataStream(
>       source, $("id"), $("name"));
> Table table2 = table.select(call("abs", $("id")), $("name"))
>       .as("new_id", "new_name");
>
> bsTableEnv.createTemporaryView("view", table2);
> Table handled = bsTableEnv.sqlQuery(
>       "select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");
>
> Table ret = table.join(handled)
>       .where($("id").isEqual($("new_id")))
>       .select($("id"), $("name"), $("new_name"));
> System.out.println(ret.explain());
>
> DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, 
> Row.class);
> row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
>    @Override
>    public void invoke(Tuple2<Boolean, Row> value, Context context) throws 
> Exception {
>
>    }
> });
>
> System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
>
> ------------------------------------------------------------------
> Sender:Rex Fenley<r...@remind101.com>
> Date:2020/12/04 14:18:21
> Recipient:Yun Gao<yungao...@aliyun.com>
> Cc:user<user@flink.apache.org>; Brad Davis<brad.da...@remind101.com>
> Subject:Re: Duplicate operators generated by plan
>
> cc Brad
>
> On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Yes, the same exact input operators go into both joins.
>>
>> The chunk of code for the joins from the specific part of the plan I
>> showed is as follows. The orgUsersTable is later filtered into two
>> separate tables, each of which is then aggregated. The planner seems to
>> duplicate orgUsersTable into 2 operators even though I create only one.
>>
>> // in the main function
>> val orgUsersTable = splatRoles(
>> this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
>> OrgUsersRoleSplatPrefix,
>> this.tableEnv
>> )
>>
>> // helper function
>> def splatRoles(
>> table: Table,
>> columnPrefix: String,
>> tableEnv: TableEnvironment
>> ): Table = {
>> // Flink does not have a contains function, so we have to splat out our
>> // role array's contents and join it to the originating table.
>> val func = new SplatRolesFunc()
>> val splatted = table
>> .map(func($"roles", $"id"))
>> .as(
>> "id_splatted",
>> s"${columnPrefix}_is_admin",
>> s"${columnPrefix}_is_teacher",
>> s"${columnPrefix}_is_student",
>> s"${columnPrefix}_is_parent"
>> )
>> // FIRST_VALUE is only available in SQL - so this is SQL.
>> // Rationale: We have to group by after a map to preserve the pk
>> // inference, otherwise flink will toss it out and all future joins
>> // will not have a unique key.
>> tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
>> val grouped = tableEnv.sqlQuery(s"""
>> SELECT
>> id_splatted,
>> FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
>> FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
>> FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
>> FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
>> FROM ${columnPrefix}_splatted
>> GROUP BY id_splatted
>> """)
>> return table
>> .join(grouped, $"id" === $"id_splatted")
>> .dropColumns($"id_splatted")
>> .renameColumns($"roles".as(s"${columnPrefix}_roles"))
>> }
>>
>> @FunctionHint(
>> output = new DataTypeHint(
>> "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
>> )
>> )
>> class SplatRolesFunc extends ScalarFunction {
>> def eval(roles: Array[String], id: java.lang.Long): Row = {
>> val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
>> val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
>> val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
>> val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
>> return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
>> }
>>
>> override def getResultType(signature: Array[Class[_]]):
>> TypeInformation[_] =
>> Types.ROW(
>> Types.LONG,
>> Types.BOOLEAN,
>> Types.BOOLEAN,
>> Types.BOOLEAN,
>> Types.BOOLEAN
>> )
>> }
>>
>>
>> On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Rex,
>>>
>>>     Could you also attach an example of this SQL / Table code? One
>>> thing to confirm: do the operators with the same names also have the
>>> same inputs?
>>>
>>> Best,
>>> Yun
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Rex Fenley <r...@remind101.com>
>>> *Send Date:*Fri Dec 4 02:55:41 2020
>>> *Recipients:*user <user@flink.apache.org>
>>> *Subject:*Duplicate operators generated by plan
>>>
>>>> Hello,
>>>>
>>>> I'm running into an issue where my execution plan is creating the same
>>>> exact join operator multiple times simply because the subsequent operator
>>>> filters on a different boolean value. This is a massive duplication of
>>>> storage and work. The filtered operators which follow result in only a
>>>> small set of elements filtered out per set too.
>>>>
>>>> E.g., two separate operators that are identical:
>>>>
>>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>>
>>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>>
>>>> Both process entirely the same datasets.
>>>>
>>>> The first one points to
>>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>>> TMP_0.f0 AS admin_organization_ids])
>>>>
>>>> The second one points to
>>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>>> TMP_0.f0 AS teacher_organization_ids])
>>>>
>>>> These two are overlapping, though slightly different, sets of data.
>>>> I don't see why that would split the one join from before into two, though.
>>>> There's even a case where I'm seeing a join tripled.
>>>>
>>>> Is there a good reason why this should happen? Is there a way to tell
>>>> flink to not duplicate operators where it doesn't need to?
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>
>
>

