[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14612516#comment-14612516 ]
ASF GitHub Bot commented on FLINK-1520: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33822653 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. 
+ * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. + */ +public class GraphCsvReader<K,VV,EV> { + + private final Path vertexPath,edgePath; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction<K, VV> mapper; + +//-------------------------------------------------------------------------------------------------------------------- + + public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) { + this.vertexPath = vertexPath; + this.edgePath = edgePath; + this.VertexReader = new CsvReader(vertexPath,context); + this.EdgeReader = new CsvReader(edgePath,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path edgePath, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); + this.VertexReader = null; + this.mapper = mapper; + this.executionContext=context; + } + + public GraphCsvReader (String edgePath,ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + + } + + public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + } + + + public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { + this(new 
Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context); + } + + public CsvReader getEdgeReader() { + return this.EdgeReader; + } + + public CsvReader getVertexReader() { + return this.VertexReader; + } + //-------------------------------------------------------------------------------------------------------------------- + + /** + * Specifies the types for the Graph fields and returns a Graph with those field types + * + * This method is overloaded for the case in which Vertices don't have a value + * + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data and the type of Vertex ID in the returned Graph. + * @param type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph. + * @param type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data. + */ + public Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2) { + DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type2); + if(vertexPath!=null) { + DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1); + return Graph.fromTupleDataSet(vertices, edges, executionContext); + } else { + return Graph.fromTupleDataSet(edges, this.mapper, executionContext); + } + + + } + /** + * Specifies the types for the Graph fields and returns a Graph with those field types in the special case + * where Vertices don't have a value + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vertex ID in the returned Graph. 
+ * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. + */ + public Graph<K, NullValue, EV> typesVertexValueNull(Class<K> type0, Class<EV> type1) { + DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type1); + return Graph.fromTupleDataSet(edges, executionContext); + } + + /** + * Specifies the types for the Graph fields and returns a Graph with those field types in the special case + * where Edges don't have a value + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vertex ID in the returned Graph. + * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. + */ + public Graph<K, VV, NullValue> typesEdgeValueNull(Class<K> type0, Class<VV> type1) { + DataSet<Tuple3<K, K, NullValue>> edges = this.EdgeReader.types(type0, type0) + .map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() { + @Override + public Tuple3<K, K, NullValue> map(Tuple2<K, K> tuple2) throws Exception { + return new Tuple3<K, K, NullValue>(tuple2.f0, tuple2.f1, NullValue.getInstance()); + } + }); + + if(vertexPath!=null) { + DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1); + return Graph.fromTupleDataSet(vertices, edges, executionContext); + } else { + return Graph.fromTupleDataSet(edges, this.mapper, executionContext); + } + } + + /** + * Specifies the types for the Graph fields and returns a Graph with those field types. + * This method is overloaded for the case in which Vertices and Edges don't have a value --- End diff -- Same here.. 
Actually, are you allowing reading edges with no values? It seems no when reading the docs, but it appears so here :) > Read edges and vertices from CSV files > -------------------------------------- > > Key: FLINK-1520 > URL: https://issues.apache.org/jira/browse/FLINK-1520 > Project: Flink > Issue Type: New Feature > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Shivani Ghatge > Priority: Minor > Labels: easyfix, newbie > > Add methods to create Vertex and Edge Datasets directly from CSV file inputs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)