[ 
https://issues.apache.org/jira/browse/HIVE-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chao updated HIVE-8457:
-----------------------
    Description: 
Currently, on the Spark branch, each thread it is bound with a thread-local 
IOContext, which gets initialized when we generates an input {{HadoopRDD}}, and 
later used in {{MapOperator}}, {{FilterOperator}}, etc.

And, given the introduction of HIVE-8118, we may have multiple downstream RDDs 
that share the same input {{HadoopRDD}}, and we would like to have the 
{{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A 
typical case would be like the following:

{noformat}
     inputRDD     inputRDD
        |            |
       MT_11        MT_12
        |            |
       RT_1         RT_2
{noformat}

Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a splitted {{MapWork}},
and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that, this example is 
simplified, as we may also have {{ShuffleTran}} between {{MapTran}} and 
{{ReduceTran}}.

When multiple Spark threads are running, {{MT_11}} may be executed first, and 
it will ask for an iterator from the {{HadoopRDD}} will trigger the creation of 
the iterator, which in turn triggers the initialization of the {{IOContext}} 
associated with that particular thread.

Now, before {{MT_12}} starts executing, it will also ask for an iterator from 
the
{{HadoopRDD}}, and since the RDD is already cached, instead of creating a new 
iterator, it will just fetch it from the cached result. However, the problem 
is, this will skip the initialization of the IOContext associated with this 
particular thread. When {{MT_12}} starts executing, it will first initialize 
the {{MapOperator}}, but since the {{IOContext}} is not initialized, this will 
fail miserably. 

  was:
Currently, on the Spark branch, each thread it is bound with a thread-local 
IOContext, which gets initialized when we generates an input {{HadoopRDD}}, and 
later used in {{MapOperator}}, {{FilterOperator}}, etc.

And, given the introduction of HIVE-8118, we may have multiple downstream RDDs 
that share the same input {{HadoopRDD}}, and we would like to have the 
{{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A 
typical case would be like the following:

{noformat}
     inputRDD     inputRDD
        |            |
       MT_11        MT_12
        |            |
       RT_1         RT_2
{noformat}

Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a splitted {{MapWork}},
and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that, this example is 
simplified, as we may also have {{ShuffleTran}} between {{MapTran}} and 
{{ReduceTran}}.

When multiple Spark threads are running, {{MT_11} may be executed first, and it 
will ask for an iterator from the {{HadoopRDD}} will trigger the creation of 
the iterator, which in turn triggers the initialization of the {{IOContext}} 
associated with that particular thread.

Now, before {{MT_12}} starts executing, it will also ask for an iterator from 
the
{{HadoopRDD}}, and since the RDD is already cached, instead of creating a new 
iterator, it will just fetch it from the cached result. However, the problem 
is, this will skip the initialization of the IOContext associated with this 
particular thread. When {{MT_12}} starts executing, it will first initialize 
the {{MapOperator}}, but since the {{IOContext}} is not initialized, this will 
fail miserably. 


> MapOperator initialization when multiple Spark threads is enabled. [Spark 
> Branch]
> ---------------------------------------------------------------------------------
>
>                 Key: HIVE-8457
>                 URL: https://issues.apache.org/jira/browse/HIVE-8457
>             Project: Hive
>          Issue Type: Bug
>          Components: Spark
>            Reporter: Chao
>
> Currently, on the Spark branch, each thread it is bound with a thread-local 
> IOContext, which gets initialized when we generates an input {{HadoopRDD}}, 
> and later used in {{MapOperator}}, {{FilterOperator}}, etc.
> And, given the introduction of HIVE-8118, we may have multiple downstream 
> RDDs that share the same input {{HadoopRDD}}, and we would like to have the 
> {{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. 
> A typical case would be like the following:
> {noformat}
>      inputRDD     inputRDD
>         |            |
>        MT_11        MT_12
>         |            |
>        RT_1         RT_2
> {noformat}
> Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a splitted {{MapWork}},
> and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that, this example is 
> simplified, as we may also have {{ShuffleTran}} between {{MapTran}} and 
> {{ReduceTran}}.
> When multiple Spark threads are running, {{MT_11}} may be executed first, and 
> it will ask for an iterator from the {{HadoopRDD}} will trigger the creation 
> of the iterator, which in turn triggers the initialization of the 
> {{IOContext}} associated with that particular thread.
> Now, before {{MT_12}} starts executing, it will also ask for an iterator from 
> the
> {{HadoopRDD}}, and since the RDD is already cached, instead of creating a new 
> iterator, it will just fetch it from the cached result. However, the problem 
> is, this will skip the initialization of the IOContext associated with this 
> particular thread. When {{MT_12}} starts executing, it will first initialize 
> the {{MapOperator}}, but since the {{IOContext}} is not initialized, this 
> will fail miserably. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to