Changeset: d36e6bd177bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d36e6bd177bc
Modified Files:
        sql/backends/monet5/datacell/Tests/emitter00.mal
        sql/backends/monet5/datacell/Tests/receptor00.mal
        sql/backends/monet5/datacell/datacell.sql
        sql/backends/monet5/datacell/emitter.mx
        sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:

Redesign of emitter module
The design is reduced to a simple start-{pause,resume} - drop scheme.
The notion of active/passive emitter replaces the client/server tags.
No testing yet, merely reducing interface complexity.


diffs (truncated from 420 to 300 lines):

diff --git a/sql/backends/monet5/datacell/Tests/emitter00.mal 
b/sql/backends/monet5/datacell/Tests/emitter00.mal
--- a/sql/backends/monet5/datacell/Tests/emitter00.mal
+++ b/sql/backends/monet5/datacell/Tests/emitter00.mal
@@ -1,21 +1,25 @@
-#A test for the emitter
-#The emitter uses the prefix to lock the streams
-libary datacell;
-include basket;
-include emitter;
+#A single thread for a simple stream
+#The test is based on the definition the datacell baskets X 
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
 
-p1:= basket.new("X_p1",:bat[:lng,:int]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1","X_p2");
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
+sql.init();
 
-emitter.new("X");
-emitter.start("X","localhost",50001);
-io.print("emitter done");
+emitter.start("datacell","X","localhost",50502,"passive");
+io.print("emitter working");
 alarm.sleep(5);
-emitter.drop("X");
+emitter.pause("datacell","X");
+io.print("emitter paused");
+alarm.sleep(5);
+emitter.resume("datacell","X");
+io.print("emitter restarted");
+alarm.sleep(5);
+emitter.pause("datacell","X");
+io.print("emitter stopped");
+emitter.drop("datacell","X");
+
+# The SQL equivalents
+# call emitter.start('datacell','X','localhost',50502,'passive');
+# call emitter.pause('datacell','X');
+# call emitter.resume('datacell','X');
+# call emitter.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal 
b/sql/backends/monet5/datacell/Tests/receptor00.mal
--- a/sql/backends/monet5/datacell/Tests/receptor00.mal
+++ b/sql/backends/monet5/datacell/Tests/receptor00.mal
@@ -5,7 +5,7 @@
 
 sql.init();
 
-receptor.start("datacell","X","localhost",5001,"passive");
+receptor.start("datacell","X","localhost",50501,"passive");
 io.print("receptor working");
 alarm.sleep(5);
 receptor.pause("datacell","X");
diff --git a/sql/backends/monet5/datacell/datacell.sql 
b/sql/backends/monet5/datacell/datacell.sql
--- a/sql/backends/monet5/datacell/datacell.sql
+++ b/sql/backends/monet5/datacell/datacell.sql
@@ -31,3 +31,18 @@
 create procedure receptor.drop (sch string, tbl string)
     external name receptor.drop;
 
+-- Datacell emitter wrappers
+
+create schema emitter;
+create procedure emitter.start (sch string, tbl string, host string, port int, 
protocol string)
+    external name emitter.start;
+
+create procedure emitter.pause (sch string, tbl string)
+    external name emitter.pause;
+
+create procedure emitter.resume (sch string, tbl string)
+    external name emitter.resume;
+
+create procedure emitter.drop (sch string, tbl string)
+    external name emitter.drop;
+
diff --git a/sql/backends/monet5/datacell/emitter.mx 
b/sql/backends/monet5/datacell/emitter.mx
--- a/sql/backends/monet5/datacell/emitter.mx
+++ b/sql/backends/monet5/datacell/emitter.mx
@@ -21,9 +21,6 @@
 This module is a prototype for the implementation of a
 DataCell emitter.  It can be used as follows.
 @example
-pl:= basket.new("X_p1",:bat[:lng, :int]);
-emitter.new("Y");
-emitter.start("Y","localhost",50000);
 @end example
 After this call it will sent tuples from basket X_p1
 to the stream Y at the localhost default port.
@@ -41,31 +38,24 @@
 
 @mal
 module emitter;
-command new(sch:str,tbl:str):void
+
+command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str)
 address DCemitterNew
-comment "Define a new emitter.";
+comment "Define a emitter based on a basket table. 
+The emitter protocol is either active or passive. Return its handle";
 
-command setType(sch:str,tbl:str, type:str):void
-address DCemitterSetType
-comment "Set the emitter as a server or a client.";
+command pause(schema:str, tbl:str)
+address DCemitterPause
+comment "Pause listening";
 
-command start(sch:str,tbl:str)
-address DCemitterStart
-comment "Start an emitter thread";
-command start(sch:str,tbl:str, host:str, port:int)
-address DCemitterStartFull
-comment "Start an emitter thread";
+command resume(schema:str, tbl:str)
+address DCemitterResume
+comment "Resume a emitter thread";
 
-command pause(sch:str,tbl:str)
-address DCemitterPause
-comment "Pause sending";
-command stop(sch:str,tbl:str)
-address DCemitterStop
-comment "Stop an emitter thread";
+command drop(schema:str, tb:str)
+address DCemitterDrop
+comment "Drop a emitter";
 
