[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121545#comment-16121545
 ] 

Jark Wu commented on FLINK-7398:
--------------------------------

Porting to Java is not related to this issue. I have the idea for a long time, 
because when we implement a runtime function we have to be careful do not use 
Scala Iterable, do not use Scala foreach, do not use Scala magics. We have to 
be careful to explicitly extend the {{getAccumulatorType}} sometimes, be worry 
about the overhead of Scala types and other problems we don't know.   And you 
are right, it is a huge job and may introduce many risks.

> Table API operators/UDFs must not store Logger
> ----------------------------------------------
>
>                 Key: FLINK-7398
>                 URL: https://issues.apache.org/jira/browse/FLINK-7398
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Assignee: Haohui Mai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37:  val 
> LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39:  val 
> LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36:  val LOG = 
> LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to