Hi Everyone!

I am very new to qpsmtpd and I am trying to write my first plugin.

For starters I want to use qpsmtpd-async instead of the other options
available.  Because of this I have been learning about how qpsmtpd
handles polling within the code.  It looks like certain plugins are
available as async safe such as async/queue/smtp-forward.  I have used
this module as a basis for writing my plugin.

My plugin is designed to initially at startup fork a blocking MySQL
client handler.  Because MySQL client libs are always blocking it is
necessary to have a handler process that can remain blocking, while
the main qpsmtpd process can send commands/queries to the handler process
in non-blocking mode.  Before the fork I have created a socketpair()
for communication with the handler process.  I have also performed a
$self->SUPER::new($mysock) and used watch_write and watch_read
functions appropriately.

My handler process and socket communication work well, but only when I
store my socket in transaction->notes(). The problem with this is that I
lose my file handle after the first email.  I have attempted to store the
object globally, but this generates epolling errors.

Can I store the object in the qp object globally somehow so I can
maintain the object for every email?

Are you aware of any other plugins that behave similarly to what I am
trying to achieve?

I have pasted the test plugin below:

#!/usr/bin/perl -w

use Data::Dumper;
use Socket;


use Qpsmtpd::Constants;
# File-scoped state shared by the plugin subs and the MyRcptChecker package
# further down in this file.

# Plugin-side end of the socketpair; assigned by socketpair() in init().
my $fhChild;
# Pending command strings: hook_rcpt pushes onto it and
# MyRcptChecker::event_write takes entries off it.
my @cmdqueue;
#my $myClass;



# Worker loop executed in the forked handler process.
#
# Reads newline-terminated commands from $fhParent and answers each one
# with a single "YOUR-DATA" line on the same socket.
#
# Params:
#   $fhParent - the handler's end of the socketpair created in init()
#   $self     - plugin object; only its log() method is used
#
# Returns once the peer closes the socket (readline reports EOF).
sub rcptChild {
        my ($fhParent, $self) = @_;

        # Turn on autoflush for the socket, then restore the previously
        # selected handle so plain print() calls do not end up on the socket.
        my $previous = select($fhParent);
        $| = 1;
        select($previous);

        $self->log(LOGWARN,"rcptChild() starting PID: $$");

        # readline returns undef at EOF; without the defined() test the loop
        # would keep running (and answering) forever after the peer is gone.
        while (defined(my $cmd = <$fhParent>)) {
                chomp $cmd;
                $self->log(LOGWARN,"rcptChild() got cmd: $cmd");
                # NOTE(review): presumably a stand-in for the blocking
                # database query this process is meant to perform.
                sleep 5;
                $self->log(LOGWARN,"rcptChild() sending response");
                print $fhParent "YOUR-DATA\n";
        }

        $self->log(LOGWARN,"rcptChild() peer closed the socket, leaving loop");
        return;
}