-command drop(sch:str,tbl:str)
-address DCemitterDrop
-comment "Drop the emitter";
 @-
 @{
 @+ Implementation
@@ -79,7 +69,7 @@
 #include "mtime.h"
 #include "basket.h"
 
-/* #define _DEBUG_EMITTER_ */
+#define _DEBUG_EMITTER_ 
 #define EMout GDKout
 
 #ifdef WIN32
@@ -92,13 +82,11 @@
 #define adapters_export extern
 #endif
 
-adapters_export str DCemitterNew(int *ret, str *schema, str *grp);
-adapters_export str DCemitterSetType(int *ret, str *schema, str *grp, str 
*type);
-adapters_export str DCemitterStart(int *ret, str *schema, str *nme);
-adapters_export str DCemitterStartFull(int *ret, str *schema, str *nme, str 
*host, int *port);
-adapters_export str DCemitterStop(int *ret, str *schema, str *nme);
+adapters_export str DCemitterNew(int *ret, str *schema, str *tbl, str *host, 
int *port, str *proto);
 adapters_export str DCemitterPause(int *ret, str *schema, str *nme);
+adapters_export str DCemitterResume(int *ret, str *schema, str *nme);
 adapters_export str DCemitterDrop(int *ret, str *schema, str *nme);
+
 #endif
 
 @c
@@ -113,6 +101,9 @@
 #define EMDROP 5               
 #define EMERROR 8               /* failed to establish the stream */
 
+#define EMPASSIVE 1
+#define EMACTIVE 2
+
 #define TCP 1
 #define UDP 2
 static int protocol= TCP;
@@ -125,7 +116,7 @@
        str host;
        int port;
        int status;
-       int server;/* control the delay between attempts to connect */
+       int protocol;/* control the delay between attempts to connect */
        int delay;
        int lck;
        SOCKET sockfd;
@@ -170,28 +161,41 @@
 The baskets should already be defined. There order
 is used to interpret the messages sent.
 @c
-str DCemitterNew(int *ret, str *schema, str *grp)
+str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str 
*proto)
 {
        Emitter em;
-       int idx, i, j, len;
+       int protocol, idx, i, j, len;
        BAT *b;
 
-       if (EMfind(*schema, *grp))
+       if (EMfind(*schema, *tbl))
                throw(MAL, "emitter.new", "Duplicate emitter");
-       em = EMnew(*schema, *grp);
-       em->host = GDKstrdup("localhost");
-       em->port = 50001;
-       em->delay = PAUSEDEFAULT;
-       em->lck = 0;
-       em->error = NULL;
-       em->server = 1;
 @-
 All tables are prepended with a default tick bat.
 It becomes the synchronization handle.
 @c
-       len = DCmemberCount(*schema, *grp);
+       len = DCmemberCount(*schema, *tbl);
        if (len == 0)
-               throw(MAL, "receptor.new", "Group has no members");
+               throw(MAL, "emitter.new", "Group has no members");
+
+       idx = DClocate(*schema, *tbl);
+       if (idx == 0)
+               throw(MAL, "emitter.new", "basket not found");
+
+       if ( strcmp(*proto, "active") == 0)
+               protocol = EMACTIVE;
+       else
+       if ( strcmp(*proto, "passive") == 0)
+               protocol = EMPASSIVE;
+       else
+               throw(MAL, "emitter.new", "Illegal protocol");
+
+       em = EMnew(*schema, *tbl);
+       em->host = GDKstrdup(*host);
+       em->port = *port;
+       em->delay = PAUSEDEFAULT;
+       em->lck = 0;
+       em->error = NULL;
+       em->protocol = protocol;
 
        em->table.format = GDKzalloc(sizeof(Column) * (len + 1));
        em->table.format[0].c[0] = NULL;
@@ -200,10 +204,6 @@
        em->table.format[0].seplen = (int)strlen(em->table.format[0].sep);
        em->status = EMSTOP;
 
-       idx = DClocate(*schema, *grp);
-       if (idx == 0)
-               throw(MAL, "receptor.new", "basket not found");
-
        for (j = 0, i = 0; i < baskets[idx].colcount; i++) {
                b = baskets[idx].primary[j];
                if (b == NULL) {
@@ -234,41 +234,11 @@
 #ifdef _DEBUG_EMITTER_
        mnstr_printf(EMout, "Instantiate a new emitter %d fields\n", i);
 #endif
-       return MAL_SUCCEED;
-}
-
-str DCemitterSetType(int *ret, str *schema, str *nme, str *type)
-{
-       Emitter em;
-
+       em->status = EMLISTEN;
        (void)ret;
-       em = EMfind(*schema, *nme);
-       if (em == NULL)
-               throw(MAL, "emitter.setType", "Emitter not defined");
-       if (strcmp(*type, "server") == 0) {
-               em->server = 1;
-               return MAL_SUCCEED;
+       if ( MT_create_thread(&em->pid, (void (*)(void *))EMstartThread, em, 
MT_THR_DETACHED) != 0) {
+               throw(MAL, "emitter.start", "Emitter initiation failed");
        }
-       if (strcmp(*type, "client") == 0) {
-               em->server = 0;
-               return MAL_SUCCEED;
-       }
-       throw(MAL, "emitter.setType", "The type should be <server> or 
<client>.");
-}
-
-str DCemitterStop(int *ret, str *schema, str *nme)
-{
-       Emitter em;
-
-       em = EMfind(*schema, *nme);
-       if (em == NULL)
-               throw(MAL, "emitter.stop", "Emitter not defined");
-       em->status = EMSTOP;
-
-#ifdef _DEBUG_EMITTER_
-       mnstr_printf(EMout, "Stop a new emitter\n");
-#endif
-       (void)ret;
        return MAL_SUCCEED;
 }
 
@@ -291,55 +261,19 @@
        return MAL_SUCCEED;
 }
 
-str DCemitterStart(int *ret, str *schema, str *nme)
+str DCemitterResume(int *ret, str *schema, str *nme)
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to