anigkus created FLINK-24973: ------------------------------- Summary: flink registercachedfile example no effect Key: FLINK-24973 URL: https://issues.apache.org/jira/browse/FLINK-24973 Project: Flink Issue Type: Technical Debt Components: API / Core Affects Versions: shaded-14.0 Reporter: anigkus
{code:java} /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.myorg.quickstart; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.configuration.Configuration; import java.io.File; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; /** * Copyright © DEEPEXI Technologies Co., Ltd. 2018-2020. All rights reserved. * * @Author zhangchunping * @Date 11/21/21 12:30 PM * @Description ??? */ public class DistributedCache { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1、register a file from HDFS //env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile"); env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true); DataSource<String> data = env.fromElements("a", "b", "c", "d"); data.map(new RichMapFunction<String, String>() { private List<String> cache = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { //super.open(parameters); //2、access cached file via RuntimeContext and DistributedCache File file = getRuntimeContext().getDistributedCache().getFile("localFile"); List<String> lines = FileUtils.readLines(file,"UTF-8"); for (String line : lines) { cache.add(line); System.out.println("line=[" + line + "]"); } } @Override public String map(String value) throws Exception { //value=["a", "b", "c", "d"]; return value; } }).print(); } } {code} #/tmp/tmp.txt–>this file existe List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0 Why.... -- This message was sent by Atlassian Jira (v8.20.1#820001)