[ https://issues.apache.org/jira/browse/HIVE-21191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
one updated HIVE-21191:
-----------------------

Description:

I want a distinctLag function. It is like lag, but instead of returning the value from the immediately preceding row, it returns the most recent preceding value that is different from the current row's value.

Example:

{color:#14892c}select * from active{color}
||session||sq||channel||
|1|1|A|
|1|2|B|
|1|3|B|
|1|4|C|
|1|5|B|
|2|1|C|
|2|2|B|
|2|3|B|
|2|4|A|
|2|5|B|

{color:#14892c}select session,sq,lag(channel)over(partition by session order by sq) from active{color}
||session||sq||channel||
|1|1|null|
|1|2|A|
|1|3|B|
|1|4|B|
|1|5|C|
|2|1|null|
|2|2|C|
|2|3|B|
|2|4|B|
|2|5|A|

The function I want is:
{color:#14892c}select session,sq,distinctLag(channel)over(partition by session order by sq) from active{color}
||session||sq||channel||
|1|1|null|
|1|2|A|
|1|3|A|
|1|4|B|
|1|5|C|
|2|1|null|
|2|2|C|
|2|3|C|
|2|4|B|
|2|5|A|

I tried to extend GenericUDFLeadLag and override evaluate():

{code:java}
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;

@Description(
    name = "distinctLag",
    value = "distinctLag (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); "
        + "The distinctLag function is used to access data from a distinct previous row.",
    extended = "Example:\n "
        + "select p1.p_mfgr, p1.p_name, p1.p_size,\n"
        + " p1.p_size - distinctLag(p1.p_size,1,p1.p_size) over( distribute by p1.p_mfgr sort by p1.p_name) as deltaSz\n"
        + " from part p1 join part p2 on p1.p_partkey = p2.p_partkey")
@UDFType(impliesOrder = true)
public class GenericUDFDistinctLag extends GenericUDFLeadLag {

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object defaultVal = null;
    if (arguments.length == 3) {
      defaultVal = ObjectInspectorUtils.copyToStandardObject(
          getDefaultValueConverter().convert(arguments[2].get()), getDefaultArgOI());
    }
    int idx = getpItr().getIndex() - 1;
    int start = 0;
    int end = getpItr().getPartition().size();
    try {
      Object currValue = ObjectInspectorUtils.copyToStandardObject(
          getExprEvaluator().evaluate(getpItr().resetToIndex(idx)),
          getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
      Object ret = null;
      int newIdx = idx;
      do {
        --newIdx;
        if (newIdx >= end || newIdx < start) {
          ret = defaultVal;
          return ret;
        } else {
          ret = ObjectInspectorUtils.copyToStandardObject(
              getExprEvaluator().evaluate(getpItr().lag(1)),
              getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
          if (ret.equals(currValue)) {
            setAmt(getAmt() - 1);
          }
        }
      } while (getAmt() > 0);
      return ret;
    } finally {
      Object currRow = getpItr().resetToIndex(idx);
      // reevaluate expression on current Row, to trigger the Lazy object
      // caches to be reset to the current row.
      getExprEvaluator().evaluate(currRow);
    }
  }

  @Override
  protected String _getFnName() {
    return "distinctLag";
  }

  @Override
  protected Object getRow(int amt) throws HiveException {
    throw new HiveException("distinctLag error: cannot call getRow");
  }

  @Override
  protected int getIndex(int amt) {
    // TODO Auto-generated method stub
    return 0;
  }
}
{code}

I packaged this as a jar, added it to Hive, and created a temporary function.
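The registration commands were roughly the following (the jar path and the package of the class above are placeholders, since they depend on how the project is packaged):

{code:sql}
-- register the packaged UDF in the Hive CLI (path and class name are placeholders)
ADD JAR /tmp/distinct-lag-udf.jar;
CREATE TEMPORARY FUNCTION distinctLag AS 'com.example.udf.GenericUDFDistinctLag';
{code}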
Then I ran:

{color:#14892c}select session,sq,distinctLag(channel)over(partition by session order by sq) from active;{color}

{color:#333333}It reported an error:{color}

{color:#d04437}FAILED: SemanticException Failed to breakup Windowing invocations into Groups. At least 1 group must only depend on input columns. Also check for circular dependencies. Underlying error: Invalid function distinctLag{color}

{color:#333333}I don't know exactly what the problem is. I hope someone can give me a hint. Thank you.{color}

{color:#333333}Then I noticed that there is a UDAF implementation, GenericUDAFLag, and I tried to imitate it:{color}

{code:java}
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;

@WindowFunctionDescription(description = @Description(name = "lag", value = "_FUNC_(expr, amt, default)"),
    supportsWindow = false, pivotResult = true, impliesOrder = true)
public class GenericUDAFDistinctLag extends GenericUDAFDistinctLeadLag {

  static final Log LOG = LogFactory.getLog(GenericUDAFDistinctLag.class.getName());

  @Override
  protected String functionName() {
    return "Lag";
  }

  @Override
  protected GenericUDAFDistinctLeadLagEvaluator createLLEvaluator() {
    return new GenericUDAFDistinctLagEvaluator();
  }

  public static class GenericUDAFDistinctLagEvaluator extends GenericUDAFDistinctLeadLagEvaluator {

    public GenericUDAFDistinctLagEvaluator() {
    }

    /*
     * used to initialize Streaming Evaluator.
     */
    protected GenericUDAFDistinctLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
      super(src);
    }

    @Override
    protected DistinctLeadLagBuffer getNewLLBuffer() throws HiveException {
      return new DistinctLagBuffer();
    }

    @Override
    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
      return new GenericUDAFDistinctLagEvaluatorStreaming(this);
    }
  }

  static class DistinctLagBuffer implements DistinctLeadLagBuffer {
    ArrayList<Object> values;
    int lagAmt;
    ArrayList<Object> lagValues;

    @Override
    public void initialize(int lagAmt) {
      this.lagAmt = lagAmt;
      lagValues = new ArrayList<Object>();
      values = new ArrayList<Object>();
    }

    @Override
    public void addRow(Object currValue, Object defaultValue) {
      // scan backwards for the lagAmt-th value that differs from the current one
      int i = values.size() - 1;
      int noEquals = 0;
      for (; i >= 0; i--) {
        if (!currValue.equals(values.get(i))) {
          if (++noEquals == lagAmt) {
            break;
          }
        }
      }
      lagValues.add(i == -1 ? defaultValue : values.get(i));
      values.add(currValue);
    }

    @Override
    public Object terminate() {
      return lagValues;
    }
  }

  /*
   * StreamingEval: wrap regular eval. on getNext remove first row from values
   * and return it.
   */
  static class GenericUDAFDistinctLagEvaluatorStreaming extends GenericUDAFDistinctLagEvaluator
      implements ISupportStreamingModeForWindowing {

    protected GenericUDAFDistinctLagEvaluatorStreaming(GenericUDAFDistinctLeadLagEvaluator src) {
      super(src);
    }

    @Override
    public Object getNextResult(AggregationBuffer agg) throws HiveException {
      DistinctLagBuffer lb = (DistinctLagBuffer) agg;
      if (!lb.lagValues.isEmpty()) {
        Object res = lb.lagValues.remove(0);
        if (res == null) {
          return ISupportStreamingModeForWindowing.NULL_RESULT;
        }
        return res;
      } else if (!lb.values.isEmpty()) {
        Object res = lb.values.remove(0);
        if (res == null) {
          return ISupportStreamingModeForWindowing.NULL_RESULT;
        }
        return res;
      }
      return null;
    }

    @Override
    public int getRowsRemainingAfterTerminate() throws HiveException {
      return getAmt();
    }
  }
}
{code}

{code:java}
import java.lang.reflect.Field;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.io.IntWritable;

public abstract class GenericUDAFDistinctLeadLag extends AbstractGenericUDAFResolver {

  static final Log LOG = LogFactory.getLog(GenericUDAFLead.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) throws SemanticException {
    ObjectInspector[] paramOIs = parameters.getParameterObjectInspectors();
    String fNm = functionName();
    if (!(paramOIs.length >= 1 && paramOIs.length <= 3)) {
      throw new UDFArgumentTypeException(paramOIs.length - 1,
          "Incorrect invocation of " + fNm + ": _FUNC_(expr, amt, default)");
    }
    int amt = 1;
    if (paramOIs.length > 1) {
      ObjectInspector amtOI = paramOIs[1];
      if (!ObjectInspectorUtils.isConstantObjectInspector(amtOI)
          || (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE)
          || ((PrimitiveObjectInspector) amtOI).getPrimitiveCategory()
              != PrimitiveObjectInspector.PrimitiveCategory.INT) {
        throw new UDFArgumentTypeException(1, fNm + " amount must be a integer value "
            + amtOI.getTypeName() + " was passed as parameter 1.");
      }
      Object o = ((ConstantObjectInspector) amtOI).getWritableConstantValue();
      amt = ((IntWritable) o).get();
      if (amt < 0) {
        throw new UDFArgumentTypeException(1, fNm + " amount can not be nagative. Specified: " + amt);
      }
    }
    if (paramOIs.length == 3) {
      ObjectInspectorConverters.getConverter(paramOIs[2], paramOIs[0]);
    }
    GenericUDAFDistinctLeadLagEvaluator eval = createLLEvaluator();
    eval.setAmt(amt);
    return eval;
  }

  protected abstract String functionName();

  protected abstract GenericUDAFDistinctLeadLagEvaluator createLLEvaluator();

  public static abstract class GenericUDAFDistinctLeadLagEvaluator extends GenericUDAFEvaluator {

    private transient ObjectInspector[] inputOI;
    private int amt;
    String fnName;
    private transient Converter defaultValueConverter;

    public GenericUDAFDistinctLeadLagEvaluator() {
    }

    /*
     * used to initialize Streaming Evaluator.
     */
    protected GenericUDAFDistinctLeadLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
      this.inputOI = src.inputOI;
      this.amt = src.amt;
      this.fnName = src.fnName;
      this.defaultValueConverter = src.defaultValueConverter;
      try {
        Field mode = GenericUDAFEvaluator.class.getDeclaredField("mode");
        mode.setAccessible(true);
        mode.set(this, mode.get(src));
        mode.setAccessible(false);
      } catch (IllegalArgumentException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      } catch (IllegalAccessException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      } catch (NoSuchFieldException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      } catch (SecurityException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      super.init(m, parameters);
      if (m != Mode.COMPLETE) {
        throw new HiveException("Only COMPLETE mode supported for " + fnName + " function");
      }
      inputOI = parameters;
      if (parameters.length == 3) {
        defaultValueConverter = ObjectInspectorConverters.getConverter(parameters[2], parameters[0]);
      }
      return ObjectInspectorFactory.getStandardListObjectInspector(
          ObjectInspectorUtils.getStandardObjectInspector(parameters[0]));
    }

    public int getAmt() {
      return amt;
    }

    public void setAmt(int amt) {
      this.amt = amt;
    }

    public String getFnName() {
      return fnName;
    }

    public void setFnName(String fnName) {
      this.fnName = fnName;
    }

    protected abstract DistinctLeadLagBuffer getNewLLBuffer() throws HiveException;

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      DistinctLeadLagBuffer lb = getNewLLBuffer();
      lb.initialize(amt);
      return lb;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      ((DistinctLeadLagBuffer) agg).initialize(amt);
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      Object rowExprVal = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputOI[0]);
      Object defaultVal = parameters.length > 2
          ? ObjectInspectorUtils.copyToStandardObject(defaultValueConverter.convert(parameters[2]), inputOI[0])
          : null;
      ((DistinctLeadLagBuffer) agg).addRow(rowExprVal, defaultVal);
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      throw new HiveException("terminatePartial not supported");
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
      throw new HiveException("merge not supported");
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      return ((DistinctLeadLagBuffer) agg).terminate();
    }
  }
}
{code}

{code:java}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;

@SuppressWarnings("deprecation")
interface DistinctLeadLagBuffer extends AggregationBuffer {
  void initialize(int leadAmt);

  void addRow(Object leadExprValue, Object defaultValue);

  Object terminate();
}
{code}

Again I packaged this as a jar, added it to Hive, and created a temporary function (same registration steps as above). On a higher Hive version it works, but on {color:#14892c}Hive 1.1.0{color} it reports an error. {color:#333333}(Also, on Hive 1.1.0, if I create that temporary function with the name lag or lead, it works the way I want, but it overrides Hive's built-in lag/lead even after the temporary function is dropped; the built-in lag/lead only works again after I quit and re-enter hive-cli.){color}

{code:java}
hive> SELECT session,sq,distinctLag(channel)over(PARTITION BY session ORDER BY sq) FROM elephant_active;
Query ID = root_20190131195959_8047b4ba-a85c-4f39-8a27-989388316c50
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1546504603780_0492, Tracking URL = http://dce.bi.com:8088/proxy/application_1546504603780_0492/
Kill Command = /opt/cloudera/parcels/CDH-5.10.2-1.cdh5.10.2.p0.5/lib/hadoop/bin/hadoop job -kill job_1546504603780_0492
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-01-31 20:00:03,639 Stage-1 map = 0%, reduce = 0%
2019-01-31 20:00:09,972 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.38 sec
2019-01-31 20:00:30,795 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 1.38 sec
MapReduce Total cumulative CPU time: 1 seconds 380 msec
Ended Job = job_1546504603780_0492 with errors
Error during job, obtaining debugging information...
Examining task ID: task_1546504603780_0492_m_000000 (and more) from job job_1546504603780_0492
Task with the most failures(4):
-----
Task ID:
  task_1546504603780_0492_r_000000
URL:
  http://0.0.0.0:8088/taskdetails.jsp?jobid=job_1546504603780_0492&tipid=task_1546504603780_0492_r_000000
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:409)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 9 more
Caused by: java.lang.RuntimeException: Reduce operator initialization failed
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:166)
    ... 14 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.Registry.getFunctionInfo(Registry.java:306)
    at org.apache.hadoop.hive.ql.exec.Registry.getWindowFunctionInfo(Registry.java:314)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getWindowFunctionInfo(FunctionRegistry.java:504)
    at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.streamingPossible(WindowingTableFunction.java:151)
    at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.setCanAcceptInputAsStream(WindowingTableFunction.java:222)
    at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.initializeStreaming(WindowingTableFunction.java:256)
    at org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.initializeStreaming(PTFOperator.java:291)
    at org.apache.hadoop.hive.ql.exec.PTFOperator.initializeOp(PTFOperator.java:86)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
    at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:159)
    ... 14 more

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1  Cumulative CPU: 1.38 sec  HDFS Read: 2958  HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 1 seconds 380 msec
hive>
{code}

I guess it is a FunctionRegistry problem. I am a beginner; I hope someone can tell me the correct way to implement this special function. Thank you very much. I use Hive 1.1.0 + cdh5.10.2 + 945.
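In case it helps with the diagnosis, the only checks I know how to run are the standard CLI commands below; as far as I can tell they only show what the client session sees, which may not be what the reduce tasks see:

{code:sql}
-- standard Hive CLI checks on the session's function registry
SHOW FUNCTIONS;
DESCRIBE FUNCTION EXTENDED distinctLag;
{code}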
My Maven dependencies are:

{code:xml}
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0-cdh5.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
    <scope>test</scope>
  </dependency>
</dependencies>
{code}
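As an aside, to make the expected semantics precise: for an offset of 1 the same result can in principle be written with only built-in window functions, although it is far more verbose than a dedicated distinctLag. This is an untested sketch against the active table above:

{code:sql}
-- untested sketch: distinctLag(channel) with offset 1, using only built-in window functions
SELECT session, sq,
       FIRST_VALUE(prev_channel) OVER (PARTITION BY session, grp ORDER BY sq) AS distinct_lag_channel
FROM (
  SELECT session, sq, channel, prev_channel,
         -- grp numbers the runs of consecutive equal channel values within a session
         SUM(CASE WHEN prev_channel IS NULL OR channel <> prev_channel THEN 1 ELSE 0 END)
           OVER (PARTITION BY session ORDER BY sq) AS grp
  FROM (
    SELECT session, sq, channel,
           LAG(channel) OVER (PARTITION BY session ORDER BY sq) AS prev_channel
    FROM active
  ) t1
) t2
ORDER BY session, sq;
{code}

This only covers offset 1, which is why I would still like a proper distinctLag function.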
> I want to extend lag/lead functions to implement some special functions, and I met some problems
> -------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-21191
>                 URL: https://issues.apache.org/jira/browse/HIVE-21191
>             Project: Hive
>          Issue Type: Wish
>          Components: Hive, UDF, Windows
>    Affects Versions: 1.1.0
>            Reporter: one
>            Priority: Minor
>              Labels: LAG(), UDAF, UDF, window_function
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)