Hi Everyone! I am very new to qpsmtpd and I am trying to write my first plugin.
For starters, I want to use qpsmtpd-async instead of the other available options. Because of this, I have been learning how qpsmtpd handles polling within the code. It looks like certain plugins are async-safe, such as async/queue/smtp-forward, and I have used that plugin as the basis for my own. My plugin is designed to fork a blocking MySQL client handler once at startup. Because the MySQL client libraries are always blocking, it is necessary to have a handler process that can remain blocking, while the main qpsmtpd process sends commands/queries to the handler process in non-blocking mode. Before the fork I create a socketpair() for communication with the handler process. I also call $self->SUPER::new($mysock) and use the watch_write and watch_read functions appropriately. My handler process and socket communication work well, but only when I store my socket in transaction->notes(); the problem with this is that I lose my file handle after the first email. I have attempted to store the object globally, but this generates epoll errors. Can I store the object in the qp object globally somehow, so that I can maintain the object for every email? Are you aware of any other plugins that behave similarly to what I am trying to achieve? 
I have pasted the test plugin below:

#!/usr/bin/perl -w
use strict;
use warnings;

use Data::Dumper;
use IO::Handle;
use Socket;
use Qpsmtpd::Constants;

# Parent-side end of the socketpair shared with the forked worker process.
my $fhChild;

# Commands waiting to be written to the worker.  Filled by hook_rcpt() and
# drained by MyRcptChecker::event_write(); both live in this file, so the
# lexical is visible to both packages.
my @cmdqueue;

# Worker loop, run in the forked child.
#
#   $fhParent - the child's end of the socketpair (talks to the parent)
#   $self     - the plugin object, used only for logging
#
# Reads one newline-terminated command at a time and answers each with a
# single "YOUR-DATA" line.  The sleep presumably stands in for a slow,
# blocking lookup.  Returns once the parent closes its end of the socket.
sub rcptChild {
    my ($fhParent, $self) = @_;

    # autoflush instead of select()/$|++, which would also redirect every
    # bare print in this process to the socket.
    $fhParent->autoflush(1);
    $self->log(LOGWARN, "rcptChild() starting PID: $$");

    # readline returns undef on EOF; stop instead of spinning forever.
    while (defined(my $cmd = <$fhParent>)) {
        chomp $cmd;
        $self->log(LOGWARN, "rcptChild() got cmd: $cmd");
        sleep 5;
        $self->log(LOGWARN, "rcptChild() sending response");
        print {$fhParent} "YOUR-DATA\n";
    }
    return;
}

