Hi, Flink community!



I have two questions:


1. How can I concatenate a multiset using a separator?
    In Spark SQL I would write concat_ws(separator, collect_set(column)). In 
Flink, the result type of 'collect(distinct column)' is MULTISET. If I write a 
UDF for this, is 
org.apache.flink.calcite.shaded.com.google.common.collect.Multiset the 
corresponding class for the argument? Or is there a better way, such as a 
built-in function?


2. When I enable mini-batch, the throughput of my Flink application drops by 
27%, even though mini-batch is designed to increase throughput. Did I do 
something wrong? The configuration I use is included below.
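
To make question 1 concrete, here is a minimal sketch of the behavior I am after. It is plain Scala with no Flink dependency, and it rests on an assumption I have not confirmed: that the multiset reaches the UDF as a java.util.Map from element to occurrence count. The names ConcatWsSketch and concatWs are hypothetical, and the keys are sorted only to make the output deterministic.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

object ConcatWsSketch {
  // Hypothetical helper: joins the distinct elements of a multiset with a separator.
  // Assumption: the multiset is passed as a map of element -> occurrence count.
  def concatWs(separator: String, multiset: JMap[String, Integer]): String =
    if (multiset == null) null
    else
      multiset.keySet.asScala.toSeq
        .filter(_ != null) // skip null elements, as concat_ws does in Spark SQL
        .sorted            // sorted only so the result is deterministic
        .mkString(separator)
}
```

If that assumption holds, the same logic could sit inside the eval method of a scalar UDF.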


Thank you for any suggestions.


```
val tenv: TableEnvironment = ...
val conf = tenv.getConfig.getConfiguration

// Hive source and sink behavior
conf.setBoolean("table.exec.hive.infer-source-parallelism", false)
conf.setBoolean("table.exec.hive.fallback-mapred-reader", true)
conf.setString("table.exec.sink.not-null-enforcer", "drop")
conf.setInteger("table.exec.resource.default-parallelism",
  GlobalProp.instance.getProperty("flink.parallelism.max", "20").toInt)

// Optimizer settings
conf.setBoolean("table.optimizer.join-reorder-enabled", true)
conf.setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240)
conf.setBoolean("table.optimizer.distinct-agg.split.enabled", true)
conf.setInteger("table.optimizer.join.broadcast-threshold", 128 * 1024 * 1024)
conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true)
conf.setBoolean("table.optimizer.reuse-source-enabled", true)
conf.setBoolean("table.optimizer.source.predicate-pushdown-enabled", true)
conf.setInteger("table.exec.async-lookup.buffer-capacity", 10000)

// mini-batch
conf.setBoolean("table.exec.mini-batch.enabled", true)
conf.setString("table.exec.mini-batch.allow-latency", "1 min")
conf.setInteger("table.exec.mini-batch.size", 10000)
tenv
```




Best Regards!
