GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r194703701
  
    --- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
 ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for distributing files with {@link 
org.apache.flink.api.common.cache.DistributedCache} via HDFS.
    + */
    +public class DistributedCacheDfsTest {
    +
    +   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +           + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +           + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
    +           + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n"
    +           + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
    +           + "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +           + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich 
umher der Erde\n"
    +           + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
    +           + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der 
Felsen auf, Und\n"
    +           + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
    +           + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
    +           + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
    +           + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
    +           + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +   @ClassRule
    +   public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    +
    +   @ClassRule
    +   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
    +           new MiniClusterResource.MiniClusterResourceConfiguration(
    +                   new org.apache.flink.configuration.Configuration(),
    +                   1,
    +                   1));
    +
    +   private static MiniDFSCluster hdfsCluster;
    +   private static Configuration conf = new Configuration();
    +
    +   private static Path testFile;
    +   private static Path testDir;
    +
    +   @BeforeClass
    +   public static void setup() throws Exception {
    +           File dataDir = TEMP_FOLDER.newFolder();
    +
    +           conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
    +           MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
    +           hdfsCluster = builder.build();
    +
    +           String hdfsURI = "hdfs://"
    +                   + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
    +                   + "/";
    +
    +           FileSystem dfs = FileSystem.get(new URI(hdfsURI));
    +           testFile = writeFile(dfs, dfs.getHomeDirectory(), "testFile");
    +
    +           testDir = new Path(dfs.getHomeDirectory(), "testDir");
    +           dfs.mkdirs(testDir);
    +           writeFile(dfs, testDir, "testFile1");
    +           writeFile(dfs, testDir, "testFile2");
    +   }
    +
    +   private static Path writeFile(FileSystem dfs, Path rootDir, String 
fileName) throws IOException {
    +           Path file = new Path(rootDir, fileName);
    +           try (
    +                   DataOutputStream outStream = new 
DataOutputStream(dfs.create(file,
    +                           FileSystem.WriteMode.OVERWRITE))) {
    +                   outStream.writeUTF(testFileContent);
    +           }
    +           return file;
    +   }
    +
    +   @AfterClass
    +   public static void teardown() {
    +           hdfsCluster.shutdown();
    +   }
    +
    +   @Test
    +   public void testDistributeFileViaDFS() throws Exception {
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setParallelism(1);
    +
    +           env.registerCachedFile(testFile.toString(), "test_data", false);
    +           env.registerCachedFile(testDir.toString(), "test_dir", false);
    +
    +           env.fromElements(1)
    +                   .map(new TestMapFunction())
    +                   .print();
    +
    +           env.execute("Distributed Cache Via Blob Test Program");
    +   }
    +
    +   static class TestMapFunction extends RichMapFunction<Integer, String> {
    --- End diff --
    
    Can we make this nested class (`TestMapFunction`) private?


---

Reply via email to