[ https://issues.apache.org/jira/browse/FLINK-24973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
anigkus updated FLINK-24973: ---------------------------- Description: {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.myorg.quickstart; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.configuration.Configuration; import java.io.File; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; public class DistributedCache { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1、register a file from HDFS //env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile"); env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true); DataSource<String> data = env.fromElements("a", "b", "c", "d"); data.map(new RichMapFunction<String, String>() { private List<String> cache = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { //super.open(parameters); //2、access cached file via RuntimeContext and DistributedCache File file = getRuntimeContext().getDistributedCache().getFile("localFile"); List<String> lines = FileUtils.readLines(file,"UTF-8"); for (String line : lines) { cache.add(line); System.out.println("line=[" + line + "]"); } } @Override public String map(String value) throws Exception { //value=["a", "b", "c", "d"]; return value; } }).print(); } } {code} #/tmp/tmp.txt–>this file existe List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0 Why.... was: {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.myorg.quickstart; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.configuration.Configuration; import java.io.File; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; /** * Copyright © DEEPEXI Technologies Co., Ltd. 2018-2020. All rights reserved. * * @Author zhangchunping * @Date 11/21/21 12:30 PM * @Description ??? */ public class DistributedCache { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1、register a file from HDFS //env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile"); env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true); DataSource<String> data = env.fromElements("a", "b", "c", "d"); data.map(new RichMapFunction<String, String>() { private List<String> cache = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { //super.open(parameters); //2、access cached file via RuntimeContext and DistributedCache File file = getRuntimeContext().getDistributedCache().getFile("localFile"); List<String> lines = FileUtils.readLines(file,"UTF-8"); for (String line : lines) { cache.add(line); System.out.println("line=[" + line + "]"); } } @Override public String map(String value) throws Exception { //value=["a", "b", "c", "d"]; return value; } }).print(); } } {code} #/tmp/tmp.txt–>this file existe List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0 Why.... > flink registercachedfile example no effect > ------------------------------------------ > > Key: FLINK-24973 > URL: https://issues.apache.org/jira/browse/FLINK-24973 > Project: Flink > Issue Type: Technical Debt > Components: API / Core > Affects Versions: shaded-14.0 > Reporter: anigkus > Priority: Minor > > {code:java} > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.myorg.quickstart; > import org.apache.commons.io.FileUtils; > import org.apache.flink.api.common.functions.RichMapFunction; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.operators.DataSource; > import org.apache.flink.configuration.Configuration; > import java.io.File; > import java.nio.charset.Charset; > import java.util.ArrayList; > import java.util.List; > public class DistributedCache { > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > //1、register a file from HDFS > //env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile"); > env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true); > DataSource<String> data = env.fromElements("a", "b", "c", "d"); > data.map(new RichMapFunction<String, String>() { > private List<String> cache = new ArrayList<String>(); > @Override > public void open(Configuration parameters) throws Exception { > //super.open(parameters); > //2、access cached file via RuntimeContext and DistributedCache > File file = > getRuntimeContext().getDistributedCache().getFile("localFile"); > List<String> lines = FileUtils.readLines(file,"UTF-8"); > > for (String line : lines) { > cache.add(line); > System.out.println("line=[" + line + "]"); > } > } > @Override > public String map(String value) throws Exception { > //value=["a", "b", "c", "d"]; > return value; > } > }).print(); > } > } {code} > #/tmp/tmp.txt–>this file existe > List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0 > Why.... -- This message was sent by Atlassian Jira (v8.20.1#820001)