# Plugin initialisation: create the socketpair, fork the worker, and wrap
# the parent's end of the socket in a MyRcptChecker stored on the qp object.
# A second call is a no-op while the socket is still open.
# Dies if socketpair() or fork() fails.
sub init {
    my ($self, $qp, @args) = @_;

    $self->log(LOGWARN, "init() ");

    # fileno() is undef for a closed handle, so test definedness rather
    # than ">= 0" (undef >= 0 is true).
    if ($fhChild && defined fileno($fhChild)) {
        $self->log(LOGWARN, "init() AGAIN! skipping " . fileno($fhChild));
        return;
    }

    socketpair($fhChild, my $fhParent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";
    $fhChild->autoflush(1);
    $self->log(LOGWARN, "Socket for child is now " . fileno($fhChild));

    # fork returns undef (not -1) on failure; testing for -1 would let a
    # failed fork fall through into the child branch inside the parent.
    my $pid = fork;
    die "Fork failure: $!" unless defined $pid;

    if ($pid == 0) {
        close $fhChild;
        rcptChild($fhParent, $self);
        exit;
    }

    # The parent only ever talks through $fhChild.
    close $fhParent;

    # NOTE(review): the checker is created without a qp, so its cont() has
    # nothing to resume.  Confirm how the yielded connection should be
    # continued once the worker answers.
    $self->qp->{'myClass'} = MyRcptChecker->new(undef, undef, undef, $fhChild);
    return;
}

# RCPT hook: queue a command for the worker and ask the event loop to call
# the checker's event_write().  Returns YIELD, or DECLINED when no checker
# is attached to this qp object.
sub hook_rcpt {
    my ($self, $transaction) = @_;

    $self->log(LOGWARN, "hook_rcpt() !!!!!!!!!!");
    $self->log(LOGWARN, "hook_rcpt() !!!!!!!!!! " . Dumper($self->qp));

    my $myClass = $self->qp->{'myClass'};
    unless ($myClass) {
        $self->log(LOGWARN, "hook_rcpt() no checker attached, declining");
        return DECLINED;
    }

    push @cmdqueue, "RCPT blah";
    $myClass->watch_write(1);

    return YIELD;
}

# Collect the result stored under the 'async_sender' transaction note and
# clear the note.  Returns ($rc, $msg), or DECLINED when no sender is set.
sub finish_queue {
    my ($self, $transaction) = @_;

    my $sender = $transaction->notes('async_sender');
    $transaction->notes('async_sender', undef);
    return DECLINED unless $sender;

    my ($rc, $msg) = $sender->results;
    return $rc, $msg;
}

package MyRcptChecker;

use IO::Socket;
use base qw(Danga::Socket);
use fields qw(
    qp pkg tran state rcode rmsg buf command resp to myResult
);

use constant ST_CONNECTING => 0;
use constant ST_CONNECTED  => 1;
use constant ST_COMMANDS   => 2;
use constant ST_DATA       => 3;

use Qpsmtpd::Constants;

# Constructor.
#
#   (class, <ignored>, $qp, $pkg, $sock)
#
# $qp may be undef; log() and cont() are then no-ops.  $pkg is accepted for
# call compatibility but not stored.  $sock is switched to non-blocking raw
# mode and handed to Danga::Socket.
sub new {
    my ($self, undef, $qp, $pkg, $sock) = @_;

    # Plain assignment: "my $self = ... unless ..." would re-declare $self
    # and has unspecified behaviour when the condition is false.
    $self = fields::new($self) unless ref $self;

    IO::Handle::blocking($sock, 0);
    binmode($sock, ':raw');

    $self->{qp}      = $qp;
    $self->{state}   = ST_CONNECTING;
    $self->{rcode}   = DECLINED;
    $self->{command} = 'connect';
    $self->{buf}     = '';
    $self->{resp}    = [];

    $self->SUPER::new($sock);

    # Ask for a write event first; event_write() moves the state machine on.
    $self->watch_write(1);

    return $self;
}

# Returns the stored (rcode, rmsg) pair.
sub results {
    my MyRcptChecker $self = shift;
    return ($self->{rcode}, $self->{rmsg});
}

# Forward a log call to the qp object when one is attached; otherwise do
# nothing.
sub log {
    my MyRcptChecker $self = shift;
    return unless $self->{qp};
    return $self->{qp}->log(@_);
}

# Resume the yielded hook on the attached qp object, if there is one.
sub cont {
    my MyRcptChecker $self = shift;
    $self->log(LOGWARN, "cont() here!!!!!");
    return unless $self->{qp};
    $self->{qp}->run_continuation;
}

# Write "$command[:][ $params]\r\n" to the socket, start watching for the
# reply, and remember the first word of the command.
sub command {
    my MyRcptChecker $self = shift;
    my ($command, $params) = @_;
    $params ||= '';

    $self->log(LOGDEBUG, ">> $command $params");

    # Commands containing a space get a trailing colon.
    $self->write(($command =~ m/ / ? "$command:" : $command)
        . ($params ? " $params" : "")
        . "\r\n");
    $self->watch_read(1);
    $self->{command} = ($command =~ /(\S+)/)[0];
}

# Write $data with every line-leading "." doubled.
sub datasend {
    my MyRcptChecker $self = shift;
    my ($data) = @_;
    $data =~ s/^\./../mg;
    $self->write(\$data);
}

# Readable event: split incoming data into lines, keep the trailing partial
# line in {buf}, and record OK in {myResult} for every complete line.
sub event_read {
    my MyRcptChecker $self = shift;
    $self->log(LOGWARN, "event_read() ");

    $self->{state} = ST_COMMANDS if $self->{state} == ST_CONNECTED;

    if ($self->{state} != ST_COMMANDS) {
        $self->log(LOGERROR, "SMTP Session occurred out of order");
        $self->cont;
        return;
    }

    my $in = $self->read(1024);
    if (!$in) {
        # Danga::Socket::read is documented to return undef once the peer
        # has closed; there is nothing to dereference, and leaving the
        # read watch on would make the loop fire on the dead socket again.
        $self->log(LOGERROR, "CONNECTION CLOSED RESPONDING");
        $self->watch_read(0);
        return;
    }

    my @lines = split /\r?\n/, $self->{buf} . $$in, -1;

    # The last field is the (possibly empty) unterminated tail.
    my $tail = pop @lines;
    $self->{buf} = defined $tail ? $tail : '';

    # NOTE(review): nothing resumes the hook that returned YIELD here; the
    # result is only stored.  Confirm where run_continuation should happen.
    for my $line (@lines) {
        $self->log(LOGERROR, "Response line: $line response might be" . OK);
        $self->{rmsg} = "Error from upstream SMTP server";
        $self->watch_read(0);
        $self->{'myResult'} = OK;
    }
}

# Writable event: send every queued command to the worker, one per line,
# then wait for the reply.
sub event_write {
    my MyRcptChecker $self = shift;
    $self->log(LOGWARN, "event_write() ");

    $self->watch_write(0);
    $self->{state} = ST_CONNECTED;

    # Drain with shift: iterating the array while shifting it skipped every
    # other command, and the element was being misused as an index.
    while (defined(my $cmd = shift @cmdqueue)) {
        $self->log(LOGWARN, "sending cmd x: $cmd");
        $self->write("$cmd\n");
    }

    $self->watch_read(1);
}

# Error event: record the read error in {rmsg} and resume the hook.
sub event_err {
    my MyRcptChecker $self = shift;
    $self->log(LOGWARN, "event_err() ");
    eval { $self->read(1); };    # gives us the correct error in errno
    $self->{rmsg} = "Read error from remote server: $!";
    $self->cont;
}

# Hang-up event: record the error in {rmsg} and resume the hook.
sub event_hup {
    my MyRcptChecker $self = shift;
    $self->log(LOGWARN, "event_hup() ");
    eval { $self->read(1); };    # gives us the correct error in errno
    $self->{rmsg} = "HUP error from remote server: $!";
    $self->cont;
}