Subject: | [PATCH] MIME/Decoder.pm (filter): use select() for IO multiplexing |
Date: | Sun, 29 Oct 2006 00:32:16 +0400 |
To: | bug-mime-tools [...] rt.cpan.org |
From: | Alexey Tourbin <at [...] altlinux.ru> |
Before now, the filter() code was crap. The problem is that open2() does
only one half of the work, i.e. creating pipes and connecting descriptors.
Another half, which is doing reads and writes, cannot be done in a dumb
"first write all, then read all" manner. It works only in simplest cases,
when buffers are large enough to hold the stuff. Actually it is
deadlock-prone.
The following script, which I call "filtertest.pl", can demonstrate the
deadlock.
require MIME::Decoder;
require MIME::Decoder::Gzip64;
install MIME::Decoder::Gzip64 'x-gzip64';
my $decoder = MIME::Decoder->new('x-gzip64');
$decoder->encode(\*STDIN, \*STDOUT);
In a simple case, it works:
$ echo 123 |perl filtertest.pl
H4sIAN6AQ0UAAzM0MuYCAAj9gloEAAAA
$
However, it blocks under heavy load:
$ cat /usr/lib/perl5/pod/*.pod |perl -d:Trace filtertest.pl
...
Show quoted text
>> /usr/lib/perl5/vendor_perl/IO/Wrap.pm:68: my $self = shift;
>> /usr/lib/perl5/vendor_perl/IO/Wrap.pm:69: return read($$self, $_[0], $_[1]);
>> /usr/lib/perl5/vendor_perl/MIME/Decoder.pm:438: while ($in->read($buf, 2048)) { print CHILDIN $buf }
>> /usr/lib/perl5/vendor_perl/IO/Wrap.pm:68: my $self = shift;
>> /usr/lib/perl5/vendor_perl/IO/Wrap.pm:69: return read($$self, $_[0], $_[1]);
>> /usr/lib/perl5/vendor_perl/MIME/Decoder.pm:438: while ($in->read($buf, 2048)) { print CHILDIN $buf }
>> /usr/lib/perl5/vendor_perl/IO/Wrap.pm:68: my $self = shift;
>> /usr/lib/perl5/vendor_perl/IO/Wrap.pm:69: return read($$self, $_[0], $_[1]);
>> /usr/lib/perl5/vendor_perl/MIME/Decoder.pm:438: while ($in->read($buf, 2048)) { print CHILDIN $buf }
(deadlock)
...
What happens here is that gzip has cooked up its stuff and tries to flush
it. Before gzip does that, it is not going to read its input any further.
However, we are not going to check gzip output, too, because we are doing
"first write all" thing. That's deadlock.
The right thing to do is to use select() for IO multiplexing, i.e. to
intermix reads and writes as needed. This is all mentioned in IPC::Open2
and IPC::Open3 documentation.
This patch does the right thing. After CHILDIN and CHOULDOUT descriptors
are connected, I set up read/write descriptor sets and excercise select()
loop. When writing to CHILDIN, I also install SIGPIPE handler so as to
protect me from being killed if the child exits prematurely. But SIGPIPE
handler only issues a warning. Ultimately I check $? exit status, and when
it is non-zero, it's then I die.
The above test case now works.
---
lib/MIME/Decoder.pm | 59 ++++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 49 insertions(+), 10 deletions(-)
diff --git a/lib/MIME/Decoder.pm b/lib/MIME/Decoder.pm
index 0ea35f2..4899ee7 100644
--- a/lib/MIME/Decoder.pm
+++ b/lib/MIME/Decoder.pm
@@ -436,18 +436,57 @@ sub filter {
### Open pipe:
STDOUT->flush; ### very important, or else we get duplicate output!
- my $kidpid = open2(\*CHILDOUT, \*CHILDIN, @cmd) || die "open2 failed: $!";
-
- ### Write all:
- while ($in->read($buf, 2048)) { print CHILDIN $buf }
- close \*CHILDIN;
-
- ### Read all:
- while (read(\*CHILDOUT, $buf, 2048)) { $out->print($buf) }
- close \*CHILDOUT;
+ local (*CHILDOUT, *CHILDIN);
+ my $kidpid = open2(\*CHILDOUT, \*CHILDIN, @cmd) || die "@cmd: open2 failed: $!";
+
+ ### We have to use select() for doing both reading and writing.
+ my $rno = fileno(CHILDOUT);
+ my $wno = fileno(CHILDIN);
+ vec(my $rfds='', $rno, 1) = 1;
+ vec(my $wfds='', $wno, 1) = 1;
+
+ while (1) {
+ ### Wait for one hour; if that fails, it's too bad.
+ my $n = select(my $rout=$rfds, my $wout=$wfds, undef, 3600);
+ if ($n <= 0) {
+ kill 1, $kidpid;
+ waitpid $kidpid, 0;
+ die "@cmd: select failed: $!" if $n < 0;
+ die "@cmd: select timeout" if $n == 0;
+ }
+ ### If can read from child:
+ if ($rout && vec($rout, $rno, 1)) {
+ if (sysread(CHILDOUT, my $buf, 1024)) {
+ $out->print($buf);
+ }
+ else {
+ close CHILDOUT;
+ undef $rfds;
+ }
+ }
+ ### If can write to child:
+ if ($wout && vec($wout, $wno, 1)) {
+ if ($in->read(my $buf, 1024)) {
+ local $SIG{PIPE} = sub {
+ warn "got SIGPIPE from @cmd";
+ close CHILDIN;
+ undef $wfds;
+ };
+ syswrite(CHILDIN, $buf);
+ }
+ else {
+ close CHILDIN;
+ undef $wfds;
+ }
+ }
+ ### If both CHILDOUT and CHILDIN are done:
+ last unless $rfds || $wfds;
+ }
### Wait for it:
- waitpid($kidpid,0) or die "couldn't reap child $kidpid";
+ waitpid($kidpid,0) == $kidpid or die "@cmd: couldn't reap child $kidpid";
+ ### Check if it failed:
+ $? == 0 or die "@cmd: bad exit status: \$? = $?";
1;
}
--
1.4.3.GIT