Changeset: d36e6bd177bc for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d36e6bd177bc Modified Files: sql/backends/monet5/datacell/Tests/emitter00.mal sql/backends/monet5/datacell/Tests/receptor00.mal sql/backends/monet5/datacell/datacell.sql sql/backends/monet5/datacell/emitter.mx sql/backends/monet5/datacell/receptor.mx Branch: default Log Message:
Redesign of emitter module The design is reduced to a simple start-{pause,resume} - drop scheme. The notion of active/passive emitter replaces the client/server tags. No testing yet, merely reducing interface complexity. diffs (truncated from 420 to 300 lines): diff --git a/sql/backends/monet5/datacell/Tests/emitter00.mal b/sql/backends/monet5/datacell/Tests/emitter00.mal --- a/sql/backends/monet5/datacell/Tests/emitter00.mal +++ b/sql/backends/monet5/datacell/Tests/emitter00.mal @@ -1,21 +1,25 @@ -#A test for the emitter -#The emitter uses the prefix to lock the streams -libary datacell; -include basket; -include emitter; +#A single thread for a simple stream +#The test is based on the definition the datacell baskets X +# it starts a receptor in debug mode to see arrivals coming +# from a (client) sensor. -p1:= basket.new("X_p1",:bat[:lng,:int]); -p2:= basket.new("X_p2",:bat[:lng,:int]); -basket.group("X","X_p1","X_p2"); -t:= alarm.usec(); -bat.insert(p1,t,1); -bat.insert(p2,t,1); -t:= alarm.usec(); -bat.insert(p1,t,1); -bat.insert(p2,t,1); +sql.init(); -emitter.new("X"); -emitter.start("X","localhost",50001); -io.print("emitter done"); +emitter.start("datacell","X","localhost",50502,"passive"); +io.print("emitter working"); alarm.sleep(5); -emitter.drop("X"); +emitter.pause("datacell","X"); +io.print("emitter paused"); +alarm.sleep(5); +emitter.resume("datacell","X"); +io.print("emitter restarted"); +alarm.sleep(5); +emitter.pause("datacell","X"); +io.print("emitter stopped"); +emitter.drop("datacell","X"); + +# The SQL equivalents +# call emitter.start('datacell','X','localhost',50502,'passive'); +# call emitter.pause('datacell','X'); +# call emitter.resume('datacell','X'); +# call emitter.drop('datacell','X'); diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal b/sql/backends/monet5/datacell/Tests/receptor00.mal --- a/sql/backends/monet5/datacell/Tests/receptor00.mal +++ b/sql/backends/monet5/datacell/Tests/receptor00.mal @@ -5,7 +5,7 @@ sql.init(); -receptor.start("datacell","X","localhost",5001,"passive"); +receptor.start("datacell","X","localhost",50501,"passive"); io.print("receptor working"); alarm.sleep(5); receptor.pause("datacell","X"); diff --git a/sql/backends/monet5/datacell/datacell.sql b/sql/backends/monet5/datacell/datacell.sql --- a/sql/backends/monet5/datacell/datacell.sql +++ b/sql/backends/monet5/datacell/datacell.sql @@ -31,3 +31,18 @@ create procedure receptor.drop (sch string, tbl string) external name receptor.drop; +-- Datacell emitter wrappers + +create schema emitter; +create procedure emitter.start (sch string, tbl string, host string, port int, protocol string) + external name emitter.start; + +create procedure emitter.pause (sch string, tbl string) + external name emitter.pause; + +create procedure emitter.resume (sch string, tbl string) + external name emitter.resume; + +create procedure emitter.drop (sch string, tbl string) + external name emitter.drop; + diff --git a/sql/backends/monet5/datacell/emitter.mx b/sql/backends/monet5/datacell/emitter.mx --- a/sql/backends/monet5/datacell/emitter.mx +++ b/sql/backends/monet5/datacell/emitter.mx @@ -21,9 +21,6 @@ This module is a prototype for the implementation of a DataCell emitter. It can be used as follows. @example -pl:= basket.new("X_p1",:bat[:lng, :int]); -emitter.new("Y"); -emitter.start("Y","localhost",50000); @end example After this call it will sent tuples from basket X_p1 to the stream Y at the localhost default port. @@ -41,31 +38,24 @@ @mal module emitter; -command new(sch:str,tbl:str):void + +command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str) address DCemitterNew -comment "Define a new emitter."; +comment "Define a emitter based on a basket table. +The emitter protocol is either active or passive. Return its handle"; -command setType(sch:str,tbl:str, type:str):void -address DCemitterSetType -comment "Set the emitter as a server or a client."; +command pause(schema:str, tbl:str) +address DCemitterPause +comment "Pause listening"; -command start(sch:str,tbl:str) -address DCemitterStart -comment "Start an emitter thread"; -command start(sch:str,tbl:str, host:str, port:int) -address DCemitterStartFull -comment "Start an emitter thread"; +command resume(schema:str, tbl:str) +address DCemitterResume +comment "Resume a emitter thread"; -command pause(sch:str,tbl:str) -address DCemitterPause -comment "Pause sending"; -command stop(sch:str,tbl:str) -address DCemitterStop -comment "Stop an emitter thread"; +command drop(schema:str, tb:str) +address DCemitterDrop +comment "Drop a emitter"; -command drop(sch:str,tbl:str) -address DCemitterDrop -comment "Drop the emitter"; @- @{ @+ Implementation @@ -79,7 +69,7 @@ #include "mtime.h" #include "basket.h" -/* #define _DEBUG_EMITTER_ */ +#define _DEBUG_EMITTER_ #define EMout GDKout #ifdef WIN32 @@ -92,13 +82,11 @@ #define adapters_export extern #endif -adapters_export str DCemitterNew(int *ret, str *schema, str *grp); -adapters_export str DCemitterSetType(int *ret, str *schema, str *grp, str *type); -adapters_export str DCemitterStart(int *ret, str *schema, str *nme); -adapters_export str DCemitterStartFull(int *ret, str *schema, str *nme, str *host, int *port); -adapters_export str DCemitterStop(int *ret, str *schema, str *nme); +adapters_export str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str *proto); adapters_export str DCemitterPause(int *ret, str *schema, str *nme); +adapters_export str DCemitterResume(int *ret, str *schema, str *nme); adapters_export str DCemitterDrop(int *ret, str *schema, str *nme); + #endif @c @@ -113,6 +101,9 @@ #define EMDROP 5 #define EMERROR 8 /* failed to establish the stream */ +#define EMPASSIVE 1 +#define EMACTIVE 2 + #define TCP 1 #define UDP 2 static int protocol= TCP; @@ -125,7 +116,7 @@ str host; int port; int status; - int server;/* control the delay between attempts to connect */ + int protocol;/* control the delay between attempts to connect */ int delay; int lck; SOCKET sockfd; @@ -170,28 +161,41 @@ The baskets should already be defined. There order is used to interpret the messages sent. @c -str DCemitterNew(int *ret, str *schema, str *grp) +str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str *proto) { Emitter em; - int idx, i, j, len; + int protocol, idx, i, j, len; BAT *b; - if (EMfind(*schema, *grp)) + if (EMfind(*schema, *tbl)) throw(MAL, "emitter.new", "Duplicate emitter"); - em = EMnew(*schema, *grp); - em->host = GDKstrdup("localhost"); - em->port = 50001; - em->delay = PAUSEDEFAULT; - em->lck = 0; - em->error = NULL; - em->server = 1; @- All tables are prepended with a default tick bat. It becomes the synchronization handle. @c - len = DCmemberCount(*schema, *grp); + len = DCmemberCount(*schema, *tbl); if (len == 0) - throw(MAL, "receptor.new", "Group has no members"); + throw(MAL, "emitter.new", "Group has no members"); + + idx = DClocate(*schema, *tbl); + if (idx == 0) + throw(MAL, "emitter.new", "basket not found"); + + if ( strcmp(*proto, "active") == 0) + protocol = EMACTIVE; + else + if ( strcmp(*proto, "passive") == 0) + protocol = EMPASSIVE; + else + throw(MAL, "emitter.new", "Illegal protocol"); + + em = EMnew(*schema, *tbl); + em->host = GDKstrdup(*host); + em->port = *port; + em->delay = PAUSEDEFAULT; + em->lck = 0; + em->error = NULL; + em->protocol = protocol; em->table.format = GDKzalloc(sizeof(Column) * (len + 1)); em->table.format[0].c[0] = NULL; @@ -200,10 +204,6 @@ em->table.format[0].seplen = (int)strlen(em->table.format[0].sep); em->status = EMSTOP; - idx = DClocate(*schema, *grp); - if (idx == 0) - throw(MAL, "receptor.new", "basket not found"); - for (j = 0, i = 0; i < baskets[idx].colcount; i++) { b = baskets[idx].primary[j]; if (b == NULL) { @@ -234,41 +234,11 @@ #ifdef _DEBUG_EMITTER_ mnstr_printf(EMout, "Instantiate a new emitter %d fields\n", i); #endif - return MAL_SUCCEED; -} - -str DCemitterSetType(int *ret, str *schema, str *nme, str *type) -{ - Emitter em; - + em->status = EMLISTEN; (void)ret; - em = EMfind(*schema, *nme); - if (em == NULL) - throw(MAL, "emitter.setType", "Emitter not defined"); - if (strcmp(*type, "server") == 0) { - em->server = 1; - return MAL_SUCCEED; + if ( MT_create_thread(&em->pid, (void (*)(void *))EMstartThread, em, MT_THR_DETACHED) != 0) { + throw(MAL, "emitter.start", "Emitter initiation failed"); } - if (strcmp(*type, "client") == 0) { - em->server = 0; - return MAL_SUCCEED; - } - throw(MAL, "emitter.setType", "The type should be <server> or <client>."); -} - -str DCemitterStop(int *ret, str *schema, str *nme) -{ - Emitter em; - - em = EMfind(*schema, *nme); - if (em == NULL) - throw(MAL, "emitter.stop", "Emitter not defined"); - em->status = EMSTOP; - -#ifdef _DEBUG_EMITTER_ - mnstr_printf(EMout, "Stop a new emitter\n"); -#endif - (void)ret; return MAL_SUCCEED; } @@ -291,55 +261,19 @@ return MAL_SUCCEED; } -str DCemitterStart(int *ret, str *schema, str *nme) +str DCemitterResume(int *ret, str *schema, str *nme) _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list