Dear all, we rebased our temporal normalization patch on top of 554ebf687852d045f0418d3242b978b49f160f44 from 2019-02-28.
On 9/7/18 1:02 PM, Peter Moser wrote:
> The syntax is
> SELECT * FROM (r NORMALIZE s USING() WITH(period_r, period_s)) c;

Please find all information about our decisions and the current state in the previous email.

> What we would like to discuss now is:
> - Is sort_inner_and_outer the correct place to perform this split?
> - How could we support OID_RANGE_ELEM_CONTAINED_OP for a NORMALIZE
>   mergejoin executor? If we use RANGE_ELEM_CONTAINED as the operator, it is
>   not an equality operator, but if we use RANGE_EQ it assumes that the
>   right-hand side of the operator must be a range as well.
> - Should we rather change our mergeclause to a RANGE_ELEM_CONTAINED
>   comparison, or keep RANGE_EQ and fix pathkeys later?
> - How do we update equivalence classes and pathkeys
>   when changing the inner relation's data type from "int4range" to "int"
>   in the query tree inside "sort_inner_and_outer" to get the correct
>   ordering and data types?

I will also add this prototype (WIP) patch to the March commitfest, as suggested by two developers we met at FOSDEM some weeks ago.

Best regards,
Anton, Johann, Michael, Peter
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c index 2a1d000b03..a309596fa1 100644 --- a/src/backend/executor/nodeMergejoin.c +++ b/src/backend/executor/nodeMergejoin.c @@ -99,6 +99,106 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" +// XXX TEMPORAL NORMALIZE PEMOSER ---------------------------- +// !!! THis is just for prototyping, delete asap... + +#include "catalog/pg_operator.h" +#include "nodes/nodeFuncs.h" +#include "utils/fmgroids.h" +#include "utils/rangetypes.h" +#include "utils/typcache.h" +#include "access/htup_details.h" /* for heap_getattr */ +#include "nodes/print.h" /* for print_slot */ +#include "utils/datum.h" /* for datumCopy */ + + + +#define TEMPORAL_DEBUG +/* + * #define TEMPORAL_DEBUG + * XXX PEMOSER Maybe we should use execdebug.h stuff here? + */ +#ifdef TEMPORAL_DEBUG +static char* +datumToString(Oid typeinfo, Datum attr) +{ + Oid typoutput; + bool typisvarlena; + getTypeOutputInfo(typeinfo, &typoutput, &typisvarlena); + return OidOutputFunctionCall(typoutput, attr); +} + +#define TPGdebug(...) { printf(__VA_ARGS__); printf("\n"); fflush(stdout); } +#define TPGdebugDatum(attr, typeinfo) TPGdebug("%s = %s %ld\n", #attr, datumToString(typeinfo, attr), attr) +#define TPGdebugSlot(slot) { printf("Printing Slot '%s'\n", #slot); print_slot(slot); fflush(stdout); } + +#else +#define datumToString(typeinfo, attr) +#define TPGdebug(...) +#define TPGdebugDatum(attr, typeinfo) +#define TPGdebugSlot(slot) +#endif + +TypeCacheEntry *testmytypcache; +#define setSweepline(datum) \ + node->sweepline = datumCopy(datum, node->datumFormat->attbyval, node->datumFormat->attlen) + +#define freeSweepline() \ + if (! node->datumFormat->attbyval) pfree(DatumGetPointer(node->sweepline)) + + /* + * slotGetAttrNotNull + * Same as slot_getattr, but throws an error if NULL is returned. 
+ */ +static Datum +slotGetAttrNotNull(TupleTableSlot *slot, int attnum) +{ + bool isNull; + Datum result; + + result = slot_getattr(slot, attnum, &isNull); + + if(isNull) + ereport(ERROR, + (errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("Attribute \"%s\" at position %d is null. Temporal " \ + "adjustment not possible.", + NameStr(TupleDescAttr(slot->tts_tupleDescriptor, attnum - 1)->attname), + attnum))); + + return result; +} + +/* + * heapGetAttrNotNull + * Same as heap_getattr, but throws an error if NULL is returned. + */ +static Datum +heapGetAttrNotNull(TupleTableSlot *slot, int attnum) +{ + bool isNull; + Datum result; + HeapTuple tuple; + + tuple = ExecFetchSlotHeapTuple(slot, true, NULL); + result = heap_getattr(tuple, + attnum, + slot->tts_tupleDescriptor, + &isNull); + if(isNull) + ereport(ERROR, + (errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("Attribute \"%s\" at position %d is null. Temporal " \ + "adjustment not possible.", + NameStr(TupleDescAttr(slot->tts_tupleDescriptor, + attnum - 1)->attname), + attnum))); + + return result; +} + +// XXX TEMPORAL NORMALIZE PEMOSER END ------------------------ + /* * States of the ExecMergeJoin state machine @@ -138,6 +238,10 @@ typedef struct MergeJoinClauseData * stored here. */ SortSupportData ssup; + + /* needed for Temporal Normalization */ + bool isnormalize; + TypeCacheEntry *range_typcache; } MergeJoinClauseData; /* Result type for MJEvalOuterValues and MJEvalInnerValues */ @@ -152,6 +256,59 @@ typedef enum #define MarkInnerTuple(innerTupleSlot, mergestate) \ ExecCopySlot((mergestate)->mj_MarkedTupleSlot, (innerTupleSlot)) +/* + * temporalAdjustmentStoreTuple + * While we store result tuples, we must add the newly calculated temporal + * boundaries as two scalar fields or create a single range-typed field + * with the two given boundaries. 
+ */ +static void +temporalAdjustmentStoreTuple(MergeJoinState *mergestate, + TupleTableSlot* slotToModify, + TupleTableSlot* slotToStoreIn, + Datum ts, + Datum te, + TypeCacheEntry *typcache) +{ + MemoryContext oldContext; + HeapTuple t; + RangeBound lower; + RangeBound upper; + bool empty = false; + HeapTuple tuple; + + /* + * This should ideally be done with RangeBound types on the right-hand-side + * created during range_split execution. Otherwise, we loose information about + * inclusive/exclusive bounds and infinity. We would need to implement btree + * operators for RangeBounds. + */ + lower.val = ts; + lower.lower = true; + lower.infinite = false; + lower.inclusive = true; + + upper.val = te; + upper.lower = false; + upper.infinite = false; + upper.inclusive = false; + + mergestate->newValues[0] = (Datum) make_range(typcache, &lower, &upper, empty); + + oldContext = MemoryContextSwitchTo(mergestate->js.ps.ps_ResultTupleSlot->tts_mcxt); + tuple = ExecFetchSlotHeapTuple(slotToModify, true, NULL); + t = heap_modify_tuple(tuple, + slotToModify->tts_tupleDescriptor, + mergestate->newValues, + mergestate->nullMask, + mergestate->tsteMask); + MemoryContextSwitchTo(oldContext); + ExecForceStoreHeapTuple(t, slotToStoreIn); + + TPGdebug("Storing tuple:"); + TPGdebugSlot(slotToStoreIn); +} + /* * MJExamineQuals @@ -201,6 +358,8 @@ MJExamineQuals(List *mergeclauses, Oid op_righttype; Oid sortfunc; + pprint(qual); + if (!IsA(qual, OpExpr)) elog(ERROR, "mergejoin clause is not an OpExpr"); @@ -221,12 +380,31 @@ MJExamineQuals(List *mergeclauses, elog(ERROR, "unsupported mergejoin strategy %d", opstrategy); clause->ssup.ssup_nulls_first = nulls_first; + if (qual->opno == OID_RANGE_EQ_OP) { + Oid rngtypid; + + // XXX PEMOSER Change opfamily and opfunc + qual->opfuncid = F_RANGE_CONTAINS; //<<--- opfuncid can be 0 during planning + qual->opno = OID_RANGE_CONTAINS_ELEM_OP; //OID_RANGE_CONTAINS_OP; + clause->isnormalize = true; + + // Attention: cannot merge using 
non-equality operator 3890 <--- OID_RANGE_CONTAINS_OP + opfamily = 4103; //range_inclusion_ops from pg_opfamily.h + + rngtypid = exprType((Node*)clause->lexpr->expr); + clause->range_typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO); + testmytypcache = clause->range_typcache; + } else { + clause->isnormalize = false; + } + + /* Extract the operator's declared left/right datatypes */ get_op_opfamily_properties(qual->opno, opfamily, false, &op_strategy, &op_lefttype, &op_righttype); - if (op_strategy != BTEqualStrategyNumber) /* should not happen */ + if (op_strategy != BTEqualStrategyNumber && !clause->isnormalize) /* should not happen */ elog(ERROR, "cannot merge using non-equality operator %u", qual->opno); @@ -249,7 +427,7 @@ MJExamineQuals(List *mergeclauses, /* The sort support function can provide a comparator */ OidFunctionCall1(sortfunc, PointerGetDatum(&clause->ssup)); } - if (clause->ssup.comparator == NULL) + if (clause->ssup.comparator == NULL && !clause->isnormalize) { /* support not available, get comparison func */ sortfunc = get_opfamily_proc(opfamily, @@ -269,6 +447,77 @@ MJExamineQuals(List *mergeclauses, return clauses; } +static Datum +getLower(Datum range, TypeCacheEntry *typcache) +{ + RangeBound lower; + RangeBound upper; + bool empty; + + range_deserialize(typcache, DatumGetRangeTypeP(range), &lower, &upper, &empty); + + // XXX This is just a prototype function, we do not check for emptiness nor infinity yet... + // We will use RangeBounds in the future directly... + return lower.val; +} + +static Datum +getUpper(Datum range, TypeCacheEntry *typcache) +{ + RangeBound lower; + RangeBound upper; + bool empty; + + range_deserialize(typcache, DatumGetRangeTypeP(range), &lower, &upper, &empty); + + // XXX This is just a prototype function, we do not check for emptiness nor infinity yet... + // We will use RangeBounds in the future directly... 
+ return upper.val; +} + +/* + * Return 0 if point is inside range, <0 if the point is right-of the second, or + * >0 if the point is left-of the range. + * + * This should ideally be done with RangeBound types on the right-hand-side + * created during range_split execution. Otherwise, we loose information about + * inclusive/exclusive bounds and infinity. + */ +static int +ApplyNormalizeMatch(Datum ldatum, bool lisnull, Datum rdatum, bool risnull, + SortSupport ssup, TypeCacheEntry *typcache) +{ + RangeBound lower; + RangeBound upper; + bool empty; + int32 result; + + /* can't handle reverse sort order; should be prevented by optimizer */ + Assert(!ssup->ssup_reverse); + Assert(!lisnull || !risnull); + + if (lisnull) + return ssup->ssup_nulls_first ? -1 : 1; + if (risnull) + return ssup->ssup_nulls_first ? 1 : -1; + + range_deserialize(typcache, DatumGetRangeTypeP(ldatum), &lower, &upper, &empty); + + result = DatumGetInt32(FunctionCall2Coll(&typcache->rng_cmp_proc_finfo, + typcache->rng_collation, + lower.val, rdatum)); + if (result == 1) + return 1; + + result = DatumGetInt32(FunctionCall2Coll(&typcache->rng_cmp_proc_finfo, + typcache->rng_collation, + upper.val, rdatum)); + if (result == 1) + return 0; + + return -1; +} + /* * MJEvalOuterValues * @@ -418,9 +667,19 @@ MJCompare(MergeJoinState *mergestate) continue; } - result = ApplySortComparator(clause->ldatum, clause->lisnull, - clause->rdatum, clause->risnull, - &clause->ssup); + if (clause->isnormalize) + { + result = ApplyNormalizeMatch(clause->ldatum, clause->lisnull, + clause->rdatum, clause->risnull, + &clause->ssup, clause->range_typcache); + } + else + { + result = ApplySortComparator(clause->ldatum, clause->lisnull, + clause->rdatum, clause->risnull, + &clause->ssup); + } + if (result != 0) break; @@ -611,6 +870,7 @@ ExecMergeJoin(PlanState *pstate) ExprContext *econtext; bool doFillOuter; bool doFillInner; + TupleTableSlot *out = NULL; CHECK_FOR_INTERRUPTS(); @@ -656,6 +916,13 @@ 
ExecMergeJoin(PlanState *pstate) outerTupleSlot = ExecProcNode(outerPlan); node->mj_OuterTupleSlot = outerTupleSlot; + /* XXX normalize (first call) */ + if (node->mj_isNormalizer) + { + node->sameleft = true; + ExecCopySlot(node->prev, outerTupleSlot); + } + /* Compute join values and check for unmatchability */ switch (MJEvalOuterValues(node)) { @@ -704,6 +971,18 @@ ExecMergeJoin(PlanState *pstate) innerTupleSlot = ExecProcNode(innerPlan); node->mj_InnerTupleSlot = innerTupleSlot; + /* + * P1 is made of the lower or upper bounds of the valid time column, + * hence it must have the same type as the range (return element type) + * of lower(T) or upper(T). + */ + if (node->mj_isNormalizer) + { + node->datumFormat = TupleDescAttr(innerTupleSlot->tts_tupleDescriptor, 0); + setSweepline(getLower(slotGetAttrNotNull(outerTupleSlot, 1), testmytypcache)); + TPGdebugDatum(node->sweepline, node->datumFormat->atttypid); + } + /* Compute join values and check for unmatchability */ switch (MJEvalInnerValues(node, innerTupleSlot)) { @@ -789,6 +1068,10 @@ ExecMergeJoin(PlanState *pstate) innerTupleSlot = node->mj_InnerTupleSlot; econtext->ecxt_innertuple = innerTupleSlot; + TPGdebugSlot(outerTupleSlot); + TPGdebugSlot(node->prev); + TPGdebug("sameleft = %d", node->sameleft); + qualResult = (joinqual == NULL || ExecQual(joinqual, econtext)); MJ_DEBUG_QUAL(joinqual, qualResult); @@ -819,13 +1102,56 @@ ExecMergeJoin(PlanState *pstate) if (qualResult) { + TupleTableSlot *out; + bool isNull; + Datum currP1; + /* * qualification succeeded. now form the desired * projection tuple and return the slot containing it. 
*/ MJ_printf("ExecMergeJoin: returning tuple\n"); - return ExecProject(node->js.ps.ps_ProjInfo); + out = ExecProject(node->js.ps.ps_ProjInfo); + + if (!node->mj_isNormalizer) + return out; + + if (node->sameleft) + { + currP1 = slot_getattr(innerTupleSlot, 1, &isNull); + TPGdebugDatum(currP1, node->datumFormat->atttypid); + if (node->sweepline < currP1) + { + temporalAdjustmentStoreTuple(node, outerTupleSlot, out, node->sweepline, currP1, testmytypcache); + freeSweepline(); + setSweepline(currP1); + + TPGdebugDatum(node->sweepline, node->datumFormat->atttypid); + TPGdebugSlot(out); + + return out; + } + + ExecCopySlot(node->prev, outerTupleSlot); + node->mj_JoinState = EXEC_MJ_NEXTINNER; + } + else /* not node->sameleft */ + { + Datum prevTe = getUpper(heapGetAttrNotNull(node->prev, 1), testmytypcache); + + if (node->sweepline < prevTe) + temporalAdjustmentStoreTuple(node, node->prev, out, node->sweepline, prevTe, testmytypcache); + + ExecCopySlot(node->prev, outerTupleSlot); + freeSweepline(); + setSweepline(getLower(slotGetAttrNotNull(outerTupleSlot, 1), testmytypcache)); + TPGdebugDatum(node->sweepline, node->datumFormat->atttypid); + node->sameleft = true; + node->mj_JoinState = EXEC_MJ_NEXTINNER; + TPGdebugSlot(out); + return out; + } } else InstrCountFiltered2(node, 1); @@ -845,6 +1171,9 @@ ExecMergeJoin(PlanState *pstate) case EXEC_MJ_NEXTINNER: MJ_printf("ExecMergeJoin: EXEC_MJ_NEXTINNER\n"); + if (node->mj_isNormalizer) + node->sameleft = true; + if (doFillInner && !node->mj_MatchedInner) { /* @@ -947,6 +1276,9 @@ ExecMergeJoin(PlanState *pstate) case EXEC_MJ_NEXTOUTER: MJ_printf("ExecMergeJoin: EXEC_MJ_NEXTOUTER\n"); + if (node->mj_isNormalizer) + node->sameleft = false; + if (doFillOuter && !node->mj_MatchedOuter) { /* @@ -962,6 +1294,11 @@ ExecMergeJoin(PlanState *pstate) return result; } + // FIXME PEMOSER Only for normalizer... 
+ TupleTableSlot *out = NULL; + if (node->mj_isNormalizer && !TupIsNull(innerTupleSlot)) + out = ExecProject(node->js.ps.ps_ProjInfo); + /* * now we get the next outer tuple, if any */ @@ -994,6 +1331,19 @@ ExecMergeJoin(PlanState *pstate) node->mj_JoinState = EXEC_MJ_ENDOUTER; break; } + + if (node->mj_isNormalizer && !TupIsNull(node->prev) && !TupIsNull(innerTupleSlot)) + { + MJ_printf("finalize normalizer!!!\n"); + Datum prevTe = getUpper(heapGetAttrNotNull(node->prev, 1), testmytypcache); + TPGdebugDatum(prevTe, node->datumFormat->atttypid); + TPGdebugDatum(node->sweepline, node->datumFormat->atttypid); + MJ_debugtup(node->prev); + temporalAdjustmentStoreTuple(node, node->prev, out, node->sweepline, prevTe, testmytypcache); + node->mj_JoinState = EXEC_MJ_ENDOUTER; + return out; + } + /* Otherwise we're done. */ return NULL; } @@ -1048,7 +1398,7 @@ ExecMergeJoin(PlanState *pstate) compareResult = MJCompare(node); MJ_DEBUG_COMPARE(compareResult); - if (compareResult == 0) + if (compareResult == 0 || (node->mj_isNormalizer && node->mj_markSet)) { /* * the merge clause matched so now we restore the inner @@ -1085,7 +1435,10 @@ ExecMergeJoin(PlanState *pstate) /* we need not do MJEvalInnerValues again */ } - node->mj_JoinState = EXEC_MJ_JOINTUPLES; + if (node->mj_isNormalizer) + node->mj_JoinState = EXEC_MJ_SKIP_TEST; + else + node->mj_JoinState = EXEC_MJ_JOINTUPLES; } else { @@ -1190,6 +1543,7 @@ ExecMergeJoin(PlanState *pstate) MarkInnerTuple(node->mj_InnerTupleSlot, node); node->mj_JoinState = EXEC_MJ_JOINTUPLES; + node->mj_markSet = true; } else if (compareResult < 0) node->mj_JoinState = EXEC_MJ_SKIPOUTER_ADVANCE; @@ -1338,6 +1692,9 @@ ExecMergeJoin(PlanState *pstate) case EXEC_MJ_ENDOUTER: MJ_printf("ExecMergeJoin: EXEC_MJ_ENDOUTER\n"); + if (node->mj_isNormalizer) + return NULL; + Assert(doFillInner); if (!node->mj_MatchedInner) @@ -1439,6 +1796,9 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags) TupleDesc outerDesc, innerDesc; const 
TupleTableSlotOps *innerOps; + const TupleTableSlotOps *prevOps; + int numCols = list_length(node->join.plan.targetlist); + /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -1532,12 +1892,16 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags) ExecInitQual(node->join.joinqual, (PlanState *) mergestate); /* mergeclauses are handled below */ + prevOps = ExecGetResultSlotOps(outerPlanState(mergestate), NULL); + mergestate->prev = ExecInitExtraTupleSlot(estate, outerDesc, prevOps); + /* * detect whether we need only consider the first matching inner tuple */ mergestate->js.single_match = (node->join.inner_unique || node->join.jointype == JOIN_SEMI); + mergestate->mj_isNormalizer = false; /* set up null tuples for outer joins, if needed */ switch (node->join.jointype) { @@ -1587,6 +1951,20 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("FULL JOIN is only supported with merge-joinable join conditions"))); break; + case JOIN_TEMPORAL_NORMALIZE: + mergestate->mj_FillOuter = false; + mergestate->mj_FillInner = false; + mergestate->mj_isNormalizer = true; + + /* Init buffer values for heap_modify_tuple */ + mergestate->newValues = palloc0(sizeof(Datum) * numCols); + mergestate->nullMask = palloc0(sizeof(bool) * numCols); + mergestate->tsteMask = palloc0(sizeof(bool) * numCols); + + /* Not right??? 
-> Always the last in the list, since we add it during planning phase + * XXX PEMOSER We need to find the correct position of "period" and set that here */ + mergestate->tsteMask[/*numCols - 1*/0] = true; + break; default: elog(ERROR, "unrecognized join type: %d", (int) node->join.jointype); diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 92855278ad..5f95f7050d 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -86,6 +86,12 @@ ExecSort(PlanState *pstate) outerNode = outerPlanState(node); tupDesc = ExecGetResultType(outerNode); + // XXX PEMOSER Manually fix sort operation of second attribute (former time, now upper(time)) + // We must fix that in general... this is just a proof of concept brute-force solution! + if (plannode->plan.lefttree->type == T_ProjectSet) { + plannode->sortOperators[0] = 97; // 97 means "<" for int4, it was "<" for int4range + } + tuplesortstate = tuplesort_begin_heap(tupDesc, plannode->numCols, plannode->sortColIdx, diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 4b9be13f08..76518c886e 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -4596,6 +4596,7 @@ calc_joinrel_size_estimate(PlannerInfo *root, switch (jointype) { case JOIN_INNER: + case JOIN_TEMPORAL_NORMALIZE: nrows = outer_rows * inner_rows * fkselec * jselec; /* pselec not used */ break; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index d8ff4bf432..5217c25fc2 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -22,6 +22,11 @@ #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" +#include "catalog/pg_operator.h" +#include "optimizer/tlist.h" +#include "utils/fmgroids.h" +#include "nodes/makefuncs.h" +#include "utils/lsyscache.h" /* Hook for plugins to get control in add_paths_to_joinrel() 
*/ set_join_pathlist_hook_type set_join_pathlist_hook = NULL; @@ -195,7 +200,7 @@ add_paths_to_joinrel(PlannerInfo *root, * way of implementing a full outer join, so override enable_mergejoin if * it's a full join. */ - if (enable_mergejoin || jointype == JOIN_FULL) + if (enable_mergejoin || jointype == JOIN_FULL || jointype == JOIN_TEMPORAL_NORMALIZE) extra.mergeclause_list = select_mergejoin_clauses(root, joinrel, outerrel, @@ -937,6 +942,80 @@ sort_inner_and_outer(PlannerInfo *root, Assert(inner_path); jointype = JOIN_INNER; } + else if (jointype == JOIN_TEMPORAL_NORMALIZE) + { + /* + * outer_path is just sort; inner_path is append of (B, ts) projection with (B, te) projection + */ + List *exprs = NIL; // to collect inner relation's targets + FuncExpr *f_split; + Var *innervar; + + foreach(l, extra->mergeclause_list) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(l); + Expr *clause = (Expr *) rinfo->clause; + + if (IsA(clause, OpExpr)) + { + OpExpr *opexpr = (OpExpr *) clause; + if (opexpr->opno == OID_RANGE_EQ_OP) + { + if (IsA(lsecond(opexpr->args), Var)) { + + // lsecond because it is from the second relation (=inner) + innervar = lsecond(opexpr->args); + + f_split = makeFuncExpr(F_RANGE_SPLIT, 23, list_make1(innervar), 0, 0, 0); + f_split->funcretset = true; + + /* + * OUTER_VAR cannot be used here, because path creation does not know about it, + * it will be introduced in plan creation. 
+ */ + innervar = makeVar(2, innervar->varattno, f_split->funcresulttype, -1, 0, 0); + } + } + else + { + // lsecond because it is from the second relation (=inner) + exprs = lappend(exprs, lsecond(opexpr->args)); + } + } + } + + RestrictInfo *rinfo = (RestrictInfo *) linitial(extra->mergeclause_list); + OpExpr *opexpr = (OpExpr *) rinfo->clause; + lsecond(opexpr->args) = f_split; + rinfo->right_em->em_expr = f_split; + rinfo->mergeopfamilies = get_mergejoin_opfamilies(opexpr->opno); + + PathTarget *target_split = makeNode(PathTarget); + target_split->exprs = lappend(exprs, f_split); + + set_pathtarget_cost_width(root, target_split); + + inner_path = (Path *) create_set_projection_path(root, innerrel, inner_path, target_split); + innerrel->reltarget->exprs = inner_path->pathtarget->exprs;//list_make1(innervar);//copyObject(inner_path->pathtarget->exprs); + joinrel->reltarget->exprs = list_concat(copyObject(outerrel->reltarget->exprs), innerrel->reltarget->exprs); + set_pathtarget_cost_width(root, joinrel->reltarget); + + innerrel->cheapest_total_path = inner_path; + innerrel->cheapest_startup_path = inner_path; + innerrel->cheapest_parameterized_paths = inner_path; + innerrel->pathlist = list_make1(inner_path); + + extra->sjinfo->semi_rhs_exprs = list_make1(f_split); + extra->sjinfo->semi_operators = NIL; + extra->sjinfo->semi_operators = lappend_oid(extra->sjinfo->semi_operators, 96); + + Assert(inner_path); + + innerrel->cheapest_total_path = inner_path; + innerrel->cheapest_startup_path = inner_path; + innerrel->cheapest_parameterized_paths = inner_path; + } + /* * If the joinrel is parallel-safe, we may be able to consider a partial @@ -1028,6 +1107,16 @@ sort_inner_and_outer(PlannerInfo *root, merge_pathkeys = build_join_pathkeys(root, joinrel, jointype, outerkeys); + if (jointype == JOIN_TEMPORAL_NORMALIZE) + { + inner_path = (Path *) create_sort_path(root, innerrel, inner_path, innerkeys, -1); + innerrel->cheapest_total_path = inner_path; + 
innerrel->cheapest_startup_path = inner_path; + innerrel->cheapest_parameterized_paths = inner_path; + innerrel->pathlist = list_make1(inner_path); + Assert(inner_path); + } + /* * And now we can make the path. * @@ -1360,6 +1449,7 @@ match_unsorted_outer(PlannerInfo *root, break; case JOIN_RIGHT: case JOIN_FULL: + case JOIN_TEMPORAL_NORMALIZE: nestjoinOK = false; useallclauses = true; break; @@ -1686,6 +1776,10 @@ hash_inner_and_outer(PlannerInfo *root, List *hashclauses; ListCell *l; + /* Hashjoin is not allowed for temporal NORMALIZE */ + if (jointype == JOIN_TEMPORAL_NORMALIZE) + return; + /* * We need to build only one hashclauses list for any given pair of outer * and inner relations; all of the hashable clauses will be used as keys. diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index dfbbfdac6d..5b2354cae5 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -904,6 +904,21 @@ populate_joinrel_with_paths(PlannerInfo *root, RelOptInfo *rel1, JOIN_ANTI, sjinfo, restrictlist); break; + case JOIN_TEMPORAL_NORMALIZE: + if (is_dummy_rel(rel1) || is_dummy_rel(rel2) || + restriction_is_constant_false(restrictlist, joinrel, false)) + { + mark_dummy_rel(joinrel); + break; + } + + /* + * Temporal normalization does not support re-ordering of rels + */ + add_paths_to_joinrel(root, joinrel, rel1, rel2, + JOIN_TEMPORAL_NORMALIZE, sjinfo, + restrictlist); + break; default: /* other values not expected here */ elog(ERROR, "unrecognized join type: %d", (int) sjinfo->jointype); diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c index 56d839bb31..4e74c3439e 100644 --- a/src/backend/optimizer/path/pathkeys.c +++ b/src/backend/optimizer/path/pathkeys.c @@ -832,7 +832,12 @@ build_join_pathkeys(PlannerInfo *root, JoinType jointype, List *outer_pathkeys) { - if (jointype == JOIN_FULL || jointype == JOIN_RIGHT) + /* + * TEMPORAL NORMALIZE: To improve this, 
we would need to remove only + * temporal range types from the path key list, not all + */ + if (jointype == JOIN_FULL || jointype == JOIN_RIGHT + || jointype == JOIN_TEMPORAL_NORMALIZE) return NIL; /* diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 236f506cfb..4de92f0b53 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -4217,6 +4217,15 @@ create_mergejoin_plan(PlannerInfo *root, /* Costs of sort and material steps are included in path cost already */ copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path); + /* + * XXX PEMOSER NORMALIZE needs a result node above to properly + * handle target lists, functions and constants + * Maybe we need to refine this like in create_unique_plan: + * "If the top plan node can't do projections..." + */ + if (best_path->jpath.jointype == JOIN_TEMPORAL_NORMALIZE) + join_plan = make_result(tlist, NULL, join_plan); + return join_plan; } diff --git a/src/backend/optimizer/plan/initsplan.c b/src/backend/optimizer/plan/initsplan.c index 2afc3f1dfe..16cfd73f71 100644 --- a/src/backend/optimizer/plan/initsplan.c +++ b/src/backend/optimizer/plan/initsplan.c @@ -32,6 +32,7 @@ #include "parser/analyze.h" #include "rewrite/rewriteManip.h" #include "utils/lsyscache.h" +#include "catalog/pg_operator.h" /* These parameters are set by GUC */ @@ -888,6 +889,7 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join, switch (j->jointype) { case JOIN_INNER: + case JOIN_TEMPORAL_NORMALIZE: leftjoinlist = deconstruct_recurse(root, j->larg, below_outer_join, &leftids, &left_inners, @@ -1010,7 +1012,7 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join, *inner_join_rels, j->jointype, my_quals); - if (j->jointype == JOIN_SEMI) + if (j->jointype == JOIN_SEMI || j->jointype == JOIN_TEMPORAL_NORMALIZE) ojscope = NULL; else ojscope = bms_union(sjinfo->min_lefthand, @@ -1429,7 +1431,8 @@ 
compute_semijoin_info(SpecialJoinInfo *sjinfo, List *clause) sjinfo->semi_rhs_exprs = NIL; /* Nothing more to do if it's not a semijoin */ - if (sjinfo->jointype != JOIN_SEMI) + if (sjinfo->jointype != JOIN_SEMI + && sjinfo->jointype != JOIN_TEMPORAL_NORMALIZE) return; /* @@ -2613,6 +2616,10 @@ check_mergejoinable(RestrictInfo *restrictinfo) opno = ((OpExpr *) clause)->opno; leftarg = linitial(((OpExpr *) clause)->args); + // XXX PEMOSER Hardcoded NORMALIZE detection... change this. Read the note below... + if (opno == OID_RANGE_EQ_OP) + restrictinfo->temp_normalizer = true; + if (op_mergejoinable(opno, exprType(leftarg)) && !contain_volatile_functions((Node *) clause)) restrictinfo->mergeopfamilies = get_mergejoin_opfamilies(opno); diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c index aebe162713..bba27c017a 100644 --- a/src/backend/optimizer/prep/prepjointree.c +++ b/src/backend/optimizer/prep/prepjointree.c @@ -780,6 +780,7 @@ pull_up_subqueries_recurse(PlannerInfo *root, Node *jtnode, switch (j->jointype) { case JOIN_INNER: + case JOIN_TEMPORAL_NORMALIZE: j->larg = pull_up_subqueries_recurse(root, j->larg, lowest_outer_join, lowest_nulling_outer_join, diff --git a/src/backend/optimizer/util/restrictinfo.c b/src/backend/optimizer/util/restrictinfo.c index 03e5f12d0d..090ca72271 100644 --- a/src/backend/optimizer/util/restrictinfo.c +++ b/src/backend/optimizer/util/restrictinfo.c @@ -188,6 +188,7 @@ make_restrictinfo_internal(Expr *clause, restrictinfo->outer_selec = -1; restrictinfo->mergeopfamilies = NIL; + restrictinfo->temp_normalizer = false; restrictinfo->left_ec = NULL; restrictinfo->right_ec = NULL; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 0279013120..6f2e54db59 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -432,6 +432,10 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> join_outer join_qual %type 
<jtype> join_type +%type <node> normalizer_qual +%type <jexpr> normalized_table +%type <list> temporal_bounds temporal_bounds_list + %type <list> extract_list overlay_list position_list %type <list> substr_list trim_list %type <list> opt_interval interval_second @@ -653,7 +657,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE - NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE + NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE NORMALIZE NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF NULLS_P NUMERIC @@ -11866,6 +11870,11 @@ table_ref: relation_expr opt_alias_clause $2->alias = $4; $$ = (Node *) $2; } + | '(' normalized_table ')' alias_clause + { + $2->alias = $4; + $$ = (Node *) $2; + } ; @@ -11984,6 +11993,59 @@ opt_alias_clause: alias_clause { $$ = $1; } | /*EMPTY*/ { $$ = NULL; } ; +/* + * Temporal alignment statements + */ +temporal_bounds: WITH '(' temporal_bounds_list ')' { $$ = $3; } + ; + +temporal_bounds_list: + columnref + { + $$ = list_make1($1); + } + | temporal_bounds_list ',' columnref + { + $$ = lappend($1, $3); + } + ; + +normalizer_qual: + USING '(' name_list ')' { $$ = (Node *) $3; } + | USING '(' ')' { $$ = (Node *) NIL; } + ; + +normalized_table: + table_ref NORMALIZE table_ref normalizer_qual temporal_bounds + { + JoinExpr *n = makeNode(JoinExpr); + n->jointype = JOIN_TEMPORAL_NORMALIZE; + n->isNatural = false; + n->larg = $1; + n->rarg = $3; + + n->usingClause = NIL; + + if ($4 != NULL && IsA($4, List)) + n->usingClause = (List *) $4; /* USING clause */ + + /* + * A list for our valid-time boundaries with two range typed + * values. 
+					 */
+					if(list_length($5) == 2)
+						n->temporalBounds = $5;
+					else
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("Temporal adjustment boundaries must " \
+										"have two range typed values"),
+								 parser_errposition(@5)));
+
+					$$ = n;
+				}
+		;
+
 /*
  * func_alias_clause can include both an Alias and a coldeflist, so we make it
  * return a 2-element list that gets disassembled by calling production.
@@ -15396,6 +15458,7 @@ reserved_keyword:
 			| LIMIT
 			| LOCALTIME
 			| LOCALTIMESTAMP
+			| NORMALIZE
 			| NOT
 			| NULL_P
 			| OFFSET
diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c
index c6ce1011e2..714a88bfa0 100644
--- a/src/backend/parser/parse_clause.c
+++ b/src/backend/parser/parse_clause.c
@@ -61,7 +61,7 @@ static void extractRemainingColumns(List *common_colnames,
 						List **res_colnames, List **res_colvars);
 static Node *transformJoinUsingClause(ParseState *pstate,
 						 RangeTblEntry *leftRTE, RangeTblEntry *rightRTE,
-						 List *leftVars, List *rightVars);
+						 List *leftVars, List *rightVars, List *normalizeVars);
 static Node *transformJoinOnClause(ParseState *pstate, JoinExpr *j,
 					  List *namespace);
 static RangeTblEntry *getRTEForSpecialRelationTypes(ParseState *pstate,
@@ -299,7 +299,7 @@ extractRemainingColumns(List *common_colnames,
 static Node *
 transformJoinUsingClause(ParseState *pstate,
 						 RangeTblEntry *leftRTE, RangeTblEntry *rightRTE,
-						 List *leftVars, List *rightVars)
+						 List *leftVars, List *rightVars, List *normalizeVars)
 {
 	Node	   *result;
 	List	   *andargs = NIL;
@@ -333,6 +333,17 @@ transformJoinUsingClause(ParseState *pstate,
 		andargs = lappend(andargs, e);
 	}
 
+	/* Temporal NORMALIZE: AND an "=" comparison of the two temporal bounds */
+	if (normalizeVars)
+	{
+		A_Expr *e;
+		e = makeSimpleA_Expr(AEXPR_OP, "=",
+							 (Node *) copyObject(linitial(normalizeVars)),
+							 (Node *) copyObject(lsecond(normalizeVars)),
+							 -1);
+		andargs = lappend(andargs, e);
+	}
+
 	/* Only need an AND if there's more than one join column */
 	if (list_length(andargs) == 1)
 		result = (Node *) linitial(andargs);
