Changeset: 64531233dab6 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=64531233dab6 Modified Files: MonetDB5/src/modules/mal/mal_init.mx MonetDB5/src/optimizer/Makefile.ag MonetDB5/src/optimizer/opt_mapreduce.mx Branch: Jun2010 Log Message:
mapreduce is not ready to be distributed, remove from Jun2010 candidate branch diffs (truncated from 874 to 300 lines): diff -r 01417a10ef0d -r 64531233dab6 MonetDB5/src/modules/mal/mal_init.mx --- a/MonetDB5/src/modules/mal/mal_init.mx Mon May 17 11:35:26 2010 +0200 +++ b/MonetDB5/src/modules/mal/mal_init.mx Mon May 17 14:29:46 2010 +0200 @@ -222,7 +222,6 @@ include opt_strengthReduction; include opt_statistics; include opt_trace; -include opt_mapreduce; include chopper; include cluster; diff -r 01417a10ef0d -r 64531233dab6 MonetDB5/src/optimizer/Makefile.ag --- a/MonetDB5/src/optimizer/Makefile.ag Mon May 17 11:35:26 2010 +0200 +++ b/MonetDB5/src/optimizer/Makefile.ag Mon May 17 14:29:46 2010 +0200 @@ -36,7 +36,7 @@ opt_evaluate.mx opt_inline.mx opt_pushranges.mx opt_derivepath.mx \ opt_accessmode.mx opt_joinpath.mx opt_heuristics.mx opt_remap.mx \ opt_statistics.mx opt_trace.mx opt_recycler.mx opt_dataflow.mx \ - opt_cluster.mx opt_replication.mx opt_dictionary.mx opt_mapreduce.mx \ + opt_cluster.mx opt_replication.mx opt_dictionary.mx \ opt_mitosis.mx opt_octopus.mx opt_history.mx opt_selcrack.mx opt_sidcrack.mx #SCRIPTS = mal @@ -61,7 +61,7 @@ opt_evaluate.mx opt_inline.mx opt_pushranges.mx opt_derivepath.mx \ opt_accessmode.mx opt_joinpath.mx opt_heuristics.mx opt_remap.mx \ opt_statistics.mx opt_trace.mx opt_recycler.mx opt_dataflow.mx \ - opt_cluster.mx opt_replication.mx opt_dictionary.mx opt_mapreduce.mx \ + opt_cluster.mx opt_replication.mx opt_dictionary.mx \ opt_mitosis.mx opt_octopus.mx opt_history.mx opt_selcrack.mx opt_sidcrack.mx HEADERS = h @@ -82,7 +82,7 @@ opt_evaluate.mx opt_inline.mx opt_pushranges.mx opt_derivepath.mx \ opt_accessmode.mx opt_joinpath.mx opt_heuristics.mx opt_remap.mx \ opt_statistics.mx opt_trace.mx opt_recycler.mx opt_dataflow.mx \ - opt_cluster.mx opt_replication.mx opt_dictionary.mx opt_mapreduce.mx \ + opt_cluster.mx opt_replication.mx opt_dictionary.mx \ opt_mitosis.mx opt_octopus.mx opt_history.mx opt_selcrack.mx 
opt_sidcrack.mx } diff -r 01417a10ef0d -r 64531233dab6 MonetDB5/src/optimizer/opt_mapreduce.mx --- a/MonetDB5/src/optimizer/opt_mapreduce.mx Mon May 17 11:35:26 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,829 +0,0 @@ -@' The contents of this file are subject to the MonetDB Public License -@' Version 1.1 (the "License"); you may not use this file except in -@' compliance with the License. You may obtain a copy of the License at -@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html -@' -@' Software distributed under the License is distributed on an "AS IS" -@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -@' License for the specific language governing rights and limitations -@' under the License. -@' -@' The Original Code is the MonetDB Database System. -@' -@' The Initial Developer of the Original Code is CWI. -@' Portions created by CWI are Copyright (C) 1997-July 2008 CWI. -@' Copyright August 2008-2010 MonetDB B.V. -@' All Rights Reserved. - -@f opt_mapreduce -@a M. Kersten, F. Groffen -@- Map-Reduce -The Map-Reduce infrastructure requires a little optimizer to turn -an arbitrary query into a plan to be executed on the elements of the Cloud. - -In the first implementation we don't optimize the plan against the mapping scheme. -We simply assume that the complete query can be executed and that only the -result sets should be assembled. - -[OUTOFDATE] - -Consider part of the query plan for 'select * from tables' -@verbatim -function user.s0_0{autoCommit=true}():void; - _23:bat[:oid,:sht] := sql.bind("sys","_tables","type",1); - _24:bat[:oid,:oid] := sql.bind_dbat("sys","_tables",1); - _25 := bat.reverse(_24); -... 
- _96:bat[:oid,:bte] := bat.new(nil:oid,nil:bte); - _98 := bat.append(_96,_95,true); - _96:bat[:oid,:bte] := nil:BAT; - _99 := bat.append(_98,_93,true); - _100 := sql.resultSet(8,1,_33); - sql.rsColumn(_100,".tables","id","int",32,0,_33); - sql.rsColumn(_100,".tables","name","varchar",1024,0,_44); - sql.rsColumn(_100,".tables","schema_id","int",32,0,_54); - sql.rsColumn(_100,".tables","query","varchar",2048,0,_64); - sql.rsColumn(_100,".tables","type","smallint",16,0,_70); - sql.rsColumn(_100,".tables","system","boolean",1,0,_81); - sql.rsColumn(_100,".tables","commit_action","smallint",16,0,_91); - sql.rsColumn(_100,".tables","temporary","tinyint",8,0,_99); - _121 := io.stdout(); - sql.exportResult(_121,_100); -end s0_0; -@end verbatim -This plan is turned into two routines. One to be executed -on the individual nodes and one to assemble the results. -@verbatim -function user.s0_0mp() (s0_0:void,X61:bat[:oid,:int],X85:bat[:oid,:str],X109:bat[:oid,:int],X134:bat[:oid,:str],X142:bat[:oid,:sht],X168:bat[:oid,:bit],X191:bat[:oid,:sht],X201:bat[:oid,:bte]); - _23:bat[:oid,:sht] := sql.bind("sys","_tables","type",1); - _24:bat[:oid,:oid] := sql.bind_dbat("sys","_tables",1); - _25 := bat.reverse(_24); -... 
- _96:bat[:oid,:bte] := bat.new(nil:oid,nil:bte); - _98 := bat.append(_96,_95,true); - _96:bat[:oid,:bte] := nil:BAT; - _99 := bat.append(_98,_93,true); - return (s0_0,X61,X85,X109,X134,X142,X168,X191,X201); -end s0_0mp; -function user.s0_0():void; - s0_0 := nil:void; - X61 := nil:bat[:oid,:int]; - X85 := nil:bat[:oid,:str]; - X109 := nil:bat[:oid,:int]; - X134 := nil:bat[:oid,:str]; - X142 := nil:bat[:oid,:sht]; - X168 := nil:bat[:oid,:bit]; - X191 := nil:bat[:oid,:sht]; - X201 := nil:bat[:oid,:bte]; - (_253,_254,_255,_256,_257,_258,_259,_260,_261) := mapreduce.exec(0,"user","s0_0mp"); - (_263,_264,_265,_266,_267,_268,_269,_270,_271) := mapreduce.exec(1,"user","s0_0mp"); - (_273,_274,_275,_276,_277,_278,_279,_280,_281) := mapreduce.exec(2,"user","s0_0mp"); - X61 := mat.pack(_254,_264,_274); - X85 := mat.pack(_255,_265,_275); - X109 := mat.pack(_256,_266,_276); - X134 := mat.pack(_257,_267,_277); - X142 := mat.pack(_258,_268,_278); - X168 := mat.pack(_259,_269,_279); - X191 := mat.pack(_260,_270,_280); - X201 := mat.pack(_261,_271,_281); -exit _250; - X202 := sql.resultSet(8,1,X61); - sql.rsColumn(X202,".tables","id","int",32,0,X61); - sql.rsColumn(X202,".tables","name","varchar",1024,0,X85); - sql.rsColumn(X202,".tables","schema_id","int",32,0,X109); - sql.rsColumn(X202,".tables","query","varchar",2048,0,X134); - sql.rsColumn(X202,".tables","type","smallint",16,0,X142); - sql.rsColumn(X202,".tables","system","boolean",1,0,X168); - sql.rsColumn(X202,".tables","commit_action","smallint",16,0,X191); - sql.rsColumn(X202,".tables","temporary","tinyint",8,0,X201); - X232 := io.stdout(); - sql.exportResult(X232,X202); -end s0_0; -function mapreduce.exec_3(conn:str, mod:str, fcn:str):any_1...; - remote.register(conn, mod, fcn); - (X201,X202,X203) := remote.exec(conn, mod, fcn); - Y201 := remote.get(conn, X201); - Y202 := remote.get(conn, X202); - Y203 := remote.get(conn, X203); - return (Y201,Y202,Y203); -end exec; -@end verbatim -The code can be considered a 
refinement of the Octopus. -@{ -@mal -pattern optimizer.mapreduce():str -address OPTmapreduce; -pattern optimizer.mapreduce(mod:str, fcn:str):str -address OPTmapreduce -comment "Modify the plan to exploit parallel processing on multiple systems using map-reduce"; -@h -#ifndef _OPT_MAPREDUCE_ -#define _OPT_MAPREDUCE_ -#include "opt_prelude.h" -#include "opt_support.h" - -opt_export str MRexec(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); -@c -#include "mal_config.h" -#include "opt_mapreduce.h" -#include "mal_interpreter.h" -#include "remote.h" - -/* #define _DEBUG_OPT_MAPREDUCE*/ - -@- -The work distribution assumes that we know at compile time -the number of nodes participating in the cloud setting. -It calls the map-reduce executor to produce a result -possible with the aid of a replica. -@c - -typedef struct _mapnode { - str uri; - str user; - str pass; -} mapnode; - -#define MAXNODES 256 -static mapnode mapnodes[MAXNODES]; - -static int -MRcloudSize(str mrcluster) -{ - str msg; - bat bid = 0; - BAT *b; - BUN p, q; - BATiter bi; - int mapcount = 0; - char nodes[1024]; - char *n = nodes; - - snprintf(nodes, sizeof(nodes), "*/%s/node/*", mrcluster); - - msg = RMTresolve(&bid, &n); - if (msg != MAL_SUCCEED) { - if (msg != M5OutOfMemory) - GDKfree(msg); - return(0); - } - - b = BATdescriptor(bid); - if (b == NULL) - return(0); - - bi = bat_iterator(b); - BATloop(b, p, q) { - str t = (str)BUNtail(bi, p); - - if (mapcount == MAXNODES) - break; - - mapnodes[mapcount].uri = GDKstrdup(t); - mapnodes[mapcount].user = GDKstrdup("monetdb"); - mapnodes[mapcount].pass = GDKstrdup("monetdb"); - mapcount++; - } - BBPreleaseref(bid); - - if (GDKnr_threads < mapcount) - GDKnr_threads = mapcount; - - return(mapcount); -} - -static void -MRcleanCloud(int mapcount) -{ - int i; - - for (i = 0; i < mapcount; i++) { - if (mapnodes[mapcount].uri != NULL) - GDKfree(mapnodes[mapcount].uri); - if (mapnodes[mapcount].user != NULL) - GDKfree(mapnodes[mapcount].user); - if 
(mapnodes[mapcount].pass != NULL) - GDKfree(mapnodes[mapcount].pass); - } -} - -typedef struct _mapcol { - int val1; - int val1type; - int val2; - int val2type; - int val3; - int val3type; - struct _mapcol *next; -} mapcol; - -static void -MRdistributework( - Client cntxt, - MalBlkPtr mb, - mapcol *col, - InstrPtr sig, - str mrcluster) -{ - InstrPtr o, p, *packs; - int i, n, j, q, v, retc; - int *gets, *w; - mapcol *lcol; - (void)cntxt; - - n = MRcloudSize(mrcluster); - - assert(n); - assert(col); - - retc = 0; - for (lcol = col; lcol != NULL; lcol = lcol->next) - retc++; - - assert(retc); - - packs = (InstrPtr *)alloca(retc * sizeof(InstrPtr)); - gets = (int *)alloca(n * retc * sizeof(int)); - w = (int *)alloca(retc * sizeof(int)); - - for (lcol = col, j = 0; lcol != NULL; lcol = lcol->next, j++) { - packs[j] = p = newFcnCall(mb, batRef, newRef); - p = pushType(mb, p, getHeadType(lcol->val1type)); - p = pushType(mb, p, getTailType(lcol->val1type)); - getArg(p, 0) = lcol->val1; - - /* same for all sub results that we push into the mat.pack as _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list