I'd open it up in a debugger to see the context where the error is thrown. It also looks like we need to improve the exception's message because it doesn't have enough context for how to fix the problem. If you want, feel free to open an issue and we'll fix it. Even better, we'd love it if you suggested a fix in a PR!
rb On Fri, Feb 8, 2019 at 2:25 PM Akshita Gupta < akshita.gu...@orbitalinsight.com> wrote: > Thanks Ryan and Manish. How can I see what name columns its having issues > with? I did not change anything explicitly. Just to be sure if its really > an issue on the iceberg end and I cannot do anything to rectify and open up > the issue, I am pasting my code here for you to verify: > > package com.mkyong.hashing; > > import org.apache.spark.sql.*; > import org.apache.spark.sql.api.java.UDF1; > import org.apache.spark.sql.api.java.UDF2; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.types.DataTypes; > import org.json.simple.JSONObject; > import org.json.simple.parser.*; > import org.apache.spark.api.java.JavaSparkContext; > > import com.netflix.iceberg.PartitionSpec; > import com.netflix.iceberg.Schema; > import com.netflix.iceberg.Table; > import com.netflix.iceberg.TableProperties; > import com.netflix.iceberg.hadoop.HadoopTables; > import com.netflix.iceberg.types.Types; > import org.apache.hadoop.conf.Configuration; > import java.io.FileReader; > import java.util.Arrays; > > import static com.netflix.iceberg.types.Types.NestedField.optional; > > > public class App { > > private static String AWS_KEY = ""; > private static String AWS_SECRET_KEY = “"; > > private static final Schema SCHEMA = new Schema( > > optional(1, "ad_id", Types.StringType.get()), > optional(2, "latitude", Types.DoubleType.get()), > optional(3, "longitude", Types.DoubleType.get()), > optional(4, "horizontal_accuracy", Types.DoubleType.get()), > optional(5, "id_type", Types.StringType.get()), > optional(6, "utc_timestamp", Types.LongType.get()), > optional(7, "geo_hash", Types.StringType.get()), > optional(8, "cluster", Types.StringType.get()), > optional(9, "new_cluster", Types.StringType.get()) > ); > > private static final Configuration CONF = new Configuration(); > > public static void main(String[] args) throws Exception { > > SparkSession spark = SparkSession > .builder() > .master("local[*]") > .appName("my-spark-iceberg") > .config("spark.driver.memory","50g") > .config("spark.hadoop.fs.s3a.impl", > "org.apache.hadoop.fs.s3a.S3AFileSystem") > .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", > "2") > .config("spark.sql.shuffle.partitions", "400") > .config("spark.eventLog.enabled", "true") > .config("spark.eventLog.dir", "/base/logs") > .getOrCreate(); > > String location = "/orbital/base/iceberg_local_tes"; > JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); > sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY); > sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY); > sc.hadoopConfiguration().set("spark.driver.memory","50g"); > sc.hadoopConfiguration().set("spark.eventLog.enabled", "true"); > sc.hadoopConfiguration().set("spark.eventLog.dir", “/base/logs"); > sc.setLogLevel("ERROR"); > > HadoopTables tables = new HadoopTables(CONF); > PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity( > "new_cluster").build(); > Table table = tables.create(SCHEMA, spec, location); > > tables.load(location).schema(); > > Dataset<Row> df = spark.read().parquet(" > s3a://key_path/part-002-tid-1544256-c000.gz.parquet"); > > // PARSING JSON FILE > Object obj = new JSONParser().parse(new FileReader( > "/orbital/base/gh_clusters.json")); > JSONObject jo = (JSONObject) obj; > spark.udf().register("getJsonVal", (UDF1<String, String>) key -> (String) > jo.get(key.substring(0, 5)), DataTypes.StringType); > > df = df.withColumn("cluster", functions.callUDF("getJsonVal", df.col( > "geo_hash"))); > // END PARSING JSON FILE > System.out.println(df.count()); > > df.createOrReplaceTempView("df_with_cluster"); > //Here adding a constant geohash value if cluster is null instead of > geohash column > String sql2 = "select *, coalesce(nullif(cluster, ''), '2h5rd') as > new_cluster from df_with_cluster"; > Dataset<Row> df2 = spark.sql(sql2); > > System.out.println("I am done part 1"); > > table.updateProperties().set(TableProperties.WRITE_NEW_DATA_LOCATION, " > s3a://output-bucket/data").commit(); > > df2.sort("new_cluster") > .write() > .format("iceberg") > .mode("append") > .save(location); // metadata location > > System.out.println(String.format("Done writing: %s", location)); > > Dataset<Row> res_df = spark.read().format("iceberg").load(location); > res_df.show(); > } > } > > > Regards > Akshita > > On Feb 5, 2019, at 4:29 PM, Ryan Blue <rb...@netflix.com> wrote: > > +dev@iceberg.apache.org. That's the correct list now. > > Akshita, > > The latest master doesn't yet have the fix that removes the conflict with > Spark's Parquet version. That fix is in PR #63 > <https://github.com/apache/incubator-iceberg/pull/63>. If you build that > PR, then you should be able to shade everything in org.apache.parquet to > avoid the conflict with Spark. > > The error you're getting does look like the one Manish hit. I think that > is because Spark can't match the column that you're trying to project by > name. Make sure the case you're using matches the case used in the table > definition because Iceberg is currently case sensitive. Xabriel is working > on fixing case sensitivity or at least making it optional for expressions > in PR #89 <https://github.com/apache/incubator-iceberg/pull/89>. If it > turns out to be case sensitivity, could you please open an issue on the > project and we'll fix it before the release? > > rb > > On Tue, Feb 5, 2019 at 3:02 PM Akshita Gupta < > akshita.gu...@orbitalinsight.com> wrote: > >> Forwarding to iceberg developers. Not sure if dev@iceberg is valid. >> >> Begin forwarded message: >> >> *From: *Akshita Gupta <akshita.gu...@orbitalinsight.com> >> *Subject: **Quick question on writing parquet files to iceberg* >> *Date: *February 5, 2019 at 12:09:14 PM PST >> *To: *dev@iceberg.apache.org >> >> Hello >> >> Hello, I am having issue reading from partitioned parquet file in >> iceberg. I am able to do csv easily. I get the following error at >> .save(location): >> >> Exception in thread "main" java.lang.NullPointerException at >> com.netflix.iceberg.types.ReassignIds.field(ReassignIds.java:74) at >> com.netflix.iceberg.types.ReassignIds.field(ReassignIds.java:25) at >> com.netflix.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:308) >> at com.google.common.collect.Iterators$8.next(Iterators.java:812) at >> com.google.common.collect.Lists.newArrayList(Lists.java:139) at >> com.google.common.collect.Lists.newArrayList(Lists.java:119) at >> com.netflix.iceberg.types.ReassignIds.struct(ReassignIds.java:52) at >> com.netflix.iceberg.types.ReassignIds.struct(ReassignIds.java:25) at >> com.netflix.iceberg.types.TypeUtil.visit(TypeUtil.java:341) at >> com.netflix.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:293) at >> com.netflix.iceberg.types.ReassignIds.schema(ReassignIds.java:37) at >> com.netflix.iceberg.types.ReassignIds.schema(ReassignIds.java:25) at >> com.netflix.iceberg.types.TypeUtil.visit(TypeUtil.java:313) at >> com.netflix.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:122) at >> com.netflix.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163) >> at >> com.netflix.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:67) >> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:254) at >> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) at >> com.mkyong.hashing.App.main(App.java:115). >> >> I remember Ryan mentioning to use the bundled version of iceberg earlier >> but I am not sure how to use that? I am partitioning in Java and added >> dependency using pom.xml. >> >> Here is my dependency xml. What else needs to be done to be able to read >> parquet? >> >> <?xml version="1.0" encoding="UTF-8"?> >> <project xmlns="http://maven.apache.org/POM/4.0.0" >> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 >> http://maven.apache.org/xsd/maven-4.0.0.xsd"> >> <modelVersion>4.0.0</modelVersion> >> >> <groupId>iceberg-sample</groupId> >> <artifactId>java-iceberg</artifactId> >> <version>1.0-SNAPSHOT</version> >> >> <properties> >> <!-- https://maven.apache.org/general.html#encoding-warning --> >> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> >> >> <maven.compiler.source>1.8</maven.compiler.source> >> <maven.compiler.target>1.8</maven.compiler.target> >> </properties> >> >> <repositories> >> <repository> >> <id>jitpack.io</id> >> <url>https://jitpack.io</url> >> </repository> >> </repositories> >> <build> >> <plugins> >> <plugin> >> <groupId>org.apache.maven.plugins</groupId> >> <artifactId>maven-jar-plugin</artifactId> >> <configuration> >> <archive> >> <manifest> >> <mainClass>com.mkyong.hashing.App</mainClass> >> </manifest> >> </archive> >> </configuration> >> </plugin> >> <plugin> >> <groupId>org.apache.maven.plugins</groupId> >> <artifactId>maven-shade-plugin</artifactId> >> <version>3.2.0</version> >> <executions> >> <!-- Attach the shade goal into the package phase --> >> <execution> >> <phase>package</phase> >> <goals> >> <goal>shade</goal> >> </goals> >> <configuration> >> <transformers> >> <transformer >> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> >> </transformers> >> </configuration> >> </execution> >> </executions> >> </plugin> >> </plugins> >> </build> >> >> >> <dependencies> >> <dependency> >> <groupId>org.apache.spark</groupId> >> <artifactId>spark-core_2.11</artifactId> >> <version>2.3.2</version> >> </dependency> >> <dependency> >> <groupId>org.apache.spark</groupId> >> <artifactId>spark-sql_2.11</artifactId> >> <version>2.3.2</version> >> </dependency> >> <dependency> >> <groupId>com.amazonaws</groupId> >> <artifactId>aws-java-sdk</artifactId> >> <version>1.11.417</version> >> </dependency> >> <dependency> >> <groupId>com.github.Netflix.iceberg</groupId> >> <artifactId>iceberg-spark</artifactId> >> <version>0.6.2</version> >> </dependency> >> >> <dependency> >> <groupId>com.github.Netflix.iceberg</groupId> >> <artifactId>iceberg-common</artifactId> >> <version>0.6.2</version> >> </dependency> >> <dependency> >> <groupId>com.github.Netflix.iceberg</groupId> >> <artifactId>iceberg-api</artifactId> >> <version>0.6.2</version> >> </dependency> >> <dependency> >> <groupId>com.github.Netflix.iceberg</groupId> >> <artifactId>iceberg-core</artifactId> >> <version>0.6.2</version> >> </dependency> >> <dependency> >> <groupId>com.github.Netflix.iceberg</groupId> >> <artifactId>iceberg-parquet</artifactId> >> <version>0.6.2</version> >> </dependency> >> >> <dependency> >> <groupId>org.apache.hadoop</groupId> >> <artifactId>hadoop-aws</artifactId> >> <version>3.1.1</version> >> </dependency> >> <!--FATAL: DO NOT ADD THIS--> >> <!--<dependency>--> >> <!--<groupId>org.apache.hadoop</groupId>--> >> <!--<artifactId>hadoop-hdfs</artifactId>--> >> <!--<version>3.1.1</version>--> >> <!--</dependency>--> >> <dependency> >> <groupId>org.apache.hadoop</groupId> >> <artifactId>hadoop-common</artifactId> >> <version>3.1.1</version> >> </dependency> >> <dependency> >> <groupId>org.apache.parquet</groupId> >> <artifactId>parquet-common</artifactId> >> <version>1.10.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.parquet</groupId> >> <artifactId>parquet-column</artifactId> >> <version>1.10.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.parquet</groupId> >> <artifactId>parquet-hadoop</artifactId> >> <version>1.10.0</version> >> </dependency> >> <dependency> >> <groupId>org.apache.avro</groupId> >> <artifactId>avro</artifactId> >> <version>1.8.2</version> >> </dependency> >> <dependency> >> <groupId>com.googlecode.json-simple</groupId> >> <artifactId> json-simple</artifactId> >> <version>1.1</version> >> </dependency> >> </dependencies> >> </project> >> >> >> >> >> >> -- >> You received this message because you are subscribed to the Google Groups >> "Iceberg Developers" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to iceberg-devel+unsubscr...@googlegroups.com. >> To post to this group, send email to iceberg-de...@googlegroups.com. >> To view this discussion on the web visit >> https://groups.google.com/d/msgid/iceberg-devel/31CCFFC7-913C-4CA6-882C-155C47B22DF6%40orbitalinsight.com >> <https://groups.google.com/d/msgid/iceberg-devel/31CCFFC7-913C-4CA6-882C-155C47B22DF6%40orbitalinsight.com?utm_medium=email&utm_source=footer> >> . >> For more options, visit https://groups.google.com/d/optout. >> > > > -- > Ryan Blue > Software Engineer > Netflix > > > -- Ryan Blue Software Engineer Netflix