Thanks Gabriel,

That’s all very helpful.

I’m not at all sure that the timeouts are related to compactions. This is just 
my best guess at reading the logs.

When our cluster is free for an extended period of time, I’ll try to do a large 
ingest and capture some details that will hopefully help clarify the root issue 
there.

AS far as increasing the number of regions, is there a straightforward way to 
do this once the table already exists?

For example, if I wanted to do four regions per letter of the alphabet, is 
there a way to modify my existing table, or would I have to drop it and start 
over?

From: Gabriel Reid [mailto:gabriel.r...@gmail.com]
Sent: Tuesday, June 23, 2015 1:47 PM
To: user@phoenix.apache.org
Subject: Re: CsvBulkLoad output questions

Thanks for all the additional details.

The short answer (to both of your questions from your most-recent mail) is that 
there shouldn't be any data loss, and that the failed reducer will 
automatically be re-run by MapReduce. The full job is only successful (as it 
was in this case) when all mappers and reducers have completed successfully, 
but map tasks or reduce tasks can be run multiple times in case of failure (or 
even in terms of speculative execution to work around slower nodes).

Related to this, the CsvBulkLoadTool actually writes to HFiles (and not 
directly to HBase). Once the HFiles have all been written then are handed off 
to HBase, making a bulk load a nearly atomic operation -- either all the data 
gets loaded in, or none.

I'm actually wondering about what you said with map tasks failing due to HBase 
compactions. As I mentioned above, the CsvBulkLoadTool writes to HFiles (and 
not HBase), so if there are compactions going on in HBase then I think that 
there's something else writing to it. Are you sure that the timeouts that 
you're getting are due to HBase compactions? Also, could you confirm that it's 
the map tasks (and not reduce tasks) that are causing the issues? A stack trace 
from a failed map or reduce task from this situation would also be really good.

About the timeout on your "select count(*)" query, I'm guessing that the 
relatively low number of regions is likely playing a role in this. Region 
boundaries are the default determining factor in parallelization of queries 
(although statistics collection [1] allows an alternative to this). A count 
query will run over each region, so in your situation that's a little over 300 
million records per region. A count is basically an internal scan over the 
region (within the regionserver) which sends back a single count value per 
region, but with 300 million records being iterated over at 300k rows/second, 
it still works out to 18 minutes to run that query.

I'm not sure if you specifically want to have 24 regions, but I think you'll 
get better performance in large scan-based queries with more regions (although 
I'm pretty sure there are other people on this list who are much better 
informed than me on maximizing query performance). Having more regions will 
also increase the parallelism that you get when running the CsvBulkLoadTool, as 
the number of reduce tasks is equal to the number of output regions.

- Gabriel

1. http://phoenix.apache.org/update_statistics.html



On Tue, Jun 23, 2015 at 5:25 PM Riesland, Zack 
<zack.riesl...@sensus.com<mailto:zack.riesl...@sensus.com>> wrote:
This question is mostly a followup based on my earlier mail (below).

I’m re-consuming this data, one (5GB) csv file at a time.

I see that in consuming this file, there was one failed reduce task. In the 
output, I see a stack trace that I’m guessing is related.

So, 2 questions:

1 – does this mean that data from the CSV file failed to be saved to HBase? Or 
is the mapreduce job smart enough to re-try the failed reducer? The reason I 
ask is that in the “Job History” GUI, I see that there were 24 Total Reducers, 
24 Successful Reducers, and 1 Failed Reducer. So if I 1 failed but ALL 
completed successfully, does that mean that the failed reducer was restarted 
and finished?


2 – If this represents missing data, then is that a bug?

15/06/23 10:48:42 INFO mapreduce.Job: Job job_1433177972202_5025 completed 
successfully
15/06/23 10:48:42 INFO mapreduce.Job: Counters: 52
        File System Counters
                FILE: Number of bytes read=119032798843
                FILE: Number of bytes written=178387531471
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=5243131310
                HDFS: Number of bytes written=702177539
                HDFS: Number of read operations=315
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=72
        Job Counters
                Failed reduce tasks=1
                Launched map tasks=39
                Launched reduce tasks=25
                Data-local map tasks=33
                Rack-local map tasks=6
                Total time spent by all maps in occupied slots (ms)=14443554
                Total time spent by all reduces in occupied slots (ms)=11439806
                Total time spent by all map tasks (ms)=14443554
                Total time spent by all reduce tasks (ms)=11439806
                Total vcore-seconds taken by all map tasks=14443554
                Total vcore-seconds taken by all reduce tasks=11439806
                Total megabyte-seconds taken by all map tasks=103531395072
                Total megabyte-seconds taken by all reduce tasks=82000529408
        Map-Reduce Framework
                Map input records=56330988
                Map output records=563309880
                Map output bytes=58314387712
                Map output materialized bytes=59441013088
                Input split bytes=5694
                Combine input records=0
                Combine output records=0
                Reduce input groups=56031172
                Reduce shuffle bytes=59441013088
                Reduce input records=563309880
                Reduce output records=560311720
                Spilled Records=1689929640
                Shuffled Maps =936
                Failed Shuffles=0
                Merged Map outputs=936
                GC time elapsed (ms)=248681
                CPU time spent (ms)=9564070
                Physical memory (bytes) snapshot=144845623296
                Virtual memory (bytes) snapshot=399930961920
                Total committed heap usage (bytes)=220049965056
        Phoenix MapReduce Import
                Upserts Done=56330988
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=5243095430
        File Output Format Counters
                Bytes Written=702177539

