On 01/13/2016 04:39 PM, Will Coleda wrote:

Hi Will,

First I must say that the code is still in development, so it might look a bit messy. The test file uses the Client to get a MongoDB server using host and port info, which defaults to localhost and port 27017. The tests use a server set up to run at localhost, port 65000. The Client in turn will try to manage several servers, which are set up using threads (Promises). At the moment there is only one, but it is set up to provide for multiple connections to servers in the same replica set.

When a connection is made, a database object is created which uses run-command to run a command to drop a database. This goes through a Collection and ends up in Wire (using query()) to send the encoded data to the mongo server. The request expects an answer, which is read a few lines further on. At this place (line 40) I get the error.

You will notice that as a workaround I've closed the port before returning the object (at line 38 in Connection). The port will be reopened when send/receive is called. When I don't close it, I get the above-mentioned error.
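
A minimal sketch of the workaround idea, not the actual MongoDB::Connection code: the class name LazyConnection and its attributes are hypothetical. It assumes that opening the socket on first use from send/receive means the socket belongs to whichever thread does the I/O, rather than to the thread that built the object inside the Promise.

```perl6
use v6;

# Hypothetical class, only to illustrate lazily (re)opening the socket
class LazyConnection {
  has Str $.host = 'localhost';
  has Int $.port = 27017;
  has IO::Socket::INET $!sock;

  # Open the socket on first use, in the thread calling send/receive
  method !open ( ) {
    $!sock = IO::Socket::INET.new( :host($!host), :port($!port))
      unless $!sock.defined;
  }

  method send ( Blob:D $data ) {
    self!open;
    $!sock.write($data);
  }

  method receive ( Int:D $nbr-bytes ) {
    self!open;
    return $!sock.read($nbr-bytes);
  }

  # Close the socket so a later send/receive will open a fresh one
  method close ( ) {
    if $!sock.defined {
      $!sock.close;
      $!sock = Nil;
    }
  }
}
```

Creating the object does not touch the network, e.g. LazyConnection.new( :host('localhost'), :port(65000)), so it can be built inside Promise.start and handed over with $promise.result before any socket exists.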

Thanks for looking into it
Marcel

B.t.w. This is Rakudo version 2015.12-201-g2a8ca94 built on MoarVM version 2015.12-29-g8079ca5
implementing Perl 6.c.

Can you post the code that causes the issue?

On Wed, Jan 13, 2016 at 4:00 AM, mt1957 <mt1...@gmail.com> wrote:
L.s.

I got the following error

'Tried to read() on a socket from outside its originating thread'

This socket is created while in another thread using Promise and the object
is retrieved using
$promise.result. The weird thing is also that just a few lines before that
the same socket is used for writing! This write completed successfully and
data is seen traveling using wireshark.

Is there something still unfinished in a thread or is there some status not
reset?
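
A small sketch, separate from the MongoDB code, of how to see which thread a Promise block runs in. It only uses Promise.start, $promise.result and $*THREAD.id; the variable names are made up for the illustration.

```perl6
use v6;

# Thread id of the code that starts the promise
my Int $main-id = $*THREAD.id;

# The block is run by the scheduler; it returns the id of the thread it ran in
my Promise $promise = Promise.start( { $*THREAD.id } );

# .result waits until the promise is kept and returns the block's value
my Int $worker-id = $promise.result;

say "started from thread $main-id, block ran in thread $worker-id";
```

Anything created inside the block, such as a socket, is created in the thread reported as the second id, even though the object is later used from the first.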

Marcel Timmerman





use v6;

# use lib '/home/marcel/Languages/Perl6/Projects/BSON/lib';

use MongoDB::Connection;
use BSON::Document;

package MongoDB {

  #-----------------------------------------------------------------------------
  #
  class Client {

#TODO refine this method of using server name/port, connection pooling etc

    my Array $server-connections;       # Array of connections
    my Array $server-discovery;         # Array of promises

    #---------------------------------------------------------------------------
    #
    multi submethod BUILD (
      Str :$host,
      Int :$port where (!$_.defined or 0 <= $_ <= 65535),
      Str :$url
    ) {

      $server-connections = [] unless $server-connections.defined;
      $server-discovery = [] unless $server-discovery.defined;

      my Pair @server-specs = ();
      my Str $server-name;
      my Int $server-port;

say "H & P: {$host//'nh'}, {$port//'np'}, {$url // 'nu'}";

      if ?$url {
#TODO process url
#        $server-name = 'localhost';
#        $server-port = 27017;
      }

      else {
        # Test for the server name. When no cases match a previously stored
        # server name is taken
        #
        if !?$host and !?$server-name and !?$url {
          $server-name = 'localhost';
        }

        elsif ?$host {
          $server-name = $host;
        }

        # Test for the server port. When no cases match a previously stored
        # server port is taken
        #
        if !$port.defined and !$server-port.defined {
          $server-port = 27017;
        }

        elsif $port.defined {
          $server-port = $port;
        }

        @server-specs.push: ($server-name => $server-port);
      }

      # Background process to discover hosts only if there are no servers
      # discovered yet or when new non-default cases are presented.
      #
      if !$server-connections.elems
         or $server-name ne 'localhost'
         or $server-port != 27017 {

        for @server-specs -> Pair $spec {
          $server-discovery.push: Promise.start( {
              MongoDB::Connection.new(
                :client(self),
                :host($spec.key),
                :port($spec.value)
              );
            }
          );
say "KV: {$spec.kv}, {@server-specs.elems}, {$server-discovery.elems}";
        }
      }
    }

    #---------------------------------------------------------------------------
    # Server discovery
    #
    method !discover-servers ( ) {

    }

    #---------------------------------------------------------------------------
    #
#    method select-server ( Bool :$need-master = True --> MongoDB::Connection ) {
    method select-server ( --> MongoDB::Connection ) {

      my MongoDB::Connection $server;
      while !$server.defined {
        my Bool $isMaster = False;

        loop ( my $pi = 0; $pi < $server-discovery.elems; $pi++ ) {
          my $promise = $server-discovery[$pi];
say "P: $pi, ", $promise.WHAT;

          next unless $promise ~~ Promise and $promise.defined;
say "P sts: ", $promise.status;

          # If promise is kept, the Connection object has been created
          #
          if $promise.status ~~ Kept {
            
            # Get the Connection object from the promise result and check
            # its status. When True, there is a proper server found and its
            # socket can be used for I/O.
            #
            $server = $promise.result;
say "C sts: ", $server.WHAT, ', ', $server.status;
            if $server.status {

              $server-connections.push: $server;

#TODO Test for master server, continue if not and needed
#$isMaster = True;
#last;
            }
            
            else {
              $server = Nil;
            }

            $server-discovery[$pi] = Nil;
            $server-discovery.splice( $pi, 1);
          }

          elsif $promise.status == Broken {
            $server-discovery[$pi] = Nil;
            $server-discovery.splice( $pi, 1);
          }
        
#          elsif $promise.status == Planned {
#            $server-discovery[$pi] = Nil;
#            $server-discovery.splice( $pi, 1);
#          }
        }

        # When there isn't a server found from newly created servers
        # try cached entries
        #
        unless $server.defined {
          loop ( my $si = 0; $si < $server-connections.elems; $si++) {
            $server = $server-connections[$si];
#TODO Test for master server, continue if not and needed
#$isMaster = True;
#last;
say "Cached server $si: ", $server.WHAT, ', ', $server.status;
          }
        }

        unless $server.defined {
          if $server-discovery.elems {
say "sleep ...";
            sleep 1;
          }

          else {
say "server discovery data exhausted";
            return $server;
          }
        }
      }

      return $server;
    }
  }
}

use v6;

use MongoDB;
use MongoDB::Wire;
use MongoDB::Cursor;
use BSON::Document;

#-------------------------------------------------------------------------------
#
package MongoDB {

  class Collection {

    # State used so it initializes only once
    #
    state MongoDB::Wire $wire .= new;

    has $.database;
    has Str $.name;
    has Str $.full-collection-name;

    has BSON::Javascript $!default-js = BSON::Javascript.new(:javascript(''));

    #---------------------------------------------------------------------------
    #
    submethod BUILD ( :$database!, Str:D :$name ) {
      $!database = $database;
      $!name = $name;
      $!full-collection-name = [~] $!database.name, '.', $name;

      # This should be possible: 'admin.$cmd' which is used by run-command
      #
      if $name ~~ m/^ <[\$ _ A..Z a..z]> <[\$ . \w _]>+ $/ {
        $!name = $name;
      }

      else {
        die X::MongoDB.new(
          error-text => "Illegal collection name: '$name'",
          oper-name => 'MongoDB::Collection.new()',
          severity => MongoDB::Severity::Error
        );
      }
    }

    #---------------------------------------------------------------------------
    # Find record in a collection. One of the few left to use the wire protocol.
    #
    # Method using Pair.
    #
    multi method find (
      List :$criteria where all(@$criteria) ~~ Pair = (),
      List :$projection where all(@$projection) ~~ Pair = (),
      Int :$number-to-skip = 0, Int :$number-to-return = 0,
      Int :$flags = 0
      --> MongoDB::Cursor
    ) {
      my BSON::Document $cr .= new: $criteria;
      my BSON::Document $pr .= new: $projection;
      my BSON::Document $server-reply = $wire.query(
        self, $cr, $pr, :$flags, :$number-to-skip,
        :$number-to-return
      );

      return MongoDB::Cursor.new( :collection(self), :$server-reply);
    }

    # Find record in a collection using a BSON::Document
    #
    multi method find (
      BSON::Document :$criteria = BSON::Document.new,
      BSON::Document :$projection?,
      Int :$number-to-skip = 0, Int :$number-to-return = 0,
      Int :$flags = 0
      --> MongoDB::Cursor
    ) {
      my BSON::Document $server-reply = $wire.query(
        self, $criteria, $projection, :$flags, :$number-to-skip,
        :$number-to-return
      );

      return MongoDB::Cursor.new( :collection(self), :$server-reply);
    }
  }
}





=finish

#`{{
    #---------------------------------------------------------------------------
    # Some methods also created in Cursor.pm
    #---------------------------------------------------------------------------
    # Get explanation about given search criteria
    #
    method explain ( Hash $criteria = {} --> Hash ) {
      my Pair @req = '$query' => $criteria, '$explain' => 1;
      my MongoDB::Cursor $cursor = self.find( @req, :number-to-return(1));
      my $docs = $cursor.fetch();
      return $docs;
    }

    #---------------------------------------------------------------------------
    # Aggregate methods
    #---------------------------------------------------------------------------
    #
    multi method group ( Str $reduce-js-func, Str :$key = '',
                         :%initial = {}, Str :$key_js_func = '',
                         :%condition = {}, Str :$finalize = ''
                         --> Hash ) {

      self.group(
        BSON::Javascript.new(:javascript($reduce-js-func)),
        key_js_func => BSON::Javascript.new(:javascript($key_js_func)),
        finalize => BSON::Javascript.new(:javascript($finalize)),
        :$key, :%initial, :%condition
      );
    }

    multi method group ( BSON::Javascript $reduce-js-func,
                         BSON::Javascript :$key_js_func = $!default-js,
                         BSON::Javascript :$finalize = $!default-js,
                         Str :$key = '',
                         Hash :$initial = {},
                         Hash :$condition = {}
                         --> Hash ) {

      my Pair @req = group => {};
      @req[0]<group><ns> = $!name;
      @req[0]<group><initial> = $initial;
      @req[0]<group>{'$reduce'} = $reduce-js-func;
      @req[0]<group><key> = {$key => 1};

      if $key_js_func.has_javascript {
        @req[0]<group><keyf> = $key_js_func;
        @req[0]<group><key>:delete;
      }

      @req[0]<group><condition> = $condition if ?$condition;
      @req[0]<group><finalize> = $finalize if $finalize.has_javascript;

      my $doc = $!database.run-command(@req);

      # Check error and throw X::MongoDB if there is one
      #
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'group',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      return $doc;
    }

    #---------------------------------------------------------------------------
    #
    multi method map-reduce ( Str:D $map-js-func, Str:D $reduce-js-func,
                              Hash :$out, Str :$finalize, Hash :$criteria,
                              Hash :$sort, Hash :$scope, Int :$limit,
                              Bool :$jsMode = False
                              --> Hash ) {

      self.map-reduce( BSON::Javascript.new(:javascript($map-js-func)),
                       BSON::Javascript.new(:javascript($reduce-js-func)),
                       :finalize(BSON::Javascript.new(:javascript($finalize))),
                       :$out, :$criteria, :$sort, :$scope, :$limit, :$jsMode
                     );
    }

    multi method map-reduce ( BSON::Javascript:D $map-js-func,
                              BSON::Javascript:D $reduce-js-func,
                              BSON::Javascript :$finalize = $!default-js,
                              Hash :$out, Hash :criteria($query), Hash :$sort,
                              Hash :$scope, Int :$limit, Bool :$jsMode = False
                              --> Hash
                            ) {

      my Pair @req = mapReduce => $!name;
      @req.push: (:$query) if ?$query;
      @req.push: (:$sort) if $sort;
      @req.push: (:$limit) if $limit;
      @req.push: (:$finalize) if $finalize.has_javascript;
      @req.push: (:$scope) if $scope;

      @req.push: |(
        :map($map-js-func),
        :reduce($reduce-js-func),
        :$jsMode
      );

      if ?$out {
        @req.push: (:$out);
      }

      else {
        @req.push: (:out(:replace($!name ~ '_MapReduce')));
      }

#say "MPR P: {@req.perl}";
      my Hash $doc = $!database.run-command(@req);

      # Check error and throw X::MongoDB if there is one
      #
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'map-reduce',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      return $doc;
    }

    #---------------------------------------------------------------------------
    # Index methods
    #---------------------------------------------------------------------------
    # Add indexes for collection
    #
    # Steps done by the mongo shell
    #
    # * Insert a document into a system table <dbname>.system.indexes
    # * Run get-last-error to see result
    #
    # * According to documentation indexes cannot be changed. They must be
    #   deleted first. Therefore check first. drop index if exists then set new
    #   index.
    #
    method ensure-index ( %key-spec!, %options = {} ) {

      # Generate name of index if not given in options
      #
      if %options<name>:!exists {
        my Str $name = '';

        # If no name for the index is set then imitate the default of
        # MongoDB or keyname1_dir1_keyname2_dir2_..._keynameN_dirN.
        #
        for %key-spec.keys -> $k {
            $name ~= [~] ($name ?? '_' !! ''), $k, '_', %key-spec{$k};
        }

        %options<name> = $name;
      }


      # Check if index exists
      #
      my $system-indexes = $!database.collection('system.indexes');
      my $doc = $system-indexes.find-one(%(key => %key-spec));

      # If found do nothing for the moment
      #
      if +$doc {
      }

      # Insert index if not exists
      #
      else {
        my %doc = %( ns => ([~] $!database.name, '.', $!name),
                     key => %key-spec,
                     %options
                   );

        $system-indexes.insert(%doc);

        # Check error and throw X::MongoDB if there is one
        #
        my $error-doc = $!database.get-last-error;
        if $error-doc<err> {
          die X::MongoDB.new(
            error-text => $error-doc<err>,
            error-code => $error-doc<code>,
            oper-name => 'ensure-index',
            oper-data => %doc.perl,
            collection-ns => [~] $!database.name, '.', $!name
          );
        }
      }
    }

    
    #---------------------------------------------------------------------------
    # Drop an index
    #
    method drop-index ( $key-spec! --> Hash ) {
      my Pair @req = deleteIndexes => $!name,
                     index => $key-spec,
                     ;

      my $doc = $!database.run-command(@req);

      # Check error and throw X::MongoDB if there is one
      #
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'drop-index',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      return $doc;
    }

    
    #---------------------------------------------------------------------------
    # Drop all indexes
    #
    method drop-indexes ( --> Hash ) {
      return self.drop-index('*');
    }

    
    #---------------------------------------------------------------------------
    # Get indexes for the current collection
    #
    method get-indexes ( --> MongoDB::Cursor ) {
      my $system-indexes = $!database.collection('system.indexes');
      return $system-indexes.find(%(ns => [~] $!database.name, '.', $!name));
    }

    
    #---------------------------------------------------------------------------
    # Collection statistics
    #---------------------------------------------------------------------------
    # Get collections statistics
    #
    method stats ( Int :$scale = 1, Bool :index-details($indexDetails) = False,
                   Hash :index-details-field($indexDetailsField),
                   Str :index-details-name($indexDetailsName)
                   --> Hash ) {

      my Pair @req = collstats => $!name, options => {:$scale};
      @req[1]<options><indexDetails> = True if $indexDetails;
      @req[1]<options><indexDetailsName> = $indexDetailsName
        if ?$indexDetailsName;
      @req[1]<options><indexDetailsField> = $indexDetailsField
        if ?$indexDetailsField and !?$indexDetailsName; # One or the other

      my $doc = $!database.run-command(@req);

      # Check error and throw X::MongoDB if there is one
      #
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'stats',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      return $doc;
    }

    
    #---------------------------------------------------------------------------
    # Return size of collection in bytes
    #
    method data-size ( --> Int ) {
      my Hash $doc = self.stats();
      return $doc<size>;
    }

    #---------------------------------------------------------------------------
    #
    method find-and-modify (
      Hash $criteria = { }, Hash $projection = { },
      Hash :$update = { }, Hash :$sort = { },
      Bool :$remove = False, Bool :$new = False,
      Bool :$upsert = False
      --> Hash
    ) {

      my Pair @req = findAndModify => self.name, query => $criteria;
      @req.push: (:$sort) if ?$sort;
      @req.push: (:remove) if $remove;
      @req.push: (:$update) if ?$update;
      @req.push: (:new) if $new;
      @req.push: (:upsert) if $upsert;
      @req.push: (:$projection) if ?$projection;

      my Hash $doc = $!database.run-command(@req);
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'find-and-modify',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      # Return its value of the status document
      #
      return $doc<value>;
    }

    #---------------------------------------------------------------------------
    # Check keys in documents for insert operations
    # See http://docs.mongodb.org/meta-driver/latest/legacy/bson/
    #
    method !check-doc-keys ( @docs! ) {
      for @docs -> $d {
        die X::MongoDB.new(
          error-text => qq:to/EODIE/,
            Document is not a hash.
            EODIE
          oper-name => 'insert',
          oper-data => @docs.perl,
          collection-ns => [~] $!database.name, '.', $!name
        ) unless $d ~~ Hash;

        for $d.keys -> $k {
          if $k ~~ m/ (^ '$' | '.') / {
            die X::MongoDB.new(
              error-text => qq:to/EODIE/,
                $k is not properly defined.
                Please see 'http://docs.mongodb.org/meta-driver/latest/legacy/bson/'
                point 1; Data storage
                EODIE
              oper-name => 'insert',
              oper-data => @docs.perl,
              collection-ns => [~] $!database.name, '.', $!name
            );
          }

          elsif $k ~~ m/ ^ '_id' $ / {
            # Check if unique in the document
            my $cursor = self.find( hash( _id => $d{$k}));

            # If there are records(at most one!) this id is not unique
            #
            if $cursor.count {
              die X::MongoDB.new(
                error-text => "$k => $d{$k} value for id is not unique",
                oper-name => 'insert',
                oper-data => @docs.perl,
                collection-ns => [~] $!database.name, '.', $!name
              );
            }
          }

          # Recursively go through sub documents
          #
          elsif $d{$k} ~~ Hash {
            self!cdk($d{$k});
          }
        }
      }
    }

    #---------------------------------------------------------------------------
    #
    method !cdk ( $sub-doc! ) {
      for $sub-doc.keys -> $k {
        if $k ~~ m/ (^ '$' | '.') / {
          die X::MongoDB.new(
            error-text => qq:to/EODIE/,
              $k is not properly defined.
              Please see 'http://docs.mongodb.org/meta-driver/latest/legacy/bson/'
              point 1; Data storage
              EODIE
            oper-name => 'insert',
            oper-data => $sub-doc.perl,
            collection-ns => [~] $!database.name, '.', $!name
          );
        }

        elsif $sub-doc{$k} ~~ Hash {
          self!cdk($sub-doc{$k});
        }
      }
    }

    #---------------------------------------------------------------------------
    #
    method find-one ( %criteria = { }, %projection = { } --> Hash ) {
      my MongoDB::Cursor $cursor = self.find( %criteria, %projection,
                                              :number-to-return(1)
                                            );
      my $doc = $cursor.fetch();
      return $doc.defined ?? $doc !! %();
    }

    #---------------------------------------------------------------------------
    # Drop collection
    #
    method drop ( --> Hash ) {
      my Pair @req = drop => $!name;
      my $doc = $!database.run-command(@req);
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'drop',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      return $doc;
    }

    #---------------------------------------------------------------------------
    # Get count of documents depending on criteria
    #
    method count ( Hash $criteria = {} --> Int ) {

      # fields is seen with wireshark
      #
      my Pair @req = count => $!name, query => $criteria, fields => %();
      my $doc = $!database.run-command(@req);

      # Check error and throw X::MongoDB if there is one
      #
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'count',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      return Int($doc<n>);
    }

    #---------------------------------------------------------------------------
    #
    #---------------------------------------------------------------------------
    # Find distinct values of a field depending on criteria
    #
    method distinct( Str:D $field-name, %criteria = {} --> Array ) {
      my Pair @req = distinct => $!name,
                     key => $field-name,
                     query => %criteria
                     ;

      my $doc = $!database.run-command(@req);

      # Check error and throw X::MongoDB if there is one
      #
      if $doc<ok>.Bool == False {
        die X::MongoDB.new(
          error-text => $doc<errmsg>,
          oper-name => 'distinct',
          oper-data => @req.perl,
          collection-ns => [~] $!database.name, '.', $!name
        );
      }

      # What do we do with $doc<stats> ?
      #
      return $doc<values>.list;
    }

    #---------------------------------------------------------------------------
    #
    method insert ( **@documents, Bool :$continue-on-error = False
    ) is DEPRECATED("run-command\(BSON::Document.new: insert => 'collection',...")
    {
#      self!check-doc-keys(@documents);
#      my $flags = +$continue-on-error;
#      $wire.OP-INSERT( self, $flags, @documents);
    }

    #---------------------------------------------------------------------------
    #
    method update (
      Hash %selector, %update!, Bool :$upsert = False,
      Bool :$multi-update = False
    ) is DEPRECATED("run-command\(BSON::Document.new: update => 'collection',...")
    {
#      my $flags = +$upsert + +$multi-update +< 1;
#      $wire.OP_UPDATE( self, $flags, %selector, %update);
    }

    #---------------------------------------------------------------------------
    #
    method remove ( %selector = { }, Bool :$single-remove = False
    ) is DEPRECATED("run-command\(BSON::Document.new: delete => 'collection',...")
    {
#      my $flags = +$single-remove;
#      $wire.OP_DELETE( self, $flags, %selector );
    }

}}
use v6;

#use lib '/home/marcel/Languages/Perl6/Projects/BSON/lib';

use BSON::Document;
use MongoDB::Client;
use MongoDB::Header;

package MongoDB {

  class Wire {

    #---------------------------------------------------------------------------
    # 
    method query (
      $collection, BSON::Document:D $qdoc,
      $projection?, :$flags, :$number-to-skip, :$number-to-return
      --> BSON::Document
    ) {
      # Must clone the document otherwise the MongoDB::Header will be added
      # to the $qdoc even when is copy trait is used.
      #
      my BSON::Document $d = $qdoc.clone;
      $d does MongoDB::Header;

#      my $database = $collection.database;
      my $full-collection-name = $collection.full-collection-name;

      my Buf $encoded-query = $d.encode-query(
        $full-collection-name, $projection,
        :$flags, :$number-to-skip, :$number-to-return
      );

      my MongoDB::Client $client .= new;
      my $connection = $client.select-server;
      $connection.send($encoded-query);

      # Read 4 bytes for int32 response size
      #
      my Buf $size-bytes = $connection.receive(4);
      my Int $response-size = decode-int32( $size-bytes, 0) - 4;

      # Receive remaining response bytes from socket. Prefix it with the already
      # read bytes and decode. Return the resulting document.
      #
      my Buf $server-reply = $size-bytes ~ $connection.receive($response-size);
#$client.close;
#say "SR: ", $server-reply;
# TODO check if requestID matches responseTo
      return $d.decode-reply($server-reply);
    }

    #---------------------------------------------------------------------------
    #
    method get-more ( $cursor --> BSON::Document ) {

      my BSON::Document $d .= new;
      $d does MongoDB::Header;

      my Buf $encoded-get-more = $d.encode-get-more(
        $cursor.full-collection-name, $cursor.id
      );

      my MongoDB::Client $client .= new;
      my $connection = $client.select-server;
      $connection.send($encoded-get-more);

      # Read 4 bytes for int32 response size
      #
      my Buf $size-bytes = $connection.receive(4);
      my Int $response-size = decode-int32( $size-bytes, 0) - 4;

      # Receive remaining response bytes from socket. Prefix it with the already
      # read bytes and decode. Return the resulting document.
      #
      my Buf $server-reply = $size-bytes ~ $connection.receive($response-size);
# TODO check if requestID matches responseTo
# TODO check if cursorID matches (if present)
      return $d.decode-reply($server-reply);
    }

    #---------------------------------------------------------------------------
    #
    method kill-cursors ( @cursors where $_.elems > 0 ) {

      my BSON::Document $d .= new;
      $d does MongoDB::Header;

      # Gather the ids only when they are non-zero.i.e. still active.
      #
      my Buf @cursor-ids;
      for @cursors -> $cursor {
        @cursor-ids.push($cursor.id) if [+] $cursor.id.list;
      }

      # Kill the cursors if found any
      #
      my $client = MongoDB::Client.new;
      my $connection = $client.select-server;
      if +@cursor-ids {
        my Buf $encoded-kill-cursors = $d.encode-kill-cursors(@cursor-ids);
        $connection.send($encoded-kill-cursors);
      }
    }
  }
}



=finish

#`{{
    #---------------------------------------------------------------------------
    #
    method OP_INSERT (
      $collection, Int $flags, *@documents --> Nil
    ) is DEPRECATED('OP-INSERT') {

      self.OP-INSERT( $collection, $flags, @documents);
    }

    method OP-INSERT ( $collection, Int $flags, *@documents --> Nil ) {
      # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPINSERT

      my Buf $B-OP-INSERT = [~]

        # int32 flags
        # bit vector
        #
        encode-int32($flags),

        # cstring fullCollectionName
        # "dbname.collectionname"
        #
        encode-cstring($collection.full-collection-name);

      # document* documents
      # one or more documents to insert into the collection
      #
      for @documents -> $document {
        $B-OP-INSERT ~= self.encode-document($document);
      }

      # MsgHeader header
      # standard message header
      #
      my Buf $msg-header = self!enc-msg-header(
        $B-OP-INSERT.elems, C-OP-INSERT
      );

      # send message without waiting for response
      #
      $collection.database.client.send( $msg-header ~ $B-OP-INSERT, False);
    }
}}
#`{{
    #---------------------------------------------------------------------------
    #
    method OP_KILL_CURSORS (
      *@cursors --> Nil
    ) is DEPRECATED('OP-KILL-CURSORS') {
      self.OP-KILL-CURSORS(@cursors);
    }
}}
#`{{
    method OP-KILL-CURSORS ( *@cursors --> Nil ) {
      # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPKILLCURSORS

      my Buf $B-OP-KILL_CURSORS = [~]

        # int32 ZERO
        # 0 - reserved for future use
        #
        encode-int32(0),

        # int32 numberOfCursorIDs
        # number of cursorIDs in message
        #
        encode-int32(+@cursors);

      # int64* cursorIDs
      # sequence of cursorIDs to close
      #
      for @cursors -> $cursor {
        $B-OP-KILL_CURSORS ~= $cursor.id;
      }

      # MsgHeader header
      # standard message header
      #
      my Buf $msg-header = self!enc-msg-header(
        $B-OP-KILL_CURSORS.elems,
        BSON::C-OP-KILL-CURSORS
      );

      # send message without waiting for response
      #
      @cursors[0].collection.database.client.send(
        $msg-header ~ $B-OP-KILL_CURSORS, False
      );
    }
}}
#`{{
    #---------------------------------------------------------------------------
    #
    method OP_UPDATE (
      $collection, Int $flags, %selector, %update
      --> Nil
    ) is DEPRECATED('OP-UPDATE') {

      self.OP-UPDATE( $collection, $flags, %selector, %update);
    }

    method OP-UPDATE ( $collection, Int $flags, %selector, %update --> Nil ) {
      # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPUPDATE

      my Buf $B-OP-UPDATE = [~]

        # int32 ZERO
        # 0 - reserved for future use
        #
        encode-int32(0),

        # cstring fullCollectionName
        # "dbname.collectionname"
        #
        encode-cstring($collection.full-collection-name),

        # int32 flags
        # bit vector
        #
        encode-int32($flags),

        # document selector
        # query object
        #
        self.encode-document(%selector),

        # document update
        # specification of the update to perform
        #
        self.encode-document(%update);

      # MsgHeader header
      # standard message header
      #
      my Buf $msg-header = self!enc-msg-header(
        $B-OP-UPDATE.elems, C-OP-UPDATE
      );

      # send message without waiting for response
      #
      $collection.database.client.send( $msg-header ~ $B-OP-UPDATE, False);
    }
}}
#`{{
    #---------------------------------------------------------------------------
    #
    method OP_DELETE (
      $collection, Int $flags, %selector
      --> Nil
    ) is DEPRECATED('OP-DELETE') {

      self.OP-DELETE( $collection, $flags, %selector);
    }

    method OP-DELETE ( $collection, Int $flags, %selector --> Nil ) {
      # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPDELETE

      my Buf $B-OP-DELETE = [~]

        # int32 ZERO
        # 0 - reserved for future use
        #
        encode-int32(0),

        # cstring fullCollectionName
        # "dbname.collectionname"
        #
        encode-cstring($collection.full-collection-name),

        # int32 flags
        # bit vector
        #
        encode-int32($flags),

        # document selector
        # query object
        #
        self.encode-document(%selector);

      # MsgHeader header
      # standard message header
      #
      my Buf $msg-header = self!enc-msg-header(
        $B-OP-DELETE.elems, C-OP-DELETE
      );

      # send message without waiting for response
      #
      $collection.database.client.send( $msg-header ~ $B-OP-DELETE, False);
    }
}}

Attachment: 100-Connection.t
Description: Perl program
