Hi Peter,

You should be able to use Spark Structured Streaming to write micro-batches to an Iceberg table. That's documented on the Iceberg site under Structured Streaming <https://iceberg.apache.org/spark-structured-streaming/>, and you can check out the tests <https://github.com/apache/iceberg/blob/master/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java#L86> if you want full examples. That should support ORC just fine.
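Here's a rough, untested sketch of what the streaming write looks like. The Kafka broker, topic, table identifier, and checkpoint path are placeholders for your own values, and I'm assuming the table already exists with its write.format.default property set to orc so the data files come out as ORC:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder()
    .appName("iceberg-streaming-sketch")
    .getOrCreate()

  // Hypothetical source: the Kafka topic feeding the micro-batches
  val events = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load()
    .selectExpr("CAST(value AS STRING) AS payload", "timestamp")
    // ... your transformation and sorting logic goes here ...

  // Append each micro-batch to the Iceberg table
  val query = events.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("1 minute"))
    .option("path", "db.events")                          // Iceberg table identifier
    .option("checkpointLocation", "/tmp/checkpoints/events")
    .start()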
For compaction, check out the rewriteDataFiles action, which commits a "replace" operation to rewrite your data files. That also helps any streaming consumers: replace snapshots don't change the table's data, so they can safely be skipped. (I've put a rough sketch of the call at the bottom of this message.)

I hope that covers it, but please reply if you're still running into any issues.

Ryan

On Thu, Jul 8, 2021 at 12:02 PM Peter Giles <gil...@uw.edu> wrote:

> Hi all, I have a non-Iceberg Spark streaming process that I'm trying to
> re-engineer, and am running into some trouble making it happen using
> Iceberg. I think I'm using a fairly common pattern, so I wonder if
> someone here can give me a tip on how to go about it. I'll try to be
> concise but give enough detail to convey the problem:
>
> It's a Spark streaming app that takes micro-batches of data from a Kafka
> topic, does some transformation on it, sorts the in-flight data, and
> then writes the micro-batch out into an existing ORC "table" (not a Hive
> table, just an HDFS directory that contains all of the partitions). The
> table is partitioned by date+hour, and within each partition it's
> ordered by a string field. There is a constant stream of incoming data,
> and it's a whole lot of volume, so micro-batches are being processed
> frequently, each of which creates an additional set of ORC files within
> the table. This results in lots of files being created, way more than is
> optimal, so after a time, when all the data for an hour has finally been
> written out, a separate job "compacts"/coalesces that hour of data (in
> other words, it gets re-written to a smaller number of ORC files).
>
> Why do it this way?
> * Data is available for search/analysis almost immediately. All the
> previous hours of data, having been compacted, are well optimized, and
> having one poorly optimized hour is a fair trade for being able to
> access the most recent data too.
> * Writing many smaller ORC files for the current hour allows each file
> to keep the correct ordering, which turns out to be important: using
> ORC's bloom filters (AKA "light indexes") in combination with the sorted
> data vastly improves search performance.
>
> The major pain point is "compaction": because that process rewrites the
> hour of data and then replaces the existing files, it will break any
> already-running analyses that happen to need rows from that hour. I want
> to refactor to use Iceberg so that I can do those compactions
> seamlessly, thanks to snapshots.
>
> What I *think* I need is a way to get Iceberg to create new files within
> the table for each micro-batch. At first I thought that perhaps the
> SparkPartitionedFanoutWriter might be the right tool, but (a) it doesn't
> seem to support ORC, and (b), if I'm reading it right, it wants to use a
> size threshold to decide when to write to additional files, which isn't
> what I need. Is there a simple answer here, or would I need a new
> feature in Iceberg to support this use case? Or maybe this is an
> outdated pattern, and I should be doing it a different way?
>
> Thank you for bearing with me. Any suggestions are appreciated.
>
> - Peter

-- 
Ryan Blue
Tabular
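P.S. In case it's useful, here's a rough, untested sketch of the compaction call using the Actions API. The table location, partition column name, partition value, and target file size are all placeholders, and you may prefer to load the table through your catalog rather than HadoopTables:

  import org.apache.hadoop.conf.Configuration
  import org.apache.iceberg.Table
  import org.apache.iceberg.actions.Actions
  import org.apache.iceberg.expressions.Expressions
  import org.apache.iceberg.hadoop.HadoopTables

  // Load the table; HadoopTables is just for illustration
  val table: Table = new HadoopTables(new Configuration())
    .load("hdfs://namenode/warehouse/db/events")

  // Rewrite the small files for one completed hour into fewer, larger ORC files.
  // This commits a "replace" snapshot, so the table's data is unchanged and
  // streaming consumers can skip it.
  Actions.forTable(table)
    .rewriteDataFiles()
    .filter(Expressions.equal("hour", 11))    // restrict the rewrite to the finished partition
    .targetSizeInBytes(512L * 1024 * 1024)    // aim for ~512 MB output files
    .execute()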