From: Riesland, Zack
Sent: Tuesday, June 23, 2015 9:20 AM
To: 'user@phoenix.apache.org<mailto:user@phoenix.apache.org>'
Subject: RE: How To Count Rows In Large Phoenix Table?

Anil: Thanks for the tip about mapreduce.RowCounter. That takes about 70 
minutes, but it works!

Unfortunately, I only got about 60% of the rows I’m expecting.

Gabriel (and anyone interested):

Thanks for your response!

A few details to give context to my question:

Our cluster has 6 region servers (256 GB RAM and 8 large drives each).

Our table has about 8 billion rows, with about a dozen columns each. This is 
mostly time-series data with some details about each value.

For any given ‘node’, which is identified by the key, there are about 2,000 – 
3,000 rows.

These rows each have a different timestamp in one of the columns.

The key + timestamp is the primary key.

I created the table using Phoenix, so I don’t know what it does under the 
covers as far as column families.

My script does something like this:

CREATE TABLE xxxxxx (
    AAAA varchar not null,
    xxxx integer,
    xxxxxx varchar,
    xxxxxx varchar,
    xxxxxx integer,
    BBBB  integer not null, --unique-to-the-node timestamp of the value
    xxxxxx integer,
    xxxxxxx integer,
    xxxxxxx varchar,
    xxxxxxxx  decimal(19,6),
    xxxxxxxx  decimal(19,6)
    CONSTRAINT pk_redacted PRIMARY KEY (AAAA,BBBB)
    )
    COMPRESSION='GZ'
    SPLIT ON ('AZ', 'BZ', 'CZ', 'DZ', 'EZ', 'FZ', 'GZ', 'HZ', 'IZ', 'JZ', 'KZ', 
'LZ', 'MZ', 'NZ', 'OZ', 'PZ', 'RZ', 'SZ', 'TZ', 'UZ', 'VZ', 'WZ');

As you can see, I split the table alphabetically: 1 region per letter of the 
alphabet. Since our keys are based on  customer ID (they start with customer Id 
which is 4 letters), there are a couple letters that have no indexes, so we end 
up with 4 regions per region server (24 total regions).

When I drill into the table from Ambari, I see that it is fairly 
well-distributed. Most regions have 1 million-4 million requests. A few have 
hundreds of thousands.

Queries against the table are very fast. Basically instant.

When I try to consume all the data at once via CsvBulkLoad, it runs for several 
hours. What eventually happens is that more and more map jobs fail (and retry) 
as more and more regions are busy compacting. This eventually hits a certain 
threshold where the application manager decides to fail the whole job.

For my select count(*) query that fails, I believe it is a timeout issue:

java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixIOException: 
org.apache.phoenix.exception.PhoenixIOException: Failed after attempts=36, 
exceptions:
Tue Jun 23 07:53:36 EDT 2015, null, java.net.SocketTimeoutException: 
callTimeout=60000, callDuration=108925: row '' on table 'redacted' at 
region=redacted,,1434377989918.552c1ed6d6d0c65ec30f467ed11ae0c3., 
hostname=redacted,60020,1434375519767, seqNum=2

        at sqlline.SqlLine$IncrementalRows.hasNext(SqlLine.java:2440)
        at sqlline.SqlLine$TableOutputFormat.print(SqlLine.java:2074)
        at sqlline.SqlLine.print(SqlLine.java:1735)
        at sqlline.SqlLine$Commands.execute(SqlLine.java:3683)
        at sqlline.SqlLine$Commands.sql(SqlLine.java:3584)
        at sqlline.SqlLine.dispatch(SqlLine.java:821)
        at sqlline.SqlLine.begin(SqlLine.java:699)
        at sqlline.SqlLine.mainWithInputRedirection(SqlLine.java:441)
        at sqlline.SqlLine.main(SqlLine.java:424)

