I didn't follow all of this thread, but if you want exactly one bucket output file per RDD partition, you have to repartition (shuffle) your data on the bucket key. If you don't repartition (shuffle), an RDD partition may contain records with different bucket keys, which produces more than one bucket output file for that RDD partition. So, in your example from Sep 17, you're missing a res.repartition(8, "xxx_id") in front of the .write...
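
A minimal sketch of what that could look like (untested; res, the column names, the bucket count, the options and the table name are taken from the Sep 17 example, spark is assumed to be a SparkSession, and the source path is hypothetical):

import org.apache.spark.sql.functions.col

val res = spark.read.parquet("s3://source-bucket/input")   // hypothetical source
val options = Map("path" -> "result_bucket_path", "compression" -> "gzip")

res.repartition(8, col("xxx_id"))            // shuffle on the bucket key first
  .write
  .mode("append")
  .format("parquet")
  .partitionBy("year", "month", "day")
  .bucketBy(8, "xxx_id")
  .sortBy("xxx_id")
  .options(options)
  .saveAsTable("result_bucket_name")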

On 19.09.2016 at 16:26, Qiang Li wrote:
I tried the DataFrame writer with the coalesce and repartition APIs, but they did not meet my requirements: I still got far more files than the bucket number, and the Spark jobs became very slow after I added coalesce or repartition.

I've gone back to Hive and use Hive to do the data conversion.

Thanks.

On Sat, Sep 17, 2016 at 11:12 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

    Ok

    You have an external table in Hive on S3 with partitioning and
    bucketing, say

    ......
    PARTITIONED BY (year int, month string)
    CLUSTERED BY (prod_id) INTO 256 BUCKETS
    STORED AS ORC.....

    so that within each partition, rows are spread equally by prod_id
    across 256 hash buckets; a bucket is the hash partitioning within
    a Hive table partition.

    Now my question is: how do you force data for a given partition p
    into bucket n? Since you have already specified, say, 256 buckets,
    then whatever prod_id is, it still has to go to one of the 256
    buckets.
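
    As a rough illustration (this shows only the shape of the mapping,
    not Hive's exact hash function), the bucket a row lands in is the
    hash of the clustering column modulo the bucket count:

    // Illustration only: Hive hashes the clustering column and takes the
    // result modulo the number of buckets; the real hash is Hive's internal one.
    def bucketFor(prodId: Int, numBuckets: Int = 256): Int =
      ((prodId.hashCode % numBuckets) + numBuckets) % numBuckets  // keep result non-negative

    // e.g. bucketFor(12345) gives the bucket (0..255) that prod_id 12345 falls into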

    Within Spark, the number of output files is actually the number of
    underlying RDD partitions. You can find this out by invoking
    toJavaRDD.partitions.size() and force a certain number of
    partitions by using coalesce(n) or something like that.
    However, I am not sure the output will be what you expect it to be.
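
    For example (a minimal sketch; the table path is the one from the
    example below and the partition count is only illustrative):

    val s = spark.read.parquet("oraclehadoop.sales2")
    // number of underlying RDD partitions, i.e. the number of files that would be written
    println(s.toJavaRDD.partitions.size())
    // force a smaller number of partitions; coalesce avoids a full shuffle
    val s8 = s.coalesce(8)
    println(s8.toJavaRDD.partitions.size())   // should now report at most 8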

    Worth trying to sort it out the way you want with, say, 8 partitions:

    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val s = spark.read.parquet("oraclehadoop.sales2")
    s.coalesce(8).registerTempTable("tmp")
    HiveContext.sql("SELECT * FROM tmp SORT BY
    prod_id").write.mode("overwrite").parquet("test.sales6")


    It may work.

    HTH



    Dr Mich Talebzadeh

    LinkedIn
    https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

    http://talebzadehmich.wordpress.com


    Disclaimer: Use it at your own risk. Any and all responsibility
    for any loss, damage or destruction of data or any other property
    which may arise from relying on this email's technical content is
    explicitly disclaimed. The author will in no case be liable for
    any monetary damages arising from such loss, damage or destruction.


    On 17 September 2016 at 15:00, Qiang Li <q...@appannie.com> wrote:

        I want to run a job that loads existing data from one S3 bucket,
        processes it, and then stores it in another bucket with partitioning
        and bucketing (converting the data format from TSV to Parquet with
        gzip). So the source data and the results are both in S3; the
        difference is the tool I used to process the data.

        First I processed the data with Hive: I created external tables with
        an S3 location, partitioning and a bucket number, and the jobs
        generated files under each partition directory equal to the bucket
        number. Everything was fine, and I could also use Hive/Presto/Spark
        to run other jobs on the result data in S3.

        But if I run a Spark job with the partition, bucket and sort
        features, the Spark job generates far more files than the bucket
        number under each partition directory, so Presto or Hive cannot
        recognize the buckets, because the number of files does not match
        the bucket number from the Spark job.

        for example:
        ...
        val options = Map("path" -> "result_bucket_path", "compression" -> "gzip")
        res.write.mode("append").format("parquet")
          .partitionBy("year", "month", "day")
          .bucketBy(8, "xxx_id").sortBy("xxx_id")
          .options(options)
          .saveAsTable("result_bucket_name")
        ...

        The number of result bucket files under each partition directory is far more than 8.
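
        If it helps to confirm the mismatch, a minimal sketch for counting
        the data files Spark wrote under one partition directory (the
        directory path here is hypothetical):

        import org.apache.hadoop.fs.{FileSystem, Path}

        val dir = new Path("result_bucket_path/year=2016/month=09/day=17")
        val fs = FileSystem.get(dir.toUri, spark.sparkContext.hadoopConfiguration)
        val dataFiles = fs.listStatus(dir).filter(f => f.isFile && !f.getPath.getName.startsWith("_"))
        // with 8 buckets one would expect 8 data files here; in practice there are more
        println(s"${dataFiles.length} data files under $dir")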


        On Sat, Sep 17, 2016 at 9:27 PM, Mich Talebzadeh
        <mich.talebza...@gmail.com> wrote:

            It is difficult to guess what is happening with your data.

            First, when you say you use Spark to generate test data, is it
            selected randomly and then stored in a Hive (or similar) table?

            HTH



            On 17 September 2016 at 13:59, Qiang Li <q...@appannie.com> wrote:

                Hi,

                I use Spark to generate data, and then we use
                Hive/Pig/Presto/Spark to analyze it. But I found that even
                when I use bucketBy and sortBy with a bucket number in
                Spark, the result files generated by Spark always far exceed
                the bucket number under each partition, so Presto cannot
                recognize the buckets. How can I control that in Spark?

                Unfortunately, I did not find any way to do that.

                Thank you.






--
Adam - App Annie Ops
Phone: +86 18610024053
Email: q...@appannie.com

This email may contain or reference confidential information and is intended only for the individual to whom it is addressed. Please refrain from distributing, disclosing or copying this email and the information contained within unless you are the intended recipient. If you received this email in error, please notify us at le...@appannie.com immediately and remove it from your system.
