??????bug??????????
classloader.resolve-order: parent-first
??????bug??????parquet????????


------------------ ???????? ------------------
??????:                                                                         
                                               "kcz"                            
                                                        <[email protected]>;
????????: 2020??7??17??(??????) ????1:32
??????: "user-zh" <[email protected]>;

????: flink-1.11 DDL ????hdfs???? Cannot instantiate user function



standalone 
lib jar??????
flink-connector-hive_2.11-1.11.0.jar
flink-json-1.11.0.jar
flink-sql-connector-kafka_2.12-1.11.0.jar
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar
flink-parquet_2.11-1.11.0.jar
flink-table_2.11-1.11.0.jar
log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar
flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar
flink-table-blink_2.11-1.11.0.jar
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar
flink-shaded-zookeeper-3.4.14.jar
log4j-1.2-api-2.12.1.jar





??????????idea????????
// Reproduction snippet (runs fine from IDEA, fails on the standalone cluster):
// reads JSON records from Kafka and writes them into a partitioned filesystem table.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// Allow at most one checkpoint in flight at a time.
// (fixed: original line ended with a stray double semicolon ";;")
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// NOTE(review): 'path' is not defined in this snippet — presumably a String
// pointing at the checkpoint directory; confirm against the full source.
env.setStateBackend(new FsStateBackend(path));

// Kafka source, declared with the legacy 'connector.type' property style.
tableEnv.executeSql("CREATE TABLE source_table (\n" +
        "\thost STRING,\n" +
        "\turl STRING,\n" +
        "\tpublic_date STRING\n" +
        ") WITH (\n" +
        "\t'connector.type' = 'kafka',\n" +
        "\t'connector.version' = 'universal',\n" +
        "\t'connector.startup-mode' = 'latest-offset',\n" +
        "\t'connector.topic' = 'test_flink_1.11',\n" +
        "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
        "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
        "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
        "\t'update-mode' = 'append',\n" +
        "\t'format.type' = 'json',\n" +
        "\t'format.derive-schema' = 'true'\n" +
        ")");

// Filesystem sink, declared with the new (1.11) 'connector' property style,
// partitioned by public_date and committing partitions with a _SUCCESS file.
tableEnv.executeSql("CREATE TABLE fs_table (\n" +
        "  host STRING,\n" +
        "  url STRING,\n" +
        "  public_date STRING\n" +
        ") PARTITIONED BY (public_date) WITH (\n" +
        "  'connector'='filesystem',\n" +
        "  'path'='path',\n" +
        "  'format'='json',\n" +
        "  'sink.partition-commit.delay'='0s',\n" +
        "  'sink.partition-commit.policy.kind'='success-file'\n" +
        ")");

// Continuous INSERT from the Kafka source into the filesystem sink.
// (fixed: the email line-wrap had split this string literal across two lines,
// which is not valid Java — rejoined with explicit concatenation.)
tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url, " +
        "DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();
????????
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field


??????bug 
sink??hdfs??????????parquet??????lib??????parquet????pom??????provided????????????????error????????pom????????provided????????OK

回复