Hi Arvid,

Thanks for following up…

> On Sep 2, 2019, at 3:09 AM, Arvid Heise <ar...@data-artisans.com> wrote:
> 
> Hi Ken,
> 
> that's indeed a very odd issue that you found. I had a hard time connecting
> block size with S3 in the beginning and had to dig into the code. I still
> cannot fully understand why you got two different block size values from
> the S3 FileSystem. Looking into Hadoop code, I found the following snippet
> 
> public long getDefaultBlockSize() {
>    return this.getConf().getLong("fs.s3.block.size", 67108864L);
> }
> 
> I don't see a quick fix for that. Looks like mismatching configurations on
> different machines. We should probably have some sanity checks to detect
> mismatching block header information, but unfortunately, the block header
> is very primitive and doesn't allow for sophisticated checks.

Yes - and what made it harder to debug is that when the reader incorrectly
used a 32MB block size, the first split that got processed was sometimes
split[1] (the second actual split). In that situation, the block info record
was where the code expected it to be (since it was reading from 64MB minus
the block info record size, which is where the writer had put it), so it all
looked OK, but then the first record processed would be read from an
incorrect position.
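
To make the offsets concrete, here is a minimal sketch of that arithmetic. It
is not Flink's actual code: the class and method names are hypothetical, the
24-byte block info size is an assumed value, and it simplifies by treating
every real block start as a valid record start.

```java
final class BlockOffsetSketch {
    static final long MB = 1024L * 1024L;
    // Assumed size of the per-block info record, in bytes (illustrative value).
    static final long INFO_SIZE = 24L;

    /** Offset where code using blockSize expects the info record of a block. */
    static long infoOffset(long blockSize, int blockIndex) {
        return (blockIndex + 1) * blockSize - INFO_SIZE;
    }

    /** Offset where a reader assuming blockSize starts reading a split. */
    static long splitStart(long blockSize, int splitIndex) {
        return splitIndex * blockSize;
    }

    /** True if the writer really put an info record at this offset. */
    static boolean writerHasInfoAt(long writeBlockSize, long offset) {
        return (offset + INFO_SIZE) % writeBlockSize == 0;
    }

    /** True if this offset is a real block start (simplification: a record begins there). */
    static boolean isWriterBlockStart(long writeBlockSize, long offset) {
        return offset % writeBlockSize == 0;
    }

    public static void main(String[] args) {
        long written = 64 * MB; // block size used by the writer
        long assumed = 32 * MB; // block size wrongly assumed by the reader
        for (int split = 0; split < 2; split++) {
            long info = infoOffset(assumed, split);
            long start = splitStart(assumed, split);
            System.out.printf("split[%d]: info record found=%b, record start valid=%b%n",
                    split, writerHasInfoAt(written, info), isWriterBlockStart(written, start));
        }
    }
}
```

With these numbers split[0] fails the block info check, while split[1] passes
it but starts reading at 32MB, which is in the middle of the writer's first
block.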

> So let's focus on implementation solutions:
> 1. I gather that you need to have support for data that uses
> IOReadableWritable. So moving to more versatile solutions like Avro or
> Parquet is unfortunately not an option. I'd still recommend that for any
> new project.

See below - it’s not a requirement, but certainly easier.

> 2. Storing block size into the repeated headers in a file introduces a kind
> of chicken-and-egg problem. You need the block size to read the header to
> get the block size.
> 3. Storing block size once in first block would require additional seeks
> and, depending on the degree of parallelism, would put a rather high load on
> the data node with the first block.
> 4. Storing block size in metadata would be ideal but with the wide range of
> possible filesystems most likely not doable.
> 5. Explicitly setting the block size would be the most reliable technique
> but quite user-unfriendly, especially if multiple deployment environments
> use different block sizes.
> 6. Adding a periodic marker does indeed seem the most robust option and
> adding 20 bytes every 2k bytes doesn't seem too bad to me. The downside is
> that seeking can take a long time for larger records as it will linearly
> scan through the bytes at the block start. However, if you really want to
> support copying files across file systems with different block sizes, this
> would be the only option.
> 7. Deprecating sequence format is a good option in the long run. I simply
> don't see that for production code the performance gain over using Avro or
> Parquet would be noticeable, and getting a solid base concept for schema
> evolution will pay off quickly in my experience.
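
For option 6, a minimal sketch of the periodic-marker idea, under stated
assumptions: this is not Hadoop's SequenceFile code, the class and method
names are hypothetical, in-memory byte arrays stand in for files, and records
carry no length prefix. It only shows a marker stored in the header, repeated
between records roughly every 2000 bytes, and a linear scan that finds the
next record boundary from an arbitrary offset.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

final class SyncMarkerSketch {
    static final int MARKER_SIZE = 16;     // a UUID is 128 bits
    static final int SYNC_INTERVAL = 2000; // assumed spacing between markers

    /** Creates a random 16-byte marker from a UUID. */
    static byte[] newMarker() {
        UUID id = UUID.randomUUID();
        return ByteBuffer.allocate(MARKER_SIZE)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    /** Writes the marker as a header, then the records, with a marker
     *  inserted between records once SYNC_INTERVAL bytes have been written. */
    static byte[] write(byte[] marker, byte[][] records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(marker, 0, marker.length);
        int sinceSync = 0;
        for (byte[] record : records) {
            if (sinceSync >= SYNC_INTERVAL) {
                out.write(marker, 0, marker.length);
                sinceSync = 0;
            }
            out.write(record, 0, record.length);
            sinceSync += record.length;
        }
        return out.toByteArray();
    }

    /** Reads the marker back from the file header. */
    static byte[] readMarker(byte[] file) {
        return Arrays.copyOfRange(file, 0, MARKER_SIZE);
    }

    /** Scans linearly from 'from'; returns the offset just past the next
     *  marker (a record boundary), or -1 if no marker follows. */
    static int nextRecordBoundary(byte[] file, byte[] marker, int from) {
        for (int i = Math.max(from, 0); i + marker.length <= file.length; i++) {
            boolean match = true;
            for (int j = 0; j < marker.length && match; j++) {
                match = file[i + j] == marker[j];
            }
            if (match) {
                return i + marker.length;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[][] records = new byte[5][1200]; // five zero-filled ~1.2K records
        byte[] file = write(newMarker(), records);
        // A reader starting mid-file only needs the header marker, not a block size.
        int boundary = nextRecordBoundary(file, readMarker(file), 100);
        System.out.println("next record boundary after offset 100: " + boundary);
    }
}
```

The scan is the cost mentioned above: the reader walks bytes until it hits a
marker, so the worst case grows with the marker spacing and the record size.
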
> 
> @Ken, could you please describe what kind of data you use the sequence
> format for? I'd like to understand your requirements. How large are your
> records (OoM)? Are they POJOs? Do you craft them manually?

They are hand-crafted POJOs, typically about 1.2K/record.

It’s a mapping from words to feature vectors (and some additional data).

I then use them as a backing store with a cache (in a downstream job) as
side-input to a map function that creates word vectors from large collections
of text. This is why the serialized format was appealing, as it’s then
relatively straightforward to use the existing deserialization logic when
reading from my custom Java code.
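
As a rough illustration of what such a record looks like, here is a sketch
using only java.io. The class and field names are hypothetical, and it uses
DataOutput/DataInput as stand-ins for the view types that Flink's
IOReadableWritable methods take; the point is only that the same write/read
pair can be reused outside of a Flink job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class WordVectorRecord {
    String word;
    float[] vector;

    WordVectorRecord() {}

    WordVectorRecord(String word, float[] vector) {
        this.word = word;
        this.vector = vector;
    }

    /** Hand-written serialization: word, vector length, then each component. */
    void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(vector.length);
        for (float v : vector) {
            out.writeFloat(v);
        }
    }

    /** Mirror image of write(); fields must be read in the same order. */
    void read(DataInput in) throws IOException {
        word = in.readUTF();
        vector = new float[in.readInt()];
        for (int i = 0; i < vector.length; i++) {
            vector[i] = in.readFloat();
        }
    }

    /** Convenience for code that holds raw record bytes (e.g. a cache). */
    static byte[] toBytes(WordVectorRecord record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            record.write(new DataOutputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds a record from bytes using the same read() logic. */
    static WordVectorRecord fromBytes(byte[] data) {
        WordVectorRecord record = new WordVectorRecord();
        try {
            record.read(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return record;
    }
}
```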

So yes, I could switch to Parquet with some additional work. I’ve used that
format before in Hadoop jobs, but I’ve never tried reading from it directly.

Ditto for Avro. Note that in my use case I don’t have to worry about evolving 
the schema, as it’s just transient data used in the middle of a batch workflow 
(to avoid really, really big joins that take forever).

Regards,

— Ken



> On Sun, Sep 1, 2019 at 9:42 PM Stephan Ewen <se...@apache.org> wrote:
> 
>> Sounds reasonable.
>> 
>> I am adding Arvid to the thread - IIRC he authored that tool in his
>> Stratosphere days. And by a stroke of luck, he is now working on Flink
>> again.
>> 
>> @Arvid - what are your thoughts on Ken's suggestions?
>> 
>> On Fri, Aug 30, 2019 at 8:56 PM Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>> 
>>> Hi Stephan (switching to dev list),
>>> 
>>> On Aug 29, 2019, at 2:52 AM, Stephan Ewen <se...@apache.org> wrote:
>>> 
>>> That is a good point.
>>> 
>>> Which way would you suggest to go? Not relying on the FS block size at
>>> all, but using a fixed (configurable) block size?
>>> 
>>> 
>>> There’s value to not requiring a fixed block size, as then a file that’s
>>> moved between different file systems can be read using whatever block
>>> size is optimal for that system.
>>> 
>>> Hadoop handles this in sequence files by storing a unique “sync marker”
>>> value in the file header (essentially a 16 byte UUID), injecting one of
>>> these every 2K bytes or so (in between records), and then code can scan
>>> for this to find record boundaries without relying on a block size. The
>>> idea is that 2^128 is a Big Number, so the odds of finding a
>>> false-positive sync marker in data are low enough to be ignorable.
>>> 
>>> But that’s a bigger change. Simpler would be to put a header in each part
>>> file being written, with some signature bytes to aid in detecting
>>> old-format files.
>>> 
>>> Or maybe deprecate SerializedOutputFormat/SerializedInputFormat, and
>>> provide some wrapper glue to make it easier to write/read Hadoop
>>> SequenceFiles that have a null key value, and store the POJO as the data
>>> value. Then you could also leverage Hadoop support for compression at
>>> either record or block level.
>>> 
>>> — Ken
>>> 
>>> 
>>> On Thu, Aug 29, 2019 at 4:49 AM Ken Krugler <kkrugler_li...@transpac.com>
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> Wondering if anyone else has run into this.
>>>> 
>>>> We write files to S3 using the SerializedOutputFormat<OurCustomPOJO>.
>>>> When we read them back, sometimes we get deserialization errors where
>>>> the data seems to be corrupt.
>>>> 
>>>> After a lot of logging, the weathervane of blame pointed towards the
>>>> block size somehow not being the same between the write (where it’s
>>>> 64MB) and the read (unknown).
>>>> 
>>>> When I added a call to SerializedInputFormat.setBlockSize(64MB), the
>>>> problems went away.
>>>> 
>>>> It looks like both input and output formats use fs.getDefaultBlockSize()
>>>> to set this value by default, so maybe the root issue is S3 somehow
>>>> reporting different values.
>>>> 
>>>> But it does feel a bit odd that we’re relying on this default setting,
>>>> versus it being recorded in the file during the write phase.
>>>> 
>>>> And it’s awkward to try to set the block size on the write, as you need
>>>> to set it in the environment conf, which means it applies to all output
>>>> files in the job.
>>>> 
>>>> — Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