@@ -1193,6 +1204,7 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 		int			sv_namespace_length;
 		RangeTblEntry *rte;
 		int			k;
+		bool		isNormalize = (j->jointype == JOIN_TEMPORAL_NORMALIZE);
 
 		/*
 		 * Recursively process the left subtree, then the right. We must do
@@ -1303,7 +1315,8 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 		res_colnames = NIL;
 		res_colvars = NIL;
 
-		if (j->usingClause)
+		/* NORMALIZE takes this path even when its USING list is empty */
+		if (j->usingClause || isNormalize)
 		{
 			/*
 			 * JOIN/USING (or NATURAL JOIN, as transformed above). Transform
@@ -1313,6 +1326,7 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 			List	   *ucols = j->usingClause;
 			List	   *l_usingvars = NIL;
 			List	   *r_usingvars = NIL;
+			List	   *normalize_vars = NIL;
 			ListCell   *ucol;
 
 			Assert(j->quals == NULL);	/* shouldn't have ON() too */
@@ -1398,11 +1412,90 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 														 r_colvar));
 			}
 
+			/* Only for temporal NORMALIZE: resolve the two WITH() bound columns */
+			if (isNormalize)
+			{
+				int			ndx = 0;
+				ListCell   *col;
+				Var		   *l_boundvar;
+				Var		   *r_boundvar;
+
+				int			l_bound_index = -1;
+				int			r_bound_index = -1;
+				char	   *l_bound;
+				char	   *r_bound;
+				Value	   *lboundcol = linitial(((ColumnRef *)linitial(j->temporalBounds))->fields);
+				Value	   *rboundcol = linitial(((ColumnRef *)lsecond(j->temporalBounds))->fields);
+
+				l_bound = strVal(lboundcol);
+				r_bound = strVal(rboundcol);
+
+				/* Find the first bound in left input; it must match exactly once */
+				foreach(col, l_colnames)
+				{
+					char	   *l_colname = strVal(lfirst(col));
+
+					if (strcmp(l_colname, l_bound) == 0)
+					{
+						if (l_bound_index >= 0)
+							ereport(ERROR,
+									(errcode(ERRCODE_AMBIGUOUS_COLUMN),
+									 errmsg("temporal bound name \"%s\" appears more than once in left table",
+											l_bound)));
+						l_bound_index = ndx;
+					}
+					ndx++;
+				}
+
+				if (l_bound_index < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_UNDEFINED_COLUMN),
+							 errmsg("column \"%s\" specified in normalizer's WITH clause does not exist in left table",
+									l_bound)));
+
+				/* Find the second bound in right input; it must match exactly once */
+				ndx = 0;
+				foreach(col, r_colnames)
+				{
+					char	   *r_colname = strVal(lfirst(col));
+
+					if (strcmp(r_colname, r_bound) == 0)
+					{
+						if (r_bound_index >= 0)
+							ereport(ERROR,
+									(errcode(ERRCODE_AMBIGUOUS_COLUMN),
+									 errmsg("temporal bound name \"%s\" appears more than once in right table",
+											r_bound)));
+						r_bound_index = ndx;
+					}
+					ndx++;
+				}
+
+				if (r_bound_index < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_UNDEFINED_COLUMN),
+							 errmsg("column \"%s\" specified in normalizer's WITH clause does not exist in right table",
+									r_bound)));
+
+				l_boundvar = list_nth(l_colvars, l_bound_index);
+				normalize_vars = lappend(normalize_vars, l_boundvar);
+				r_boundvar = list_nth(r_colvars, r_bound_index);
+				normalize_vars = lappend(normalize_vars, r_boundvar);
+
+				res_colnames = lappend(res_colnames, lboundcol);
+				res_colvars = lappend(res_colvars,
+									  buildMergedJoinVar(pstate,
+														 j->jointype,
+														 l_boundvar,
+														 r_boundvar));
+			}
+
 			j->quals = transformJoinUsingClause(pstate,
 												l_rte,
 												r_rte,
 												l_usingvars,
-												r_usingvars);
+												r_usingvars,
+												normalize_vars);
 		}
 		else if (j->quals)
 		{
@@ -1418,13 +1511,21 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 		extractRemainingColumns(res_colnames,
 								l_colnames, l_colvars,
 								&l_colnames, &l_colvars);
-		extractRemainingColumns(res_colnames,
-								r_colnames, r_colvars,
-								&r_colnames, &r_colvars);
+
+		/* Temporal normalizers expose only the outer relation's columns */
+		if (!isNormalize)
+			extractRemainingColumns(res_colnames,
+									r_colnames, r_colvars,
+									&r_colnames, &r_colvars);
+
 		res_colnames = list_concat(res_colnames, l_colnames);
 		res_colvars = list_concat(res_colvars, l_colvars);
-		res_colnames = list_concat(res_colnames, r_colnames);
-		res_colvars = list_concat(res_colvars, r_colvars);
+
+		if (!isNormalize)
+		{
+			res_colnames = list_concat(res_colnames, r_colnames);
+			res_colvars = list_concat(res_colvars, r_colvars);
+		}
 
 		/*
 		 * Check alias (AS clause), if any.
@@ -1567,6 +1668,7 @@ buildMergedJoinVar(ParseState *pstate, JoinType jointype,
 	switch (jointype)
 	{
 		case JOIN_INNER:
+		case JOIN_TEMPORAL_NORMALIZE:
 
 			/*
 			 * We can use either var; prefer non-coerced one if available.
diff --git a/src/backend/utils/adt/rangetypes.c b/src/backend/utils/adt/rangetypes.c
index c171c7db28..8e4ddfa473 100644
--- a/src/backend/utils/adt/rangetypes.c
+++ b/src/backend/utils/adt/rangetypes.c
@@ -40,6 +40,7 @@
 #include "utils/lsyscache.h"
 #include "utils/rangetypes.h"
 #include "utils/timestamp.h"
+#include "funcapi.h"
 
 
 #define RANGE_EMPTY_LITERAL "empty"
@@ -466,6 +467,79 @@ range_upper(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(upper.val);
 }
 
+/* split lower/upper bound into two rows of data */
+Datum
+range_split(PG_FUNCTION_ARGS)
+{
+	typedef struct
+	{
+		RangeBound	lower;
+		RangeBound	upper;
+		bool		empty;
+	} RangeSplitFuncContext;
+
+	FuncCallContext *funcctx;
+	MemoryContext oldcontext;
+	RangeSplitFuncContext *fctx;
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		RangeType  *r1;
+		TypeCacheEntry *typcache;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * switch to memory context appropriate for multiple function calls
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		r1 = PG_GETARG_RANGE_P(0);
+
+		/* allocate memory for user context */
+		fctx = (RangeSplitFuncContext *) palloc(sizeof(RangeSplitFuncContext));
+
+		/*
+		 * We cannot use range_get_typecache, because it would overwrite
+		 * fcinfo->flinfo->fn_extra
+		 */
+		typcache = lookup_type_cache(RangeTypeGetOid(r1), TYPECACHE_RANGE_INFO);
+		if (typcache->rngelemtype == NULL)
+			elog(ERROR, "type %u is not a range type", RangeTypeGetOid(r1));
+		range_deserialize(typcache, r1, &fctx->lower, &fctx->upper, &fctx->empty);
+
+		funcctx->user_fctx = fctx;
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	fctx = (RangeSplitFuncContext *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr == 0)
+	{
+		/* First row: NULL for an empty range or an infinite lower bound */
+		if (fctx->empty || fctx->lower.infinite)
+			SRF_RETURN_NEXT_NULL(funcctx);
+
+		SRF_RETURN_NEXT(funcctx, fctx->lower.val);
+	}
+
+	if (funcctx->call_cntr == 1)
+	{
+		/* Second row: NULL for an empty range or an infinite upper bound */
+		if (fctx->empty || fctx->upper.infinite)
+			SRF_RETURN_NEXT_NULL(funcctx);
+
+		SRF_RETURN_NEXT(funcctx, fctx->upper.val);
+	}
+
+	/* done, after extracting lower and upper bounds */
+	SRF_RETURN_DONE(funcctx);
+}
+
 
 /* range -> bool functions */
 
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index e6837869cf..4d957d5ce2 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -1981,6 +1981,7 @@ eqjoinsel(PG_FUNCTION_ARGS)
 		case JOIN_INNER:
 		case JOIN_LEFT:
 		case JOIN_FULL:
+		case JOIN_TEMPORAL_NORMALIZE:
 			selec = selec_inner;
 			break;
 		case JOIN_SEMI:
diff --git a/src/include/catalog/pg_operator.dat b/src/include/catalog/pg_operator.dat
index 06aec0780b..333ba207d7 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -3060,7 +3060,7 @@
   oprrest => 'scalargesel', oprjoin => 'scalargejoinsel' },
 
 # generic range type operators
