Hi, Flink community!
I have two questions.

1. How can I concatenate a multiset using a separator? In Spark SQL I would write concat_ws(separator, collect_set(column)). In Flink, the result type of collect(distinct column) is MULTISET. When writing a UDF, is the corresponding class org.apache.flink.calcite.shaded.com.google.common.collect.Multiset? Is there a better way, such as a built-in function?

2. When I enable mini-batch, the throughput of my Flink application drops by 27%, even though mini-batch is designed to increase throughput. Did I do something wrong?

Thank you for any suggestions.

```
val tenv: TableEnvironment = ...
val conf = tenv.getConfig.getConfiguration

conf.setBoolean("table.exec.hive.infer-source-parallelism", false)
conf.setBoolean("table.exec.hive.fallback-mapred-reader", true)
conf.setString("table.exec.sink.not-null-enforcer", "drop")
conf.setInteger("table.exec.resource.default-parallelism", GlobalProp.instance.getProperty("flink.parallelism.max", "20").toInt)
conf.setBoolean("table.optimizer.join-reorder-enabled", true)
conf.setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240)
conf.setBoolean("table.optimizer.distinct-agg.split.enabled", true)
conf.setInteger("table.optimizer.join.broadcast-threshold", 128 * 1024 * 1024)
conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true)
conf.setBoolean("table.optimizer.reuse-source-enabled", true)
conf.setBoolean("table.optimizer.source.predicate-pushdown-enabled", true)
conf.setInteger("table.exec.async-lookup.buffer-capacity", 10000)

// mini-batch
conf.setBoolean("table.exec.mini-batch.enabled", true)
conf.setString("table.exec.mini-batch.allow-latency", "1 min")
conf.setInteger("table.exec.mini-batch.size", 10000)

tenv
```

Best Regards!
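
P.S. To make question 1 concrete, here is a minimal sketch of the joining logic I would put inside such a UDF. It assumes the MULTISET value reaches the UDF as a java.util.Map from element to count, which I have not verified for my Flink version. The names MultisetConcat and concatWs are hypothetical, and the Flink ScalarFunction wrapper is left out on purpose. If a built-in aggregate such as LISTAGG with DISTINCT already covers this, that would probably be the simpler route.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

object MultisetConcat {
  // Hypothetical helper, not a Flink API.
  // Assumption: the multiset is passed as java.util.Map[element, count].
  // Joins the distinct elements with the separator, like Spark's
  // concat_ws(separator, collect_set(column)); counts are ignored.
  // Elements are sorted so the output is deterministic.
  def concatWs(separator: String, multiset: JMap[String, Integer]): String =
    if (multiset == null) null
    else multiset.keySet.asScala.toSeq.filter(_ != null).sorted.mkString(separator)
}
```

For example, a map holding "b" with count 2 and "a" with count 1 gives "a,b" for the separator ",".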