Hi, Flink community!
I have two questions:
1. How can I concatenate the elements of a multiset with a separator?
In Spark SQL I would write concat_ws(separator, collect_set(column)). In Flink, the
result type of 'collect(distinct column)' is MULTISET. When writing a UDF, is the
corresponding class
org.apache.flink.calcite.shaded.com.google.common.collect.Multiset? Or is there a
better way, such as a built-in function?
2. When I enable mini-batch, the throughput of my Flink application drops by
27%, even though mini-batch is designed to increase throughput. Did I do something
wrong? My configuration is shown below.
Thank you for any suggestions.
```
val tenv: TableEnvironment = ...
val conf = tenv.getConfig.getConfiguration
// Hive source, sink and parallelism settings
conf.setBoolean("table.exec.hive.infer-source-parallelism", false)
conf.setBoolean("table.exec.hive.fallback-mapred-reader", true)
conf.setString("table.exec.sink.not-null-enforcer", "drop")
conf.setInteger("table.exec.resource.default-parallelism",
  GlobalProp.instance.getProperty("flink.parallelism.max", "20").toInt)
// optimizer settings
conf.setBoolean("table.optimizer.join-reorder-enabled", true)
conf.setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240)
conf.setBoolean("table.optimizer.distinct-agg.split.enabled", true)
conf.setInteger("table.optimizer.join.broadcast-threshold", 128 * 1024 * 1024)
conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true)
conf.setBoolean("table.optimizer.reuse-source-enabled", true)
conf.setBoolean("table.optimizer.source.predicate-pushdown-enabled", true)
conf.setInteger("table.exec.async-lookup.buffer-capacity", 10000)
// mini-batch
conf.setBoolean("table.exec.mini-batch.enabled", true)
conf.setString("table.exec.mini-batch.allow-latency", "1 min")
conf.setInteger("table.exec.mini-batch.size", 10000)
tenv
```
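For question 1, this is roughly the helper I have in mind. It is only a sketch and rests on an assumption I have not confirmed: that Flink hands a MULTISET to a UDF as a java.util.Map from element to multiplicity, rather than as the shaded Guava Multiset. The names MultisetConcat and concatWs are hypothetical, and the function is plain Scala so it can be tried without Flink; in a real job it would be called from the eval method of a ScalarFunction.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Flink.
object MultisetConcat {
  // Joins the distinct elements of a multiset with the given separator.
  // ASSUMPTION: the multiset arrives as element -> multiplicity.
  // Null elements are skipped and the rest are sorted, so the output is
  // deterministic. A null multiset yields null.
  def concatWs(separator: String, multiset: JMap[String, Integer]): String =
    if (multiset == null) null
    else multiset.keySet.asScala.toSeq.filter(_ != null).sorted.mkString(separator)
}
```

The multiplicities are ignored on purpose, since the goal is the same result as concat_ws over collect_set in Spark SQL.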
Best Regards!