-{ oid => '3882', descr => 'equal',
+{ oid => '3882', descr => 'equal', oid_symbol => 'OID_RANGE_EQ_OP',
   oprname => '=', oprcanmerge => 't', oprcanhash => 't', oprleft => 'anyrange',
   oprright => 'anyrange', oprresult => 'bool', oprcom => '=(anyrange,anyrange)',
   oprnegate => '<>(anyrange,anyrange)', oprcode => 'range_eq',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a4e173b484..e65894c128 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9427,6 +9427,10 @@
 { oid => '3867',
   proname => 'range_union', prorettype => 'anyrange',
   proargtypes => 'anyrange anyrange', prosrc => 'range_union' },
+{ oid => '4001',
+  descr => 'lower and upper bound of range returned as two tuples',
+  proname => 'range_split', prorettype => 'anyelement', proretset => 't',
+  proargtypes => 'anyrange', prosrc => 'range_split' },
 { oid => '4057',
   descr => 'the smallest range which includes both of the given ranges',
   proname => 'range_merge', prorettype => 'anyrange',
diff --git a/src/include/executor/execdebug.h b/src/include/executor/execdebug.h
index c119fdf4fa..1dd3b2d513 100644
--- a/src/include/executor/execdebug.h
+++ b/src/include/executor/execdebug.h
@@ -82,6 +82,7 @@
  *		sort node debugging defines
  * ----------------
  */
+#define EXEC_SORTDEBUG
 #ifdef EXEC_SORTDEBUG
 #define SO_nodeDisplay(l)				nodeDisplay(l)
 #define SO_printf(s)					printf(s)
@@ -96,14 +97,15 @@
  *		merge join debugging defines
  * ----------------
  */
+#define EXEC_MERGEJOINDEBUG
 #ifdef EXEC_MERGEJOINDEBUG
 
 #define MJ_nodeDisplay(l)				nodeDisplay(l)
-#define MJ_printf(s)					printf(s)
-#define MJ1_printf(s, p)				printf(s, p)
-#define MJ2_printf(s, p1, p2)			printf(s, p1, p2)
-#define MJ_debugtup(slot)				debugtup(slot, NULL)
-#define MJ_dump(state)					ExecMergeTupleDump(state)
+#define MJ_printf(s)					do { printf(s); fflush(stdout); } while (0)
+#define MJ1_printf(s, p)				do { printf(s, p); fflush(stdout); } while (0)
+#define MJ2_printf(s, p1, p2)			do { printf(s, p1, p2); fflush(stdout); } while (0)
+#define MJ_debugtup(slot)				do { debugtup(slot, NULL); fflush(stdout); } while (0)
+#define MJ_dump(state)					do { ExecMergeTupleDump(state); fflush(stdout); } while (0)
 #define MJ_DEBUG_COMPARE(res) \
   MJ1_printf(" MJCompare() returns %d\n", (res))
 #define MJ_DEBUG_QUAL(clause, res) \
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 09f8217c80..52e16684e7 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1831,6 +1831,17 @@ typedef struct MergeJoinState
 	TupleTableSlot *mj_NullInnerTupleSlot;
 	ExprContext *mj_OuterEContext;
 	ExprContext *mj_InnerEContext;
+
+	/* needed by temporal normalization */
+	bool				 mj_markSet;
+	bool				 mj_isNormalizer;
+	bool				 sameleft;
+	bool				*nullMask;		/* See heap_modify_tuple */
+	bool				*tsteMask;		/* See heap_modify_tuple */
+	Datum				*newValues;		/* tuple values that get updated */
+	Datum				 sweepline;
+	Form_pg_attribute	 datumFormat;	/* Datum format of sweepline, P1, P2 */
+	TupleTableSlot		*prev;
 } MergeJoinState;
 
 /* ----------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index f9389257c6..d61f8d9951 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -84,6 +84,7 @@ typedef enum NodeTag
 	T_SetOp,
 	T_LockRows,
 	T_Limit,
+	T_TemporalAdjustment,
 	/* these aren't subclasses of Plan: */
 	T_NestLoopParam,
 	T_PlanRowMark,
@@ -140,6 +141,7 @@ typedef enum NodeTag
 	T_SetOpState,
 	T_LockRowsState,
 	T_LimitState,
+	T_TemporalAdjustmentState,
 
 	/*
 	 * TAGS FOR PRIMITIVE NODES (primnodes.h)
@@ -256,6 +258,7 @@ typedef enum NodeTag
 	T_LockRowsPath,
 	T_ModifyTablePath,
 	T_LimitPath,
+	T_TemporalAdjustmentPath,
 	/* these aren't subclasses of Path: */
 	T_EquivalenceClass,
 	T_EquivalenceMember,
@@ -476,6 +479,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_TemporalClause,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
@@ -715,7 +719,12 @@ typedef enum JoinType
 	 * by the executor (nor, indeed, by most of the planner).
 	 */
 	JOIN_UNIQUE_OUTER,			/* LHS path must be made unique */
-	JOIN_UNIQUE_INNER			/* RHS path must be made unique */
+	JOIN_UNIQUE_INNER,			/* RHS path must be made unique */
+
+	/*
+	 * Temporal adjustment primitives
+	 */
+	JOIN_TEMPORAL_NORMALIZE
 
 	/*
 	 * We might need additional join types someday.
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index a008ae07da..71254c5856 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -1981,6 +1981,7 @@ typedef struct RestrictInfo
 
 	/* valid if clause is hashjoinable, else InvalidOid: */
 	Oid			hashjoinoperator;	/* copy of clause operator */
+	bool		temp_normalizer;	/* NOTE(review): presumably marks a clause belonging to a temporal NORMALIZE join; unrelated to the hashjoin comment above -- confirm */
 
 	/* cache space for hashclause processing; -1 if not yet set */
 	Selectivity left_bucketsize;	/* avg bucketsize of left side */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index a7efae7038..0e2a668d18 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -508,6 +508,7 @@ typedef struct OpExpr
 	Oid			inputcollid;	/* OID of collation that operator should use */
 	List	   *args;			/* arguments to the operator (1 or 2) */
 	int			location;		/* token location, or -1 if unknown */
+	bool		isnormalize;	/* NOTE(review): presumably flags the bound comparison added for NORMALIZE; node copy/equal/out/read support not visible here -- confirm */
 } OpExpr;
 
 /*
@@ -1480,6 +1481,8 @@ typedef struct JoinExpr
 	Node	   *quals;			/* qualifiers on join, if any */
 	Alias	   *alias;			/* user-written alias clause, if any */
 	int			rtindex;		/* RT index assigned for join, or 0 */
+	List	   *temporalBounds; /* columns that form bounds for both subtrees,
+								 * used by temporal adjustment primitives */
 } JoinExpr;
 
 /*----------
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index f05444008c..ed6f944593 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -261,6 +261,7 @@ PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
 PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
 PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
+PG_KEYWORD("normalize", NORMALIZE, RESERVED_KEYWORD)
 PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
 PG_KEYWORD("nothing", NOTHING, UNRESERVED_KEYWORD)
 PG_KEYWORD("notify", NOTIFY, UNRESERVED_KEYWORD)