Hi Fokko,

Thanks so much for sharing. I am using Spark 3.2.1. Is this not supported in 3.2.1?

I do get the error with the `col` syntax: 
df2.writeTo(spark_table_path).using("iceberg").overwrite(col("tid") >= 2)

The stack trace looks like this:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)

      1 from pyspark.sql.functions import col
----> 2 df2.writeTo(spark_table_path).using("iceberg").overwrite(col("tid") >= 2)

...pyspark/sql/readwriter.py in overwrite(self, condition)
   1164         the output table.
   1165         """
-> 1166         self._jwriter.overwrite(condition)
   1167
   1168     @since(3.1)

...py4j/java_gateway.py in __call__(self, *args)
   1311
   1312     def __call__(self, *args):
-> 1313         args_command, temp_args = self._build_args(*args)
   1314
   1315         command = proto.CALL_COMMAND_NAME +\

...py4j/java_gateway.py in _build_args(self, *args)
   1275     def _build_args(self, *args):
   1276         if self.converters is not None and len(self.converters) > 0:
-> 1277             (new_args, temp_args) = self._get_args(args)
   1278         else:
   1279             new_args = args

...py4j/java_gateway.py in _get_args(self, args)
   1262                 for converter in self.gateway_client.converters:
   1263                     if converter.can_convert(arg):
-> 1264                         temp_arg = converter.convert(arg, self.gateway_client)
   1265                         temp_args.append(temp_arg)
   1266                         new_args.append(temp_arg)

...py4j/java_collections.py in convert(self, object, gateway_client)
    508         ArrayList = JavaClass("java.util.ArrayList", gateway_client)
    509         java_list = ArrayList()
--> 510         for element in object:
    511             java_list.add(element)
    512         return java_list

...pyspark/sql/column.py in __iter__(self)
    461
    462     def __iter__(self):
--> 463         raise TypeError("Column is not iterable")
    464
    465     # string methods

TypeError: Column is not iterable
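
From the trace, it looks like the Python wrapper in readwriter.py hands the Python Column object straight to py4j (self._jwriter.overwrite(condition)), which then tries to iterate it. As an experiment, I sketched the workaround below; it is untested and relies on private PySpark attributes (_jwriter, _jc):

from pyspark.sql.functions import col

# Untested sketch: hand the underlying Java Column (condition._jc) to the
# Java-side writer directly, so py4j never tries to convert (iterate) the
# Python Column object. _jwriter and _jc are private internals.
writer = df2.writeTo(spark_table_path).using("iceberg")
writer._jwriter.overwrite(col("tid")._jc)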

Thanks!
Best,
Ha

From: Fokko Driesprong <fo...@apache.org>
Sent: Friday, June 28, 2024 3:00 PM
To: dev@iceberg.apache.org
Subject: Re: Iceberg - PySpark overwrite with a condition

Hey Ha,

What version of Spark are you using? Can you share the whole stack trace? I 
tried to reproduce it locally and it worked fine:

pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
    --conf spark.sql.defaultCatalog=local
Python 3.9.6 (default, Feb  3 2024, 15:58:27)
[Clang 15.0.0 (clang-1500.3.9.4)] on darwin
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Python version 3.9.6 (default, Feb  3 2024 15:58:27)
Spark context Web UI available at http://192.168.1.10:4040
Spark context available as 'sc' (master = local[*], app id = local-1719599873923).
SparkSession available as 'spark'.

>>> table_name = "local.test.person_with_age"
>>>
>>> spark.sql(f"""
... CREATE TABLE {table_name} (
...     name string,
...     age int
... )
... USING iceberg
... PARTITIONED BY (age);
... """).show()
++
||
++
++

>>> spark.table(table_name).show()
+----+---+
|name|age|
+----+---+
+----+---+

>>> persons = [('Fokko', 1), ('Gurbe', 2), ('Pieter', 2)]
>>> df = spark.createDataFrame(persons, ['name', 'age'])
>>> df.writeTo(table_name).append()
>>> spark.table(table_name).show()
+------+---+
|  name|age|
+------+---+
| Fokko|  1|
| Gurbe|  2|
|Pieter|  2|
+------+---+

>>> new_person = [('Rho', 2)]
>>> df_overwrite = spark.createDataFrame(new_person, ['name', 'age'])
>>> from pyspark.sql.functions import col
>>> df_overwrite.writeTo(table_name).overwrite(col("age") >= 2)
>>> spark.table(table_name).show()
+-----+---+
| name|age|
+-----+---+
|  Rho|  2|
|Fokko|  1|
+-----+---+
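
If you are on a Spark version where overwrite(condition) misbehaves, a rough fallback (just a sketch) is a DELETE followed by an append. Unlike overwrite(), this produces two separate commits, so it is not atomic:

# Sketch: delete the rows the new data replaces, then append. Requires the
# Iceberg SQL extensions (enabled in the pyspark command above). Note this
# is two commits, so it is not atomic like overwrite().
spark.sql(f"DELETE FROM {table_name} WHERE age >= 2")
df_overwrite.writeTo(table_name).append()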

The syntax with `col` is the way to go. I hope this helps; let me know if it doesn't work for you.

Kind regards,
Fokko

On Fri, Jun 28, 2024 at 18:09, Ha Cao <ha....@twosigma.com> wrote:
Hi Ajantha,

Thanks for replying! That example, however, is in Java, and I suspect the syntax only works for Java and Scala. I tried the equivalent in PySpark but still got `Column is not iterable` with:
df.writeTo(spark_table_path).using("iceberg").overwrite(col("time") > target_timestamp)

For this variant, I get `'Column' object is not callable`:
df.writeTo(spark_table_path).using("iceberg").overwrite(col("time").less(target_timestamp))

The only example I can find in the PySpark codebase is 
https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_readwriter.py#L251
 but even with that pattern I get `Column is not iterable`. I cannot find any other test case that exercises `overwrite()` with a condition.
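
For what it's worth, my predicates use the normal PySpark operator syntax; as far as I can tell, the Java/Scala Column method names (less, lt, leq, gt, geq) have no Python counterparts, which would also explain the `'Column' object is not callable` error above:

from pyspark.sql.functions import col

# PySpark Columns overload the Python comparison operators; the Java/Scala
# method names (less, lt, leq, gt, geq) do not exist on the Python Column.
cond = col("time") > target_timestamp  # Java equivalent: col("time").gt(...)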

Thank you!
Best,
Ha

From: Ajantha Bhat <ajanthab...@gmail.com>
Sent: Friday, June 28, 2024 3:52 AM
To: dev@iceberg.apache.org
Subject: Re: Iceberg - PySpark overwrite with a condition

Hi,

Please refer to this doc: 
https://iceberg.apache.org/docs/nightly/spark-writes/#overwriting-data

We also have test cases covering this: 
https://github.com/apache/iceberg/blob/91fbcaa62c25308aa815557dd2c0041f75530705/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java#L153

- Ajantha

On Fri, Jun 28, 2024 at 1:00 AM Ha Cao <ha....@twosigma.com> wrote:
Hello,

I am experimenting with PySpark's DataFrameWriterV2 overwrite() (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwrite.html) against an Iceberg table with existing data in a target partition. My goal is to overwrite only the rows that match a condition, rather than the entire partition. However, I can't get any syntax to work; I keep getting "Column is not iterable". I have tried:

df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid)
df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid.isin(1))
df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid >= 1)

and all of these syntaxes fail with “Column is not iterable”.

What is the correct syntax for this? It is also possible that the Iceberg-PySpark integration simply doesn't support conditional overwrite, but I don't know how to confirm that.
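
One way I can think of to check (just a sketch) is to inspect what the installed PySpark wrapper actually passes down to the JVM:

import inspect
from pyspark.sql.readwriter import DataFrameWriterV2

# Print the source of the installed overwrite() wrapper to see what it
# does with the condition argument before handing it to the Java writer.
print(inspect.getsource(DataFrameWriterV2.overwrite))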

Thank you so much!
Best,
Ha
