[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625244#comment-14625244 ]
ASF GitHub Bot commented on FLINK-1520:
---------------------------------------

Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r34504887

    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph from the path(s) of CSV file(s) containing edge (and, optionally, vertex) data.
    + * The class also configures the CSV readers used to read the edge (vertex) data, such as the field types,
    + * the delimiters (row and field), the fields that should be included or skipped, and other flags,
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +@SuppressWarnings({"unused", "unchecked"})
    +public class GraphCsvReader<K, VV, EV> {
    +
    +	private final Path vertexPath, edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +	protected Class<K> vertexKey;
    +	protected Class<VV> vertexValue;
    +	protected Class<EV> edgeValue;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath, context);
    +		this.EdgeReader = new CsvReader(edgePath, context);
    +		this.mapper = null;
    +		this.executionContext = context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath, context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext = context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath, context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext = context;
    +	}
    +
    +	public GraphCsvReader(String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
    +				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +	public GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
    +	}
    +
    +	//--------------------------------------------------------------------------------------------------------------------
    +	/**
    +	 * Specifies the types for the edge fields and returns this instance of GraphCsvReader.
    +	 *
    +	 * @param vertexKey The type of the vertex ID in the Graph.
    +	 * @param edgeValue The type of the edge value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
    +	 */
    +	public GraphCsvReader typesEdges(Class<K> vertexKey, Class<EV> edgeValue) {
    +		this.vertexKey = vertexKey;
    +		this.edgeValue = edgeValue;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the types for the edge fields and returns this instance of GraphCsvReader.
    +	 * This method is overloaded for the case when the type of the edge value is NullValue.
    +	 *
    +	 * @param vertexKey The type of the vertex ID in the Graph.
    +	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
    +	 */
    +	public GraphCsvReader typesEdges(Class<K> vertexKey) {
    +		this.vertexKey = vertexKey;
    +		this.edgeValue = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the types for the vertex fields and returns an instance of Graph.
    +	 *
    +	 * @param vertexKey The type of the vertex ID in the Graph.
    +	 * @param vertexValue The type of the vertex value in the Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph}
    +	 */
    +	public Graph<K, VV, EV> typesVertices(Class vertexKey, Class vertexValue) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey, this.vertexKey, this.edgeValue);
    +		if (mapper == null && this.VertexReader != null) {
    +		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
    --- End diff --

    Indentation is ruined here... DataSet<Tuple2> should have a tab before...


> Read edges and vertices from CSV files
> --------------------------------------
>
>                 Key: FLINK-1520
>                 URL: https://issues.apache.org/jira/browse/FLINK-1520
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>            Reporter: Vasia Kalavri
>            Assignee: Shivani Ghatge
>            Priority: Minor
>              Labels: easyfix, newbie
>
> Add methods to create Vertex and Edge Datasets directly from CSV file inputs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
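For reference, a minimal usage sketch of the API proposed in the diff above, assuming the (String, String, ExecutionEnvironment) constructor and the typesEdges/typesVertices methods as they appear in this pull request; the file paths and the Long/Double type choices are purely illustrative, and the API that eventually gets merged may differ:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.GraphCsvReader;

    public class GraphCsvReaderUsage {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical input files: vertex lines are "id,value",
            // edge lines are "sourceId,targetId,value".
            GraphCsvReader<Long, Double, Double> reader =
                    new GraphCsvReader<Long, Double, Double>("/tmp/vertices.csv", "/tmp/edges.csv", env);

            // typesEdges fixes the vertex key and edge value types;
            // typesVertices fixes the vertex value type and builds the Graph.
            // typesEdges returns the raw GraphCsvReader type, hence the unchecked assignment.
            @SuppressWarnings("unchecked")
            Graph<Long, Double, Double> graph = reader
                    .typesEdges(Long.class, Double.class)
                    .typesVertices(Long.class, Double.class);

            System.out.println("Loaded " + graph.numberOfVertices() + " vertices.");
        }
    }

The two-step typesEdges/typesVertices call mirrors CsvReader.types(), which needs the field types before it can produce a typed DataSet; the edge reader's types() is only invoked once typesVertices() is called.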