Thanks Ha for the quick response. I've installed 3.2.1 locally and I see the same issue you're having. I went through the PySpark codebase and this appears to be a bug; I don't see any test case that actually exercises a real predicate with overwrite(). Your stack trace also shows exactly where it breaks: before the call ever reaches the JVM, py4j's collection converter tries to iterate the Column (the `for element in object:` loop in java_collections.py), and Column raises `TypeError: Column is not iterable`. A possible workaround might be overwritePartitions() <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwritePartitions.html>, which replaces every partition that the incoming DataFrame has at least one row for.
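If you need the row-level semantics of overwrite(condition) rather than partition-level replacement, one way to sidestep the broken py4j conversion on 3.2.1 could be to push the condition into SQL, so that no Column object has to cross the Python/JVM boundary. A rough, untested sketch mirroring your failing call, assuming the Iceberg SQL extensions are enabled (as in my pyspark invocation further down) and reusing the spark_table_path, tid, and df2 names from your snippets:

>>> # Row-level delete handled by Iceberg's SQL support,
>>> # followed by a plain append of the replacement rows.
>>> spark.sql(f"DELETE FROM {spark_table_path} WHERE tid >= 2")
>>> df2.writeTo(spark_table_path).append()

The trade-off is that this produces two snapshots instead of the single atomic commit that overwrite(condition) would give you, so a concurrent reader could observe the table between the delete and the append. Here is overwritePartitions() end-to-end on my machine: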
>>> table_name = "local.test.persons_with_age"
>>> spark.sql(f"""DROP TABLE IF EXISTS {table_name}""").show()
>>>
>>> spark.sql(f"""
... CREATE TABLE {table_name} (
...     name string,
...     age int
... )
... USING iceberg
... PARTITIONED BY (age);
... """).show()
>>> spark.table(table_name).show()
+----+---+
|name|age|
+----+---+
+----+---+

>>> persons = [('Fokko', 1), ('Gurbe', 2), ('Pieter', 2)]
>>> df = spark.createDataFrame(persons, ['name', 'age'])
>>>
>>> df.writeTo(table_name).append()
>>> spark.table(table_name).show()
+------+---+
|  name|age|
+------+---+
| Fokko|  1|
| Gurbe|  2|
|Pieter|  2|
+------+---+
>>>
>>> new_person = [('Joe', 2)]
>>> df_overwrite = spark.createDataFrame(new_person, ['name', 'age'])
>>>
>>> df_overwrite.writeTo(table_name).overwritePartitions()
>>>
>>> spark.table(table_name).show()
+-----+---+
| name|age|
+-----+---+
|Fokko|  1|
|  Joe|  2|
+-----+---+

Let me know if this solves the issue on your end.

Kind regards,
Fokko Driesprong

On Fri, Jun 28, 2024 at 21:26, Ha Cao <ha....@twosigma.com> wrote:

> Hi Fokko,
>
> Thanks so much for sharing. I am using version 3.2.1. Is this not
> supported in 3.2.1?
>
> I do get the error with the `col` syntax:
> df2.writeTo(spark_table_path).using("iceberg").overwrite(col("tid") >= 2)
>
> The stack trace looks like this:
>
> ---------------------------------------------------------------------------
> TypeError                                 Traceback (most recent call last)
>       1 from pyspark.sql.functions import col
> ----> 2 df2.writeTo(spark_table_path).using("iceberg").overwrite(col("tid") >= 2)
>
> ...pyspark/sql/readwriter.py in overwrite(self, condition)
>    1164         the output table.
>    1165         """
> -> 1166         self._jwriter.overwrite(condition)
>    1167
>    1168     @since(3.1)
>
> ...py4j/java_gateway.py in __call__(self, *args)
>    1311
>    1312     def __call__(self, *args):
> -> 1313         args_command, temp_args = self._build_args(*args)
>    1314
>    1315         command = proto.CALL_COMMAND_NAME +\
>
> ...py4j/java_gateway.py in _build_args(self, *args)
>    1275     def _build_args(self, *args):
>    1276         if self.converters is not None and len(self.converters) > 0:
> -> 1277             (new_args, temp_args) = self._get_args(args)
>    1278         else:
>    1279             new_args = args
>
> ...py4j/java_gateway.py in _get_args(self, args)
>    1262             for converter in self.gateway_client.converters:
>    1263                 if converter.can_convert(arg):
> -> 1264                     temp_arg = converter.convert(arg, self.gateway_client)
>    1265                     temp_args.append(temp_arg)
>    1266                     new_args.append(temp_arg)
>
> ...py4j/java_collections.py in convert(self, object, gateway_client)
>     508         ArrayList = JavaClass("java.util.ArrayList", gateway_client)
>     509         java_list = ArrayList()
> --> 510         for element in object:
>     511             java_list.add(element)
>     512         return java_list
>
> ...pyspark/sql/column.py in __iter__(self)
>     461
>     462     def __iter__(self):
> --> 463         raise TypeError("Column is not iterable")
>     464
>     465     # string methods
>
> TypeError: Column is not iterable
>
> Thanks!
> Best,
> Ha
>
> *From:* Fokko Driesprong <fo...@apache.org>
> *Sent:* Friday, June 28, 2024 3:00 PM
> *To:* dev@iceberg.apache.org
> *Subject:* Re: Iceberg - PySpark overwrite with a condition
>
> Hey Ha,
>
> What version of Spark are you using? Can you share the whole stack trace?
>
> I tried to reproduce it locally and it worked fine:
>
> pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 \
>     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
>     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
>     --conf spark.sql.catalog.spark_catalog.type=hive \
>     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
>     --conf spark.sql.catalog.local.type=hadoop \
>     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
>     --conf spark.sql.defaultCatalog=local
> Python 3.9.6 (default, Feb 3 2024, 15:58:27)
> [Clang 15.0.0 (clang-1500.3.9.4)] on darwin
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
>       /_/
>
> Using Python version 3.9.6 (default, Feb 3 2024 15:58:27)
> Spark context Web UI available at http://192.168.1.10:4040
> Spark context available as 'sc' (master = local[*], app id = local-1719599873923).
> SparkSession available as 'spark'.
>
> >>> table_name = "local.test.person_with_age"
> >>>
> >>> spark.sql(f"""
> ... CREATE TABLE {table_name} (
> ...     name string,
> ...     age int
> ... )
> ... USING iceberg
> ... PARTITIONED BY (age);
> ... """).show()
> ++
> ||
> ++
> ++
>
> >>> spark.table(table_name).show()
> +----+---+
> |name|age|
> +----+---+
> +----+---+
>
> >>> persons = [('Fokko', 1), ('Gurbe', 2), ('Pieter', 2)]
> >>> df = spark.createDataFrame(persons, ['name', 'age'])
> >>> df.writeTo(table_name).append()
> >>> spark.table(table_name).show()
> +------+---+
> |  name|age|
> +------+---+
> | Fokko|  1|
> | Gurbe|  2|
> |Pieter|  2|
> +------+---+
>
> >>> new_person = [('Rho', 2)]
> >>> df_overwrite = spark.createDataFrame(new_person, ['name', 'age'])
> >>> from pyspark.sql.functions import col
> >>> df_overwrite.writeTo(table_name).overwrite(col("age") >= 2)
> >>> spark.table(table_name).show()
> +-----+---+
> | name|age|
> +-----+---+
> |  Rho|  2|
> |Fokko|  1|
> +-----+---+
>
> The syntax with `col` is the way to go. I hope this helps; let me know if it doesn't work for you.
>
> Kind regards,
> Fokko
>
> On Fri, Jun 28, 2024 at 18:09, Ha Cao <ha....@twosigma.com> wrote:
>
> Hi Ajantha,
>
> Thanks for replying! The example, however, is in Java, and I figure that
> syntax probably only works for Java and Scala. I have tried similarly in
> PySpark but still got `Column is not iterable` with:
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(col("time") > target_timestamp)
>
> For this one, I get `Column object is not callable`:
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(col("time").less(target_timestamp))
>
> The only example I can find in the PySpark codebase is
> https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_readwriter.py#L251
> but even with this, it throws `Column is not iterable`. I cannot find any
> other test case that tests `overwrite()`.
>
> Thank you!
>
> Best,
> Ha
>
> *From:* Ajantha Bhat <ajanthab...@gmail.com>
> *Sent:* Friday, June 28, 2024 3:52 AM
> *To:* dev@iceberg.apache.org
> *Subject:* Re: Iceberg - PySpark overwrite with a condition
>
> Hi,
>
> Please refer to this doc:
> https://iceberg.apache.org/docs/nightly/spark-writes/#overwriting-data
>
> We do have some test cases for the same:
> https://github.com/apache/iceberg/blob/91fbcaa62c25308aa815557dd2c0041f75530705/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java#L153
>
> - Ajantha
>
> On Fri, Jun 28, 2024 at 1:00 AM Ha Cao <ha....@twosigma.com> wrote:
>
> Hello,
>
> I am experimenting with PySpark’s DataFrameWriterV2 overwrite()
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwrite.html>
> to an Iceberg table with existing data in a target partition. My goal is
> that instead of overwriting the entire partition, it only overwrites the
> specific rows that match the condition. However, I can’t get it to work
> with any syntax, and I keep getting “Column is not iterable”. I have tried:
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid)
> df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid.isin(1))
> df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid >= 1)
>
> and all of these fail with “Column is not iterable”.
>
> What is the correct syntax for this? I also think it is possible that the
> Iceberg-PySpark integration doesn’t support overwrite, but I don’t know
> how to confirm this.
>
> Thank you so much!
>
> Best,
> Ha
>