Changeset: b3cb4b442795 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b3cb4b442795 Removed Files: monetdb5/optimizer/opt_mapreduce.c monetdb5/optimizer/opt_mapreduce.h Modified Files: monetdb5/optimizer/Makefile.ag monetdb5/optimizer/opt_support.c monetdb5/optimizer/opt_support.h monetdb5/optimizer/opt_wrapper.c monetdb5/optimizer/optimizer.mal Branch: default Log Message:
Drop experimental mapreduce optimizer It is based on old fashioned MAL code structures and will be superseded by the merge/remote tables approach. diffs (truncated from 1093 to 300 lines): diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag --- a/monetdb5/optimizer/Makefile.ag +++ b/monetdb5/optimizer/Makefile.ag @@ -48,7 +48,6 @@ lib_optimizer = { opt_inline.c opt_inline.h \ opt_joinpath.c opt_joinpath.h \ opt_macro.c opt_macro.h \ - opt_mapreduce.c opt_mapreduce.h \ opt_matpack.c opt_matpack.h \ opt_json.c opt_json.h \ opt_mergetable.c opt_mergetable.h \ diff --git a/monetdb5/optimizer/opt_mapreduce.c b/monetdb5/optimizer/opt_mapreduce.c deleted file mode 100644 --- a/monetdb5/optimizer/opt_mapreduce.c +++ /dev/null @@ -1,971 +0,0 @@ -/* - * The contents of this file are subject to the MonetDB Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.monetdb.org/Legal/MonetDBLicense - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the - * License for the specific language governing rights and limitations - * under the License. - * - * The Original Code is the MonetDB Database System. - * - * The Initial Developer of the Original Code is CWI. - * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. - * Copyright August 2008-2015 MonetDB B.V. - * All Rights Reserved. - */ - -/* -@a F. Groffen, M. Kersten -@- Map-Reduce -The Map-Reduce infrastructure requires a little optimizer to turn -an arbitrary query into a plan to be executed on the systems in the cloud. -Each cloud consists of a series of named servers, managed by Merovingian -with the pattern "cloudname/node//". The cloudname is detected from -the schema in which an SQL table is stored. 
Only schemas starting with -"mr_" are considered to be mapreduce schemas on the query node. The -cloudname is the schema name without the leading "mr_" prefix. -# -Determining the clould is an expensive operation and for the time being -performed each time when a query is compiled. -# -In the first implementation we don't optimize the plan against the mapping scheme. -We simply assume that the complete query can be executed and that only the -result sets should be assembled. -*/ -#include "monetdb_config.h" -#include "opt_mapreduce.h" -#include "mal_interpreter.h" -#include "remote.h" - -typedef struct _mapnode { - str uri; - str user; - str pass; -} mapnode; - -static mapnode *mapnodes; - -static void -MRcleanCloud(void) -{ - int i; - - MT_lock_set(&mal_contextLock, "mapreduce"); - for (i = 0; mapnodes[i].uri; i++) { - if (mapnodes[i].uri != NULL) - GDKfree(mapnodes[i].uri); - if (mapnodes[i].user != NULL) - GDKfree(mapnodes[i].user); - if (mapnodes[i].pass != NULL) - GDKfree(mapnodes[i].pass); - mapnodes[i].uri = mapnodes[i].user = mapnodes[i].pass = 0; - } - MT_lock_unset(&mal_contextLock, "mapreduce"); -} - -str -MRgetCloud(bat *ret, str *mrcluster) -{ - str msg; - BAT *cloud; - BUN p, q; - BATiter bi; - char nodes[BUFSIZ]; - char *n = nodes; - int mapcount = 0; - - snprintf(nodes, sizeof(nodes), "*/%s/node/*", *mrcluster); - - if ((msg = RMTresolve(ret, &n)) != MAL_SUCCEED) - return msg; - - MT_lock_set(&mal_contextLock, "mapreduce"); - cloud = BATdescriptor(*ret); /* should succeed */ - if (cloud == NULL) - throw(MAL, "mapreduce.getCloud", RUNTIME_OBJECT_MISSING); - - mapnodes = (mapnode*)GDKzalloc(sizeof(mapnode) * (BATcount(cloud) + 1)); - if (mapnodes == NULL) { - BBPunfix(*ret); - throw(MAL, "mapreduce.getCloud", MAL_MALLOC_FAIL); - } - - bi = bat_iterator(cloud); - BATloop(cloud, p, q) { - str t = (str)BUNtail(bi, p); - mapnodes[mapcount].uri = GDKstrdup(t); - mapnodes[mapcount].user = GDKstrdup("monetdb"); - mapnodes[mapcount].pass = 
GDKstrdup("monetdb"); - mapcount++; - } - - BBPkeepref(*ret); /* we're done, keep for caller */ - cloud = NULL; - MT_lock_unset(&mal_contextLock, "mapreduce"); - - return MAL_SUCCEED; -} - -static int -MRcloudSize(str mrcluster) -{ - str msg; - int bid; - BAT *cloud; - int cnt; - - msg = MRgetCloud(&bid, &mrcluster); - if (msg) { - GDKfree(msg); /* bad programming */ - return 0; - } - cloud = BATdescriptor(bid); - if (cloud == NULL) - return 0; - cnt = (int)BATcount(cloud); - BBPunfix(bid); /* we're done with it */ - return(cnt); -} - - -enum poper { pBAT = 1, SUM, MAX, MIN, SORT, SORTDESC, LIMIT }; - -typedef struct _mapcol { - int mapid; /* var in map plan that is in its signature - and return */ - int reduceid; /* original column var in reduce program we - eventually need to replace */ - int type; /* type of the map plan var */ - int mapbat; /* the var that is a BAT containing all values - returned from map nodes (function), can only be - used *after* MRdistributework */ - enum poper postop;/* the operation that needs to be performed on - mapbat to turn it into reduceid */ - struct _mapcol *next; -} mapcol; - -static void -MRdistributework( - Client cntxt, - MalBlkPtr reduce, - mapcol *col, - InstrPtr sig, - str mrcluster) -{ - InstrPtr o, p = NULL, *packs; - int i, n, j, q, v, retc; - int *gets, *w; - mapcol *lcol; - (void)cntxt; - - n = MRcloudSize(mrcluster); - - assert(n); - assert(col); - - retc = 0; - for (lcol = col; lcol != NULL; lcol = lcol->next) - retc++; - - assert(retc); - - packs = (InstrPtr *)GDKmalloc(retc * sizeof(InstrPtr)); - gets = (int *)GDKmalloc(n * retc * sizeof(int)); - w = (int *)GDKmalloc(retc * sizeof(int)); - - for (lcol = col, j = 0; lcol != NULL; lcol = lcol->next, j++) { - /* define and create the container bat for all results from the - * map nodes */ - packs[j] = p = newFcnCall(reduce, batRef, newRef); - if (isaBatType(lcol->type)) { - p = pushType(reduce, p, getHeadType(lcol->type)); - p = pushType(reduce, p, 
getColumnType(lcol->type)); - setArgType(reduce, p, 0, lcol->type); - } else { - p = pushNil(reduce, p, TYPE_void); - p = pushType(reduce, p, lcol->type); - setArgType(reduce, p, 0, newBatType(TYPE_void, lcol->type)); - } - lcol->mapbat = getArg(p, 0); - - /* we need to declare the variables that we will use with put, - * exec and get */ - for (i = 0; i < n; i++) { - if (isaBatType(lcol->type)) { - p = newFcnCall(reduce, batRef, newRef); - p = pushType(reduce, p, getHeadType(lcol->type)); - p = pushType(reduce, p, getColumnType(lcol->type)); - } else { - p = newAssignment(reduce); - p = pushNil(reduce, p, lcol->type); - } - setArgType(reduce, p, 0, lcol->type); - gets[(i * retc) + j] = getArg(p, 0); - } - } - - for (i = 0; i < n; i++) { - /* q := remote.connect("uri", "user", "pass"); */ - p = newStmt(reduce, remoteRef, connectRef); - p = pushStr(reduce, p, mapnodes[i].uri); - p = pushStr(reduce, p, mapnodes[i].user); - p = pushStr(reduce, p, mapnodes[i].pass); - p = pushStr(reduce, p, "msql"); - q = getArg(p, 0); - - /* remote.register(q, "mod", "fcn"); */ - p = newStmt(reduce, remoteRef, putName("register", 8)); - p = pushArgument(reduce, p, q); - p = pushStr(reduce, p, getModuleId(sig)); - p = pushStr(reduce, p, getFunctionId(sig)); - - /* (x1, x2, ..., xn) := remote.exec(q, "mod", "fcn"); */ - p = newInstruction(reduce, ASSIGNsymbol); - setModuleId(p, remoteRef); - setFunctionId(p, execRef); - p = pushArgument(reduce, p, q); - p = pushStr(reduce, p, getModuleId(sig)); - p = pushStr(reduce, p, getFunctionId(sig)); - for (j = 0; j < retc; j++) { - /* x1 := remote.put(q, :type) */ - o = newFcnCall(reduce, remoteRef, putRef); - o = pushArgument(reduce, o, q); - o = pushArgument(reduce, o, gets[(i * retc) + j]); - v = getArg(o, 0); - p = pushReturn(reduce, p, v); - w[j] = v; - } - for (j = sig->retc; j < sig->argc; j++) { - /* x1 := remote.put(q, A0); */ - o = newStmt(reduce, remoteRef, putRef); - o = pushArgument(reduce, o, q); - o = pushArgument(reduce, o, 
getArg(sig, j)); - p = pushArgument(reduce, p, getArg(o, 0)); - } - pushInstruction(reduce, p); - - /* y1 := remote.get(q, x1); */ - for (j = 0; j < retc; j++) { - p = newFcnCall(reduce, remoteRef, getRef); - p = pushArgument(reduce, p, q); - p = pushArgument(reduce, p, w[j]); - getArg(p, 0) = gets[(i * retc) + j]; - } - - /* remote.disconnect(q); */ - p = newStmt(reduce, remoteRef, disconnectRef); - p = pushArgument(reduce, p, q); - } - - /* delayed bat.inserts for easily creating a deterministic flow */ - for (lcol = col, j = 0; lcol != NULL; lcol = lcol->next, j++) { - q = lcol->mapbat; - /* p := bat.insert(b, y1) */ - for (i = 0; i < n; i++) { - p = newStmt(reduce, batRef, insertRef); - p = pushArgument(reduce, p, q); - if (!isaBatType(lcol->type)) - p = pushNil(reduce, p, TYPE_void); - p = pushArgument(reduce, p, gets[(i * retc) + j]); - q = getArg(p, 0); - } - - lcol->mapbat = getArg(p, 0); - - /* We must deliver here the variables (reduceid) that the rest - * of the reduce plan uses in such a way that they can deal with - * it. Since this code is ran at latest (after the possible - * optimisations are known) the optimisation code cannot know - * what vars come out of here (in particular mapbat), so it must - * be able to rely on what it knows (reduceid). */ - switch (lcol->postop) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list