I am running the query from a region server node by CD’ing into 
/user/hdp/2.2.0.0-2041/phoenix/bin and calling ./sqlline.py <params>

I created /usr/hdp/2.2.0.0-2041/phoenix/bin/hbase-site.xml and added the 
configuration below, but it doesn’t seem to ‘stick’:

<configuration>
    <property>
        <name>phoenix.query.timeoutMs</name>
        <value>900000</value>
    </property>
</configuration>


I understand your comments about determining whether there are any failed map 
or reduce operations. I watched each one in the application master GUI and 
didn’t notice any that failed.

Finally, I understand your point about how the HBase data must have a unique 
key. I confirmed that the source Hive table is also de-duplicated.

Thanks for any insight or hints you might have.

I’d love to be able to ingest the entire data set over night.

It’s clear that I’m missing quite a bit of data and I’m going to have to start 
over with this table…


From: Gabriel Reid [mailto:gabriel.r...@gmail.com]
Sent: Tuesday, June 23, 2015 2:57 AM
To: user@phoenix.apache.org<mailto:user@phoenix.apache.org>
Subject: Re: How To Count Rows In Large Phoenix Table?

Hi Zack,

Would it be possible to provide a few more details on what kinds of failures 
that you're getting, both with the CsvBulkLoadTool, and with the "SELECT 
COUNT(*)" query?

About question #1, there aren't any known bugs (that I'm aware of) that would 
cause some records to go missing in the CsvBulkLoadTool.  One thing to keep in 
mind is that failure to parse an input record won't cause the CsvBulkLoadTool 
to crash, but it will be recorded in the job counters. There are three job 
counters that are recorded: input records, failed records, and output records. 
If the "failed records" job counter is present (i.e. not zero), then that means 
that some records that were present in the input files were not imported.

About the failures that you're getting in the CsvBulkLoadTool, loading 0.5 TB 
of data (or basically any amount of data) should just work. Could you give some 
details on:
* how many records you're working with
* how many regions the output table has
* a general idea of the schema of the output table (how many columns are 
involved, how many column families are involved)
* what the specific errors are that you're getting when the import job fails

One general issue to keep in mind that can cause a difference in the number of 
records in Hive and in Phoenix is that your Phoenix table will have a primary 
key which is guaranteed unique, and this will not be the case in Hive. This can 
mean that there are multiple records in Hive that have the same values in the 
primary key columns as defined in Phoenix, but when bringing these records over 
to Phoenix they will end up as a single row. Any idea if this could be the 
situation in your setup?

- Gabriel


On Tue, Jun 23, 2015 at 6:11 AM anil gupta 
<anilgupt...@gmail.com<mailto:anilgupt...@gmail.com>> wrote:
For#2: You can use Row_Counter mapreduce job of HBase to count rows of large 
table. You dont need to write any code.
Here is the sample command to invoke:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter <TABLE_NAME>
~Anil


On Mon, Jun 22, 2015 at 12:08 PM, Ciureanu Constantin 
<ciureanu.constan...@gmail.com<mailto:ciureanu.constan...@gmail.com>> wrote:

Hive can connect to HBase and insert directly into any direction.
Don't know if it also works via Phoenix...

Counting is too slow on a single threaded job /command line - you should write 
a map-reduce job, with some filter to load just the key this being really fast.

A Map-reduce job is also the solution to load data from hive to HBase (read 
from HDFS not Hive, prepare output to Phoenix format and bulk load the results).
Pe 22 iun. 2015 9:34 p.m., "Riesland, Zack" 
<zack.riesl...@sensus.com<mailto:zack.riesl...@sensus.com>> a scris:
I had a very large Hive table that I needed in HBase.

After asking around, I came to the conclusion that my best bet was to:

1 – export the hive table to a CSV ‘file’/folder on the HDFS
2 – Use the org.apache.phoenix.mapreduce.CsvBulkLoadTool to import the data.

I found that if I tried to pass the entire folder (~ 1/2 TB of data) to the 
CsvBulkLoadTool, my job would eventually fail.

Empirically, it seems that on our particular cluster, 20-30GB of data is the 
most that the CSVBulkLoadTool can handle at one time without so many map jobs 
timing out that the entire operation fails.

So I passed one sub-file at a time and eventually got all the data into HBase.

I tried doing a select count(*)  on the table to see whether all of the rows 
were transferred, but this eventually fails.

Today, I believe I found a set of data that is in Hive but NOT in HBase.

So, I have 2 questions:

1) Are there any known errors with the CsvBulkLoadTool such that it might skip 
some data without getting my attention with some kind of error?

2) Is there a straightforward way to count the rows in my Phoenix table so that 
I can compare the Hive table with the HBase table?

Thanks in advance!


--
Thanks & Regards,
Anil Gupta

Reply via email to