Skip Menu |

This queue is for tickets about the POE CPAN distribution.

Report information
The Basics
Id: 73126
Status: resolved
Priority: 0/
Queue: POE

People
Owner: Nobody in particular
Requestors: hinrik.sig [...] gmail.com
Cc:
AdminCc:

Bug Information
Severity: (no value)
Broken in: (no value)
Fixed in: (no value)



Subject: SIGCHLD sometimes not delivered
A test of mine for POE::Quickie was hanging sometimes due to a SIGCHLD not being delivered, so I decided to write a pure-POE test to track down the problem. I have attached the test file to this bug report. I cannot simplify it any further because the act of removing ANY code from the test -- however irrelevant (e.g. the Exporter code) -- reduces the incidence of failure. Try running it in a loop, like so: for i in {1..1000}; do prove sigchld-race.t; done On my laptop, the hangs after 10 seconds on average. The problem persists on every Perl version I've tried (Perl 5.8.9, 5.10.1, 5.12.2 and 5.14.0).
The test.
Subject: sigchld-race.t
use strict; use warnings FATAL => 'all'; use POE; use Test::More tests => 2; { package WheelWrapper; use strict; use warnings FATAL => 'all'; use Carp 'croak'; use POE; use POE::Filter::Stream; use POE::Wheel::Run; require Exporter; use base 'Exporter'; our @EXPORT = qw(quickie_run quickie quickie_merged quickie_tee quickie_tee_merged); our @EXPORT_OK = @EXPORT; our %EXPORT_TAGS = (ALL => [@EXPORT]); our %OBJECTS; sub new { my ($package, %args) = @_; my $parent_id = $poe_kernel->get_active_session->ID; if (my $self = $OBJECTS{$parent_id}) { return $self; } my $self = bless \%args, $package; $self->{parent_id} = $parent_id; $OBJECTS{$parent_id} = $self; return $self; } sub _create_session { my ($self) = @_; POE::Session->create( object_states => [ $self => [qw( _start _stop _exception _create_wheel _child_signal _child_timeout _child_stdin _child_stdout _child_stderr _killall )], ], options => { ($self->{debug} ? (debug => 1) : ()), ($self->{default} ? (default => 1) : ()), ($self->{trace} ? (trace => 1) : ()), }, ); return; } sub _start { my ($kernel, $session, $self) = @_[KERNEL, SESSION, OBJECT]; my $session_id = $session->ID; $self->{session_id} = $session_id; $kernel->sig(DIE => '_exception'); return; } sub _stop { my $self = $_[OBJECT]; delete $self->{session_id}; return; } sub run { my ($self, %args) = @_; croak 'Program parameter not supplied' if !defined $args{Program}; if ($args{AltFork} && ref $args{Program}) { croak 'Program must be a string when AltFork is enabled'; } if ($args{AltFork} && $^O eq 'Win32') { croak 'AltFork does not currently work on Win32'; } $self->_create_session() if !defined $self->{session_id}; my ($exception, $wheel) = $poe_kernel->call($self->{session_id}, '_create_wheel', \%args); # propagate possible exception from POE::Wheel::Run->new() croak $exception if $exception; return $wheel->PID; } sub _create_wheel { my ($kernel, $self, $args) = @_[KERNEL, OBJECT, ARG0]; my %data; for my $arg (qw(AltFork Timeout Input Program Context ProgramArgs StdoutEvent StderrEvent ExitEvent ResultEvent Tee Merged)) { next if !exists $args->{$arg}; $data{$arg} = delete $args->{$arg}; } if ($data{AltFork}) { my @inc = map { +'-I' => $_ } @INC; $data{Program} = [$^X, @inc, '-e', $data{Program}]; } my $wheel; eval { $wheel = POE::Wheel::Run->new( StdinFilter => POE::Filter::Stream->new(), StdinEvent => '_child_stdin', StdoutEvent => '_child_stdout', StderrEvent => '_child_stderr', Program => $data{Program}, (defined $data{ProgramArgs} ? (ProgramArgs => $data{ProgramArgs}) : () ), ($^O ne 'Win32' ? (CloseOnCall => 1) : () ), %$args, ); }; if ($@) { chomp $@; return $@; } $data{obj} = $wheel; $data{extra_args} = $args; $self->{wheels}{$wheel->ID} = \%data; if (defined $data{Input}) { $wheel->put($data{Input}); } else { $wheel->shutdown_stdin(); } if ($data{Timeout}) { $data{alrm} = $kernel->delay_set('_child_timeout', $data{Timeout}, $wheel->ID); } $kernel->sig_child($wheel->PID, '_child_signal'); return (undef, $wheel); } sub _exception { my ($kernel, $self, $ex) = @_[KERNEL, OBJECT, ARG1]; chomp $ex->{error_str}; warn __PACKAGE__.": Event $ex->{event} in session " .$ex->{dest_session}->ID." raised exception:\n $ex->{error_str}\n"; $kernel->sig_handled(); return; } sub _child_signal { my ($kernel, $self, $pid, $status) = @_[KERNEL, OBJECT, ARG1, ARG2]; my $id = $self->_pid_to_id($pid); my $data = $self->{wheels}{$id}; my $s = $status >> 8; if ($s != 0 && !exists $data->{ExitEvent} && !exists $data->{ResultEvent}) { warn "Child $pid exited with nonzero status $s\n"; } $kernel->alarm_remove($data->{alrm}) if $data->{Timeout}; if ($data->{lazy}) { $self->{lazy}{$id} = { merged => $data->{merged}, stdout => $data->{stdout}, stderr => $data->{stderr}, status => $status, } } delete $self->{wheels}{$id}; if (defined $data->{ExitEvent}) { $kernel->call( $self->{parent_id}, $data->{ExitEvent}, $status, $pid, (defined $data->{Context} ? $data->{Context} : ()), ); } if (defined $data->{ResultEvent}) { $kernel->call( $self->{parent_id}, $data->{ResultEvent}, $data->{stdout}, $data->{stderr}, $data->{merged}, $status, $pid, (defined $data->{Context} ? $data->{Context} : ()), ); } return; } sub _child_timeout { my ($self, $id) = @_[OBJECT, ARG0]; $self->{wheels}{$id}{obj}->kill(); return; } sub _child_stdin { my ($self, $id) = @_[OBJECT, ARG0]; $self->{wheels}{$id}{obj}->shutdown_stdin(); return; } sub _child_stdout { my ($kernel, $self, $output, $id) = @_[KERNEL, OBJECT, ARG0, ARG1]; my $data = $self->{wheels}{$id}; if ($data->{lazy} || defined $data->{ResultEvent}) { push @{ $data->{merged} }, $output; push @{ $data->{stdout} }, $output; if ($data->{lazy}{Tee}) { print $output, "\n"; } } elsif (!exists $data->{StdoutEvent}) { print "$output\n"; } elsif (defined (my $event = $data->{StdoutEvent})) { my $context = $data->{Context}; $kernel->call( $self->{parent_id}, $event, $output, $data->{obj}->PID, (defined $context ? $context : ()), ); } return; } sub _child_stderr { my ($kernel, $self, $error, $id) = @_[KERNEL, OBJECT, ARG0, ARG1]; my $data = $self->{wheels}{$id}; if ($data->{lazy} || defined $data->{ResultEvent}) { push @{ $data->{merged} }, $error; push @{ $data->{stderr} }, $error; if ($data->{lazy}{Tee}) { $data->{lazy}{Merged} ? print $error, "\n" : warn $error, "\n"; } } elsif (!exists $data->{StderrEvent}) { warn "$error\n"; } elsif (defined (my $event = $data->{StderrEvent})) { my $context = $data->{Context}; $kernel->call( $self->{parent_id}, $event, $error, $data->{obj}->PID, (defined $context ? $context : ()), ); } return; } sub _pid_to_id { my ($self, $pid) = @_; for my $id (keys %{ $self->{wheels} }) { return $id if $self->{wheels}{$id}{obj}->PID == $pid; } return; } sub killall { my $self = shift; $self = POE::Quickie->new() if ref $self ne 'POE::Quickie'; $poe_kernel->call($self->{session_id}, '_killall', @_); return; } sub _killall { my ($kernel, $self, $signal) = @_[KERNEL, OBJECT, ARG0]; $kernel->alarm_remove_all(); for my $id (keys %{ $self->{wheels}}) { $self->{wheels}{$id}{obj}->kill($signal); } return; } sub processes { my ($self) = @_; $self = POE::Quickie->new() if ref $self ne 'POE::Quickie'; my %wheels; for my $id (keys %{ $self->{wheels} }) { my $pid = $self->{wheels}{$id}{obj}->PID; $wheels{$pid} = $self->{wheels}{$id}{Context}; } return \%wheels; } sub _lazy_run { my ($self, %args) = @_; my %run_args; if (@{ $args{RunArgs} } == 1 && (!ref $args{RunArgs}[0] || ref ($args{RunArgs}[0]) =~ /^(?:ARRAY|CODE)$/)) { $run_args{Program} = $args{RunArgs}[0]; } else { %run_args = @{ $args{RunArgs} }; } my $pid = $self->run( %run_args, ExitEvent => undef, ($args{Tee} ? () : (StderrEvent => undef)), ($args{Tee} ? () : (StdoutEvent => undef)), ); my $id = $self->_pid_to_id($pid); $self->{wheels}{$id}{lazy} = { Tee => $args{Tee}, Merged => $args{Merged}, }; my $parent_id = $poe_kernel->get_active_session->ID; $poe_kernel->refcount_increment($parent_id, __PACKAGE__); $poe_kernel->run_one_timeslice() while $self->{wheels}{$id}; $poe_kernel->refcount_decrement($parent_id, __PACKAGE__); my $data = delete $self->{lazy}{$id}; return $data->{merged}, $data->{status} if $args{Merged}; return $data->{stdout}, $data->{stderr}, $data->{status}; } sub quickie_run { my %args = @_; my $self = POE::Quickie->new(); return $self->run(%args); } sub quickie { my @args = @_; my $self = POE::Quickie->new(); return $self->_lazy_run( RunArgs => \@args ); } sub quickie_tee { my @args = @_; my $self = POE::Quickie->new(); return $self->_lazy_run( RunArgs => \@args, Tee => 1, ); } sub quickie_merged { my @args = @_; my $self = POE::Quickie->new(); return $self->_lazy_run( RunArgs => \@args, Merged => 1, ); } sub quickie_tee_merged { my @args = @_; my $self = POE::Quickie->new(); return $self->_lazy_run( RunArgs => \@args, Tee => 1, Merged => 1, ); } } POE::Session->create( package_states => [ (__PACKAGE__) => [qw( _start stdout stderr )], ], ); POE::Kernel->run; sub _start { my $heap = $_[HEAP]; $heap->{wrapper} = WheelWrapper->new(trace => 1); $heap->{wrapper}->run( Program => sub { print "foo\n" }, StdoutEvent => 'stdout', ); } sub stdout { my ($heap, $output) = @_[HEAP, ARG0]; is($output, 'foo', 'Got stdout'); $heap->{wrapper}->run( Program => sub { warn "bar\n" }, StderrEvent => 'stderr', ); } sub stderr { my ($heap, $error) = @_[HEAP, ARG0]; is($error, 'bar', 'Got stderr'); }
Thank you for the test case. I believe this is fixed in https://github.com/rcaputo/poe/commit/eae7a261fc6cf5b2950b5cd92fd8d394f078688c