[ 
https://issues.apache.org/jira/browse/FLINK-24973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

anigkus updated FLINK-24973:
----------------------------
    Description: 
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.myorg.quickstart;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;


/**
 * Small batch job that registers a local file with Flink's distributed cache and
 * reads it back from inside a rich map function.
 */
public class DistributedCache {

    /**
     * Registers {@code file:///tmp/tmp.txt} under the name {@code localFile}, sends the
     * elements "a".."d" through {@link CachedLinesMapper} and prints the mapped values.
     *
     * @param args command-line arguments; not read by this program
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        // Step 1: register the file to be cached. The HDFS variant is kept for reference:
        // environment.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
        // The third argument is passed as true (per Flink's API this marks the file executable).
        environment.registerCachedFile("file:///tmp/tmp.txt", "localFile", true);

        final DataSource<String> letters = environment.fromElements("a", "b", "c", "d");
        letters.map(new CachedLinesMapper()).print();
    }

    /**
     * Identity mapper that, when opened, loads every line of the cached file named
     * {@code localFile} into memory and echoes each line to standard output.
     */
    private static final class CachedLinesMapper extends RichMapFunction<String, String> {

        /** Lines collected from the cached file during {@link #open(Configuration)}. */
        private final List<String> cachedLines = new ArrayList<>();

        /**
         * Step 2: looks the cached file up through the runtime context, reads it as UTF-8,
         * and stores and prints each line in file order.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            final File cachedFile = getRuntimeContext().getDistributedCache().getFile("localFile");
            for (String cachedLine : FileUtils.readLines(cachedFile, "UTF-8")) {
                cachedLines.add(cachedLine);
                System.out.println("line=[" + cachedLine + "]");
            }
        }

        /** Returns the incoming element unchanged. */
        @Override
        public String map(String value) throws Exception {
            return value;
        }
    }
} {code}
#/tmp/tmp.txt –> this file exists

List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0

Why is the returned list empty?

  was:
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.myorg.quickstart;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/**
 * Copyright © DEEPEXI Technologies Co., Ltd. 2018-2020. All rights reserved.
 *
 * @Author zhangchunping
 * @Date 11/21/21 12:30 PM
 * @Description ???
 */
public class DistributedCache {

    /**
     * Entry point: caches {@code file:///tmp/tmp.txt} as {@code localFile}, then maps and
     * prints four fixed elements with a function that reads the cached file when opened.
     *
     * @param args command-line arguments; not read by this program
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Register the file to cache (HDFS alternative left commented out).
        // batchEnv.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
        batchEnv.registerCachedFile("file:///tmp/tmp.txt", "localFile", true);

        DataSource<String> input = batchEnv.fromElements("a", "b", "c", "d");

        // Pass-through mapper whose open() loads the cached file's lines.
        RichMapFunction<String, String> passThrough = new RichMapFunction<String, String>() {
            // Lines of the cached file, filled in open().
            private List<String> buffered = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2. Resolve the cached file by its registered name and read it as UTF-8.
                File localCopy = getRuntimeContext().getDistributedCache().getFile("localFile");
                List<String> content = FileUtils.readLines(localCopy, "UTF-8");

                // Keep and echo every line, preserving file order.
                for (int index = 0; index < content.size(); index++) {
                    String current = content.get(index);
                    buffered.add(current);
                    System.out.println("line=[" + current + "]");
                }
            }

            @Override
            public String map(String value) throws Exception {
                // Each of "a", "b", "c", "d" is returned as-is.
                return value;
            }
        };

        input.map(passThrough).print();
    }
} {code}
#/tmp/tmp.txt –> this file exists

List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0

Why....


> flink registercachedfile example no effect
> ------------------------------------------
>
>                 Key: FLINK-24973
>                 URL: https://issues.apache.org/jira/browse/FLINK-24973
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: API / Core
>    Affects Versions: shaded-14.0
>            Reporter: anigkus
>            Priority: Minor
>
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.myorg.quickstart;
> import org.apache.commons.io.FileUtils;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.configuration.Configuration;
> import java.io.File;
> import java.nio.charset.Charset;
> import java.util.ArrayList;
> import java.util.List;
> public class DistributedCache {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>         //1、register a file from HDFS
>         //env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
>         env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true);
>         DataSource<String> data = env.fromElements("a", "b", "c", "d");
>         data.map(new RichMapFunction<String, String>() {
>             private List<String> cache = new ArrayList<String>();
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 //super.open(parameters);
>                 //2、access cached file via RuntimeContext and DistributedCache
>                 File file = 
> getRuntimeContext().getDistributedCache().getFile("localFile");
>                 List<String> lines = FileUtils.readLines(file,"UTF-8");  
>                 
>                 for (String line : lines) {
>                     cache.add(line);
>                     System.out.println("line=[" + line + "]");
>                 }
>             }
>             @Override
>             public String map(String value) throws Exception {
>                 //value=["a", "b", "c", "d"];
>                 return value;
>             }
>         }).print();
>     }
> } {code}
> #/tmp/tmp.txt–>this file existe
> List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0
> Why....



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to