Hi Peter,

You should be able to use Spark structured streaming to write micro-batches
to an Iceberg table. That's documented on the Iceberg site under Structured
Streaming <https://iceberg.apache.org/spark-structured-streaming/>, and you
can check out the tests
<https://github.com/apache/iceberg/blob/master/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java#L86>
if you want full examples. That should support ORC just fine.
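
Here's a rough sketch of what that streaming write can look like. The broker,
topic, table name, and checkpoint path are placeholders, and I'm assuming the
table was created with write.format.default=orc so the data files come out as
ORC:

    import org.apache.spark.sql.streaming.Trigger

    // Placeholder Kafka source; swap in your own transformation and sort.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS payload")

    // Append each micro-batch to the Iceberg table.
    events.writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("1 minute"))     // micro-batch interval
      .option("path", "db.events")                     // placeholder table identifier
      .option("checkpointLocation", "hdfs:///checkpoints/events")
      .start()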

For compaction, check out the new MERGE INTO command, which can use a
"replace" operation to rewrite your data. That also helps any streaming
consumers: since replace snapshots don't change the table's data, they can
be safely ignored.
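
If it helps, the basic shape of the command looks something like this (just a
sketch; the table, join key, and columns are placeholders you'd adapt to your
schema):

    spark.sql("""
      MERGE INTO db.events t          -- placeholder target table
      USING db.events_updates s       -- placeholder source of rewritten rows
      ON t.id = s.id                  -- placeholder join key
      WHEN MATCHED THEN UPDATE SET t.payload = s.payload
      WHEN NOT MATCHED THEN INSERT *
    """)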

I hope that covers it, but please reply if you're still running into any
issues.

Ryan

On Thu, Jul 8, 2021 at 12:02 PM Peter Giles <gil...@uw.edu> wrote:

> Hi all, I have a non-Iceberg Spark streaming process that I'm trying
> to re-engineer, and am running into some trouble making it happen
> using Iceberg.  I think I'm using a fairly common pattern, so I wonder
> if someone here can give me a tip on how to go about it.  I'll try to
> be concise but give enough detail to convey the problem:
>
> It's a Spark streaming app that takes micro-batches of data from a
> Kafka topic, does some transformation on it, sorts the in-flight data,
>  and then writes the micro-batch out into an existing ORC "table" (not
> a hive table, just an HDFS directory that contains all of the
> partitions).  The table is partitioned by date+hour, and within each
> partition it's ordered by a string field.  There is a constant stream
> of incoming data, and it's a whole lot of volume, so micro-batches are
> being processed frequently, each of which creates an additional set of
> ORC files within the table.  This results in lots of files being
> created, way more than is optimal, so after a time, when all the data
> for an hour has finally been written out, a separate job
> "compacts"/coalesces that hour of data (in other words, it gets
> re-written to a smaller number of ORC files).
>
> Why do it this way?
> * Data is available for search/analysis almost immediately. All the
> previous hours of data, having been compacted, are well optimized, and
> having one poorly optimized hour is a fair trade for being able to
> access the most recent data too.
> * Writing many smaller ORC files for the current hour allows each file
> to keep the correct ordering, which turns out to be important:  Using
> ORC's bloom filters (AKA "light indexes") in combination with the
> sorted data vastly improves search performance.
>
> The major pain point is "compaction": because that process rewrites
> the hour of data and then replaces the existing files, it will break
> any already running analyses that happen to need rows from that hour.
> I want to refactor to use Iceberg so that I can seamlessly do those
> compactions thanks to snapshots.
>
> What I *think* I need is a way to get Iceberg to create new files
> within the table for each micro-batch.  At first I thought that
> perhaps the SparkPartitionedFanoutWriter might be the right tool, but
> (a) it doesn't seem to support ORC, and (b), if I'm reading it right,
> it wants to use a size threshold to decide when to write to additional
> files, which isn't what I need.  Is there a simple answer here, or
> would I need a new feature in Iceberg to support this use case?  Or
> maybe this is an outdated pattern, and I should be doing it a
> different way?
>
> Thank you for bearing with me.  Any suggestions are appreciated.
>
> - Peter
>


-- 
Ryan Blue
Tabular