# Plugin initialisation: create a socketpair, fork the blocking handler
# process (rcptChild) and wrap this process's end of the socket in a
# MyRcptChecker object stored under $self->qp->{'myClass'}.
#
# Params:
#   $self - plugin object
#   $qp   - passed in by the caller; not used here
#   @args - plugin arguments; not used here
#
# Dies if socketpair() or fork() fails. Does nothing if a socket from an
# earlier call is still open.
sub init {
    my ($self, $qp, @args) = @_;

    $self->log(LOGWARN, "init() ");

    # fileno() returns undef for a closed handle, so test definedness rather
    # than comparing against 0 (undef >= 0 is true).
    if ($fhChild && defined fileno($fhChild)) {
        $self->log(LOGWARN, "init() AGAIN! skipping " . fileno($fhChild));
        return;
    }

    my $fhParent;
    socketpair($fhChild, $fhParent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";

    # Autoflush our end without leaving it selected as the process-wide
    # default output handle.
    my $previous = select($fhChild);
    $| = 1;
    select($previous);

    $self->log(LOGWARN, "Socket for child  is now " . fileno($fhChild));

    # fork returns undef (not -1) on failure; treating undef as 0 would run
    # the handler loop inside the server process.
    my $pid = fork;
    die "Fork failure! $!" unless defined $pid;

    if ($pid == 0) {
        # Handler process: keep only its own end of the pair.
        close($fhChild);
        rcptChild($fhParent, $self);
        exit;
    }

    # Server process: drop the handler's end so the handler sees EOF when
    # our end is closed.
    close($fhParent);

    # NOTE(review): hook_rcpt looks the object up via $self->qp->{'myClass'};
    # confirm that $self->qp is the same object at init time and at RCPT time.
    $self->qp->{'myClass'} = MyRcptChecker->new(undef, undef, undef, $fhChild);

    return;
}

#sub hook_config {
 #   my ($self, $transaction) = @_;

#$self->log(LOGWARN,"hook_config() !!!!!!!!!!");

#}
# RCPT hook: queue one command for the handler process, ask the
# MyRcptChecker object for a write event, and return YIELD.
#
# Params:
#   $self        - plugin object
#   $transaction - current transaction (not used at present)
#
# Returns YIELD after queueing the command, or DECLINED when no
# MyRcptChecker object is stored under $self->qp->{'myClass'}.
#
# NOTE(review): after YIELD something has to resume the transaction
# (presumably via run_continuation on the connection) - confirm who does.
sub hook_rcpt {
    my ($self, $transaction) = @_;

    $self->log(LOGWARN, "hook_rcpt() !!!!!!!!!!");
    $self->log(LOGWARN, "hook_rcpt() !!!!!!!!!! " . Dumper($self->qp));

    # Without a handler object there is nothing to talk to; bail out
    # instead of calling a method on undef.
    my $myClass = $self->qp->{'myClass'};
    unless ($myClass) {
        $self->log(LOGERROR, "hook_rcpt() no handler object available");
        return DECLINED;
    }

    # event_write() on the handler object sends whatever is queued here.
    push(@cmdqueue, "RCPT blah");
    $myClass->watch_write(1);

    return YIELD;
}

# Fetch the object stored in the transaction note 'async_sender', clear the
# note, and hand back that object's results.
#
# Params:
#   $self        - plugin object (unused)
#   $transaction - object providing notes($key [, $value])
#
# Returns ($rc, $msg) as reported by the sender's results() method, or
# DECLINED when no sender was stored in the note.
sub finish_queue {
    my ($self, $transaction) = @_;

    my $sender = $transaction->notes('async_sender');
    $transaction->notes('async_sender', undef);

    # No sender stored: calling ->results on undef would die.
    return DECLINED unless $sender;

    my ($rc, $msg) = $sender->results;

    return ($rc, $msg);
}

# MyRcptChecker: Danga::Socket subclass that wraps the socket handed to
# new() and exchanges newline-terminated text with whatever is on the
# other end of it.
package MyRcptChecker;

use IO::Socket;

use base qw(Danga::Socket);
# Per-object fields. Of these, the visible code assigns qp, state, rcode,
# rmsg, buf, command, resp and myResult; pkg, tran and to are only touched
# by commented-out code.
use fields qw(
    qp
    pkg
    tran
    state
    rcode
    rmsg
    buf
    command
    resp
    to
    myResult
    );

# Values stored in the 'state' field. ST_DATA is referenced only from
# commented-out code in this file.
use constant ST_CONNECTING => 0;
use constant ST_CONNECTED  => 1;
use constant ST_COMMANDS   => 2;
use constant ST_DATA       => 3;

# Return codes and log levels (DECLINED, OK, LOGWARN, ...) used below.
use Qpsmtpd::Constants;

# Constructor: wrap $sock in a non-blocking MyRcptChecker and request a
# write event.
#
# Params (positional):
#   $self - class name, or an already allocated object
#   undef - second argument is ignored
#   $qp   - stored in the 'qp' field (cont() calls run_continuation on it)
#   $pkg  - accepted but not stored
#   $sock - connected socket/file handle to wrap
#
# Returns the initialised object. Dies if no socket is supplied.
sub new {
    my ($self, undef, $qp, $pkg, $sock) = @_;

    die "MyRcptChecker->new: no socket supplied\n" unless defined $sock;

    # Allocate only when called as a class method. Assign to the existing
    # variable: "my $self = ... unless ..." would re-declare it and leave
    # it undefined whenever an object was passed in.
    $self = fields::new($self) unless ref $self;

    IO::Handle::blocking($sock, 0);
    binmode($sock, ':raw');

    $self->{qp}      = $qp;
    $self->{state}   = ST_CONNECTING;
    $self->{rcode}   = DECLINED;
    $self->{command} = 'connect';
    $self->{buf}     = '';
    $self->{resp}    = [];

    $self->SUPER::new($sock);

    # Ask for a write event first; event_write() sends queued commands
    # and then switches to watching for reads.
    $self->watch_write(1);

    return $self;
}

# Accessor for the stored outcome: the 'rcode' and 'rmsg' fields, in that
# order.
sub results {
    my MyRcptChecker $self = shift;
    return @{$self}{qw(rcode rmsg)};
}

# Forward a log call to the object held in the 'qp' field.
#
# Params: the arguments are passed through unchanged to $qp->log (the
#         callers in this file pass a log level followed by a message).
#
# Does nothing when no qp object was supplied to new(), so objects built
# without one stay silent instead of dying.
sub log {
    my MyRcptChecker $self = shift;

    my $qp = $self->{qp};
    return unless $qp;

    return $qp->log(@_);
}

# Resume processing by calling run_continuation on the object held in the
# 'qp' field.
#
# Returns whatever run_continuation returns, or nothing when no qp object
# was supplied to new() (calling the method on undef would die).
sub cont {
    my MyRcptChecker $self = shift;

    $self->log(LOGWARN,"cont() here!!!!!");

    my $qp = $self->{qp};
    return unless $qp;

    return $qp->run_continuation;
}

# Send one command line and start watching for the reply.
#
# Params:
#   $command - command text; when it contains a space a ':' is appended
#   $params  - optional parameter string (a false value counts as none)
#
# Writes "<command>[:][ <params>]\r\n", enables read watching, and records
# the first whitespace-delimited word of $command in the 'command' field.
sub command {
    my MyRcptChecker $self = shift;
    my $command = shift(@_);
    my $params  = shift(@_) || '';

    $self->log(LOGDEBUG, ">> $command $params");

    # Assemble the wire line piece by piece.
    my $line = $command;
    $line .= ':'        if $command =~ m/ /;
    $line .= " $params" if $params;
    $self->write($line . "\r\n");

    $self->watch_read(1);

    my ($verb) = $command =~ /(\S+)/;
    $self->{command} = $verb;
}


# Write a chunk of data with every line-leading '.' doubled.
#
# Params:
#   $data - text to send; the caller's copy is left untouched
#
# Returns the result of write(), which receives a reference to the
# escaped copy.
sub datasend {
    my MyRcptChecker $self = shift;
    my $payload = shift;

    # Insert an extra dot in front of any dot that starts a line.
    $payload =~ s/^(?=\.)/./mg;

    return $self->write(\$payload);
}

# Read-ready callback: collect complete response lines from the socket.
#
# Moves the state from ST_CONNECTED to ST_COMMANDS on first use. In
# ST_COMMANDS it reads up to 1024 bytes, splits the buffered text into
# lines, keeps the trailing partial line in 'buf', and for each complete
# line stops read watching and sets 'myResult' to OK. In any other state
# it logs an error and calls cont().
sub event_read {
    my MyRcptChecker $self = shift;

    $self->log(LOGWARN,"event_read() ");

    if ($self->{state} == ST_CONNECTED) {
        $self->{state} = ST_COMMANDS;
    }

    if ($self->{state} != ST_COMMANDS) {
        $self->log(LOGERROR, "SMTP Session occurred out of order");
        $self->cont;
        return;
    }

    my $in = $self->read(1024);
    if (!$in) {
        # No data reference came back: there is nothing to parse, so stop
        # here instead of dereferencing it, and stop asking for read
        # events on this socket.
        $self->log(LOGERROR, "CONNECTION CLOSED RESPONDING");
        $self->{rmsg} = "Lost connection to handler process";
        $self->watch_read(0);
        return;
    }

    # The -1 limit keeps a trailing empty field, so the last element is
    # always the (possibly empty) unfinished line.
    my @lines = split /\r?\n/, $self->{buf} . $$in, -1;
    $self->{buf} = pop @lines;
    $self->{buf} = '' unless defined $self->{buf};

    for my $line (@lines) {
        $self->log(LOGERROR, "Response line: $line response might be" . OK);
        # NOTE(review): rmsg is set to an error text although myResult is
        # OK - confirm which message is intended.
        $self->{rmsg} = "Error from upstream SMTP server";
        $self->watch_read(0);
        $self->{'myResult'} = OK;
        # NOTE(review): nothing resumes the transaction here; the call to
        # cont() was disabled by the author.
    }

    return;
}

# Write-ready callback: send every queued command, then wait for replies.
#
# Turns write watching off, marks the state ST_CONNECTED, writes each
# entry of @cmdqueue followed by "\n" (removing it from the queue), and
# finally turns read watching on.
sub event_write {
    my MyRcptChecker $self = shift;

    $self->log(LOGWARN,"event_write() ");

    $self->watch_write(0);
    $self->{state} = ST_CONNECTED;

    # Take commands off the front one at a time. Iterating with foreach
    # while shifting the same array skips entries, and the loop variable is
    # the command text itself, not an index into @cmdqueue.
    while (defined(my $cmd = shift @cmdqueue)) {
        $self->log(LOGWARN, "sending cmd: $cmd");
        $self->write($cmd . "\n");
    }

    $self->watch_read(1);
}

# Error callback: record the OS error text in 'rmsg' and call cont().
sub event_err {
    my $self = shift;

    $self->log(LOGWARN, "event_err() ");

    # The one-byte read is attempted only so that $! holds the socket's
    # error; any exception it throws is discarded.
    eval { $self->read(1) };
    my $os_error = "$!";
    $self->{rmsg} = 'Read error from remote server: ' . $os_error;

    return $self->cont;
}

# Hang-up callback: record the OS error text in 'rmsg' and call cont().
sub event_hup {
    my $self = shift;

    $self->log(LOGWARN, "event_hup() ");

    # The one-byte read is attempted only so that $! holds the socket's
    # error; any exception it throws is discarded.
    eval { $self->read(1) };
    my $os_error = "$!";
    $self->{rmsg} = 'HUP error from remote server: ' . $os_error;

    return $self->cont;
}

Reply via email to