one created HIVE-21191:
--------------------------

             Summary: I want to extend the lag/lead functions to implement some special functions, and I ran into some problems
                 Key: HIVE-21191
                 URL: https://issues.apache.org/jira/browse/HIVE-21191
             Project: Hive
          Issue Type: Wish
          Components: Hive, UDF, Windows
    Affects Versions: 1.1.0
            Reporter: one


I want a distinctLag function. It is like lag, but instead of returning the immediately preceding row it returns the most recent preceding value that is different from the current row's value.
 Example:
 {color:#14892c}select * from active{color}
||session||sq||channel||
|1|1|A|
|1|2|B|
|1|3|B|
|1|4|C|
|1|5|B|
|2|1|C|
|2|2|B|
|2|3|B|
|2|4|A|
|2|5|B|

{color:#14892c}
 select session,sq,lag(channel)over(partition by session order by sq) from 
active{color}
||session||sq||channel||
|1|1|null|
|1|2|A|
|1|3|B|
|1|4|B|
|1|5|C|
|2|1|null|
|2|2|C|
|2|3|B|
|2|4|B|
|2|5|A|

The function I want is:{color:#14892c}
 select session,sq,distinctLag(channel)over(partition by session order by sq) 
from active{color}
||session||sq||channel||
|1|1|null|
|1|2|A|
|1|3|A|
|1|4|B|
|1|5|C|
|2|1|null|
|2|2|C|
|2|3|C|
|2|4|B|
|2|5|A|
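
For comparison, here is a sketch of how the same result could be produced with only the built-in windowing functions, by collapsing consecutive duplicate values into "runs". This is only a workaround against the example {color:#14892c}active{color} table above, not the distinctLag UDF this issue asks for:
{code:sql}
-- Workaround sketch (not the requested UDF): emulate distinctLag(channel, 1)
-- using only built-in windowing, against the active(session, sq, channel) table above.
SELECT session, sq, channel,
       -- the first row of each run still carries the previous run's value in prev,
       -- so first_value(prev) within the run is the closest distinct preceding value
       FIRST_VALUE(prev) OVER (PARTITION BY session, run_id ORDER BY sq) AS distinct_lag
FROM (
  SELECT session, sq, channel, prev,
         -- running count of value changes = id of the current run of equal consecutive values
         SUM(CASE WHEN prev IS NULL OR prev <> channel THEN 1 ELSE 0 END)
             OVER (PARTITION BY session ORDER BY sq) AS run_id
  FROM (
    SELECT session, sq, channel,
           LAG(channel) OVER (PARTITION BY session ORDER BY sq) AS prev
    FROM active
  ) t1
) t2;
{code}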

 

I tried to extend GenericUDFLeadLag and override evaluate():
{code:java}
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;

@Description(
        name = "distinctLag",
        value = "distinctLag(scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); "
            + "The distinctLag function is used to access data from a distinct previous row.",
        extended = "Example:\n "
            + "select p1.p_mfgr, p1.p_name, p1.p_size,\n"
            + " p1.p_size - distinctLag(p1.p_size,1,p1.p_size) over( distribute by p1.p_mfgr sort by p1.p_name) as deltaSz\n"
            + " from part p1 join part p2 on p1.p_partkey = p2.p_partkey")
@UDFType(impliesOrder = true)
public class GenericUDFDistinctLag extends GenericUDFLeadLag {

        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
                Object defaultVal = null;
                if (arguments.length == 3) {
                        defaultVal = ObjectInspectorUtils.copyToStandardObject(
                                        getDefaultValueConverter().convert(arguments[2].get()), getDefaultArgOI());
                }

                int idx = getpItr().getIndex() - 1;
                int start = 0;
                int end = getpItr().getPartition().size();
                try {
                        Object currValue = ObjectInspectorUtils.copyToStandardObject(
                                        getExprEvaluator().evaluate(getpItr().resetToIndex(idx)),
                                        getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
                        Object ret = null;
                        int newIdx = idx;
                        // walk backwards until `amt` values distinct from the current one have been seen
                        int remaining = getAmt();
                        do {
                                --newIdx;
                                if (newIdx >= end || newIdx < start) {
                                        // ran off the partition boundary: fall back to the default value
                                        return defaultVal;
                                } else {
                                        ret = ObjectInspectorUtils.copyToStandardObject(
                                                        getExprEvaluator().evaluate(getpItr().lag(1)),
                                                        getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
                                        // only rows whose value differs from the current row's value count towards the offset
                                        if (ret == null ? currValue != null : !ret.equals(currValue)) {
                                                --remaining;
                                        }
                                }
                        } while (remaining > 0);
                        return ret;
                } finally {
                        Object currRow = getpItr().resetToIndex(idx);
                        // reevaluate expression on current row, to trigger the Lazy object
                        // caches to be reset to the current row.
                        getExprEvaluator().evaluate(currRow);
                }
        }

        @Override
        protected String _getFnName() {
                return "distinctLag";
        }

        @Override
        protected Object getRow(int amt) throws HiveException {
                throw new HiveException("distinctLag error: cannot call getRow");
        }

        @Override
        protected int getIndex(int amt) {
                return 0;
        }
}
{code}
I packaged it as a jar, added it to Hive, and created a temporary function (sketched below).
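A rough sketch of those registration steps (the jar path here is only a placeholder, and the class name assumes the default package, as in the snippet above):
{code:sql}
-- placeholder jar path; class name assumes the default package as in the snippet above
ADD JAR /path/to/distinct-lag-udf.jar;
CREATE TEMPORARY FUNCTION distinctLag AS 'GenericUDFDistinctLag';
{code}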

Then I ran:

{color:#14892c}select session,sq,distinctLag(channel)over(partition by session 
order by sq) from active;{color}

{color:#333333}It reported an error:{color}

{color:#d04437}FAILED: SemanticException Failed to breakup Windowing 
invocations into Groups. At least 1 group must only depend on input columns. 
Also check for circular dependencies.
 Underlying error: Invalid function distinctLag{color}

{color:#333333}I don't know exactly what the problem is. I hope someone can 
give me a hint. Thank you.{color}

{color:#333333}Then I noticed that there is a UDAF, GenericUDAFLag, so I tried to imitate it.{color}
{code:java}
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;

@WindowFunctionDescription(description = @Description(name = "lag", value = "_FUNC_(expr, amt, default)"),
        supportsWindow = false, pivotResult = true, impliesOrder = true)
public class GenericUDAFDistinctLag extends GenericUDAFDistinctLeadLag {

        static final Log LOG = LogFactory.getLog(GenericUDAFDistinctLag.class.getName());

        @Override
        protected String functionName() {
                return "Lag";
        }

        @Override
        protected GenericUDAFDistinctLeadLagEvaluator createLLEvaluator() {
                return new GenericUDAFDistinctLagEvaluator();
        }

        public static class GenericUDAFDistinctLagEvaluator extends GenericUDAFDistinctLeadLagEvaluator {

                public GenericUDAFDistinctLagEvaluator() {
                }

                /*
                 * used to initialize Streaming Evaluator.
                 */
                protected GenericUDAFDistinctLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
                        super(src);
                }

                @Override
                protected DistinctLeadLagBuffer getNewLLBuffer() throws HiveException {
                        return new DistinctLagBuffer();
                }

                @Override
                public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
                        return new GenericUDAFDistinctLagEvaluatorStreaming(this);
                }

        }

        static class DistinctLagBuffer implements DistinctLeadLagBuffer {
                ArrayList<Object> values;
                int lagAmt;
                ArrayList<Object> lagValues;

                @Override
                public void initialize(int lagAmt) {
                        this.lagAmt = lagAmt;
                        lagValues = new ArrayList<Object>();
                        values = new ArrayList<Object>();
                }

                @Override
                public void addRow(Object currValue, Object defaultValue) {
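                        // scan backwards over the values seen so far, counting how many of them
                        // differ from the current value; stop once lagAmt distinct values are found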
                        int i = values.size() - 1;
                        int noEquals = 0;
                        for (; i >= 0; i--) {
                                if (!currValue.equals(values.get(i))) {
                                        if (++noEquals == lagAmt) {
                                                break;
                                        }
                                }
                        }
                        lagValues.add(i == -1 ? defaultValue : values.get(i));
                        values.add(currValue);
                }

                @Override
                public Object terminate() {

                        return lagValues;

                }
        }

        /*
         * StreamingEval: wrap regular eval. On getNext, remove first row from values
         * and return it.
         */
        static class GenericUDAFDistinctLagEvaluatorStreaming extends GenericUDAFDistinctLagEvaluator
                        implements ISupportStreamingModeForWindowing {

                protected GenericUDAFDistinctLagEvaluatorStreaming(GenericUDAFDistinctLeadLagEvaluator src) {
                        super(src);
                }

                @Override
                public Object getNextResult(AggregationBuffer agg) throws HiveException {
                        DistinctLagBuffer lb = (DistinctLagBuffer) agg;

                        if (!lb.lagValues.isEmpty()) {
                                Object res = lb.lagValues.remove(0);
                                if (res == null) {
                                        return ISupportStreamingModeForWindowing.NULL_RESULT;
                                }
                                return res;
                        } else if (!lb.values.isEmpty()) {
                                Object res = lb.values.remove(0);
                                if (res == null) {
                                        return ISupportStreamingModeForWindowing.NULL_RESULT;
                                }
                                return res;
                        }
                        return null;
                }

                @Override
                public int getRowsRemainingAfterTerminate() throws HiveException {
                        return getAmt();
                }
        }
}

 {code}
{code:java}
import java.lang.reflect.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.io.IntWritable;

public abstract class GenericUDAFDistinctLeadLag extends AbstractGenericUDAFResolver {

        static final Log LOG = LogFactory.getLog(GenericUDAFLead.class.getName());

        @Override
        public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) throws SemanticException {

                ObjectInspector[] paramOIs = parameters.getParameterObjectInspectors();
                String fNm = functionName();

                if (!(paramOIs.length >= 1 && paramOIs.length <= 3)) {
                        throw new UDFArgumentTypeException(paramOIs.length - 1,
                                        "Incorrect invocation of " + fNm + ": _FUNC_(expr, amt, default)");
                }

                int amt = 1;
                if (paramOIs.length > 1) {
                        ObjectInspector amtOI = paramOIs[1];

                        if (!ObjectInspectorUtils.isConstantObjectInspector(amtOI)
                                        || (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE)
                                        || ((PrimitiveObjectInspector) amtOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) {
                                throw new UDFArgumentTypeException(1,
                                                fNm + " amount must be an integer value, " + amtOI.getTypeName() + " was passed as parameter 1.");
                        }
                        Object o = ((ConstantObjectInspector) amtOI).getWritableConstantValue();
                        amt = ((IntWritable) o).get();
                        if (amt < 0) {
                                throw new UDFArgumentTypeException(1, fNm + " amount can not be negative. Specified: " + amt);
                        }
                }

                if (paramOIs.length == 3) {
                        ObjectInspectorConverters.getConverter(paramOIs[2], paramOIs[0]);
                }

                GenericUDAFDistinctLeadLagEvaluator eval = createLLEvaluator();
                eval.setAmt(amt);
                return eval;
        }

        protected abstract String functionName();

        protected abstract GenericUDAFDistinctLeadLagEvaluator createLLEvaluator();

        public static abstract class GenericUDAFDistinctLeadLagEvaluator extends GenericUDAFEvaluator {

                private transient ObjectInspector[] inputOI;
                private int amt;
                String fnName;
                private transient Converter defaultValueConverter;

                public GenericUDAFDistinctLeadLagEvaluator() {
                }

                /*
                 * used to initialize Streaming Evaluator.
                 */
                protected GenericUDAFDistinctLeadLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
                        this.inputOI = src.inputOI;
                        this.amt = src.amt;
                        this.fnName = src.fnName;
                        this.defaultValueConverter = src.defaultValueConverter;
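                        // copy the inherited "mode" field from the source evaluator via reflection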
                        try {
                                Field mode = GenericUDAFEvaluator.class.getDeclaredField("mode");
                                mode.setAccessible(true);
                                mode.set(this, mode.get(src));
                                mode.setAccessible(false);
                        } catch (IllegalArgumentException | IllegalAccessException | NoSuchFieldException | SecurityException e) {
                                e.printStackTrace();
                        }
                }

                @Override
                public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
                        super.init(m, parameters);
                        if (m != Mode.COMPLETE) {
                                throw new HiveException("Only COMPLETE mode supported for " + fnName + " function");
                        }

                        inputOI = parameters;

                        if (parameters.length == 3) {
                                defaultValueConverter = ObjectInspectorConverters.getConverter(parameters[2], parameters[0]);
                        }

                        return ObjectInspectorFactory.getStandardListObjectInspector(
                                        ObjectInspectorUtils.getStandardObjectInspector(parameters[0]));
                }

                public int getAmt() {
                        return amt;
                }

                public void setAmt(int amt) {
                        this.amt = amt;
                }

                public String getFnName() {
                        return fnName;
                }

                public void setFnName(String fnName) {
                        this.fnName = fnName;
                }

                protected abstract DistinctLeadLagBuffer getNewLLBuffer() throws HiveException;

                @Override
                public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                        DistinctLeadLagBuffer lb = getNewLLBuffer();
                        lb.initialize(amt);
                        return lb;
                }

                @Override
                public void reset(AggregationBuffer agg) throws HiveException {
                        ((DistinctLeadLagBuffer) agg).initialize(amt);
                }

                @Override
                public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
                        Object rowExprVal = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputOI[0]);
                        Object defaultVal = parameters.length > 2
                                        ? ObjectInspectorUtils.copyToStandardObject(defaultValueConverter.convert(parameters[2]), inputOI[0])
                                        : null;
                        ((DistinctLeadLagBuffer) agg).addRow(rowExprVal, defaultVal);
                }

                @Override
                public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                        throw new HiveException("terminatePartial not supported");
                }

                @Override
                public void merge(AggregationBuffer agg, Object partial) throws HiveException {
                        throw new HiveException("merge not supported");
                }

                @Override
                public Object terminate(AggregationBuffer agg) throws HiveException {
                        return ((DistinctLeadLagBuffer) agg).terminate();
                }

        }

}
{code}
{code:java}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;

@SuppressWarnings("deprecation")
interface DistinctLeadLagBuffer extends AggregationBuffer {
  void initialize(int leadAmt);

  void addRow(Object leadExprValue, Object defaultValue);

  Object terminate();

}
{code}
I packaged this as a jar, added it to Hive, and created a temporary function. In a higher Hive version it works, but in the {color:#14892c}Hive 1.1.0{color} version it reports the error below {color:#333333}(also, in Hive 1.1.0, if I create that temporary function with the name lag or lead, it works as I want, but it shadows Hive's built-in lag/lead function even after the temporary function is dropped; only after I quit and re-enter hive-cli do the built-in lag/lead functions work again){color}:
{code:java}
hive> SELECT session,sq,distinctLag(channel)over(PARTITION BY session ORDER BY 
sq) FROM elephant_active;
Query ID = root_20190131195959_8047b4ba-a85c-4f39-8a27-989388316c50
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1546504603780_0492, Tracking URL = 
http://dce.bi.com:8088/proxy/application_1546504603780_0492/
Kill Command = 
/opt/cloudera/parcels/CDH-5.10.2-1.cdh5.10.2.p0.5/lib/hadoop/bin/hadoop job  
-kill job_1546504603780_0492
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-01-31 20:00:03,639 Stage-1 map = 0%,  reduce = 0%
2019-01-31 20:00:09,972 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.38 
sec
2019-01-31 20:00:30,795 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 1.38 
sec
MapReduce Total cumulative CPU time: 1 seconds 380 msec
Ended Job = job_1546504603780_0492 with errors
Error during job, obtaining debugging information...
Examining task ID: task_1546504603780_0492_m_000000 (and more) from job 
job_1546504603780_0492

Task with the most failures(4): 
-----
Task ID:
  task_1546504603780_0492_r_000000

URL:
  
http://0.0.0.0:8088/taskdetails.jsp?jobid=job_1546504603780_0492&tipid=task_1546504603780_0492_r_000000
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: Error in configuring object
        at 
org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        at 
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at 
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at 
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:409)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at 
org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
        ... 9 more
Caused by: java.lang.RuntimeException: Reduce operator initialization failed
        at 
org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:166)
        ... 14 more
Caused by: java.lang.NullPointerException
        at 
org.apache.hadoop.hive.ql.exec.Registry.getFunctionInfo(Registry.java:306)
        at 
org.apache.hadoop.hive.ql.exec.Registry.getWindowFunctionInfo(Registry.java:314)
        at 
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getWindowFunctionInfo(FunctionRegistry.java:504)
        at 
org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.streamingPossible(WindowingTableFunction.java:151)
        at 
org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.setCanAcceptInputAsStream(WindowingTableFunction.java:222)
        at 
org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.initializeStreaming(WindowingTableFunction.java:256)
        at 
org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.initializeStreaming(PTFOperator.java:291)
        at 
org.apache.hadoop.hive.ql.exec.PTFOperator.initializeOp(PTFOperator.java:86)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
        at 
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
        at 
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
        at 
org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:159)
        ... 14 more


FAILED: Execution Error, return code 2 from 
org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 1.38 sec   HDFS Read: 2958 
HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 1 seconds 380 msec
hive> 
{code}
I guess it is a FunctionRegistry problem.
 I am a beginner. I hope someone can tell me the correct way to implement this special function. Thank you very much. I use Hive 1.1.0 + cdh5.10.2 + 945.
{code:xml}
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0-cdh5.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
    <scope>test</scope>
  </dependency>
</dependencies>
{code}


