
This queue is for tickets about the POE CPAN distribution.

Report information
The Basics
Id: 47966
Status: resolved
Priority: 0/
Queue: POE

People
Owner: Nobody in particular
Requestors: perl [...] pied.nu
Cc:
AdminCc:

Bug Information
Severity: Important
Broken in: 1.006
Fixed in: (no value)



Subject: Signals must not touch the queue
There is a race condition in POE::Queue::Array: if a signal handler modifies the queue while the main program is also modifying it, all offsets used by ->_find_item (or others) are in error.

The included patch is a minimalist workaround: signal handlers put events into a shadow queue, which gets copied to the main queue when expedient. Another workaround is to use POE::XS::Queue::Array.

Another idea would be to borrow a leaf from Erlang. Erlang has a shared-nothing threading model. If we conceive of the signal handlers running in one thread, and the main loop running in another thread, we need to make sure they can't stomp on each other. The included patch does this by copying the shadow queue inside of a grep, which should be signal-safe. Erlang, however, would use a pipe or socket to send messages between the threads.

We could use POE::Pipe::OneWay to send very short messages from the signal handlers to the main loop: a simple string like 'CHLD' or 'PIPE', or even just the signal's number. This would wake up the main loop from its select (or poll or whatever). It would then enqueue the real events in the main queue. If done this way, we need to rewrite _test_if_kernel_is_idle so that it ignores the signal-pipe, but only if it is empty.

BTW, this is only an issue for PerlSignals. For the POE::Loops that implement loop_watch_signal() (Event_Lib, EV), it shouldn't be an issue. What's more, doing the above would remove the signal polling that happens in POE::Loop::TkActiveState, for instance.
Subject: Philip_Gwyn-POE-signal-shadow-queue.01.patch
Index: lib/POE/Resource/Signals.pm
===================================================================
--- lib/POE/Resource/Signals.pm (revision 2571)
+++ lib/POE/Resource/Signals.pm (working copy)
@@ -467,7 +467,7 @@
   return if $polling_for_signals;
   $polling_for_signals = 1;
-  $self->_data_ev_enqueue(
+  $self->_data_shadow_enqueue(
     $self, $self, EN_SCPOLL, ET_SCPOLL, [ ],
     __FILE__, __LINE__, undef, time(),
   );
@@ -587,6 +587,9 @@
   # If waitpid() returned 0, then we have child processes.
   $kr_child_procs = !$pid;
+  if (TRACE_SIGNALS) {
+    _warn("$$: <sg> kr_child_procs=$kr_child_procs" );
+  }
   unless ( USE_SIGCHLD ) {
@@ -611,9 +614,46 @@
 sub _data_sig_child_procs {
   return if !USE_SIGCHLD and !$polling_for_signals;
+  if (TRACE_SIGNALS) {
+    _warn("$$: <sg> _data_sig_child_procs kr_child_procs=$kr_child_procs" );
+  }
   return $kr_child_procs;
 }
+# This is the shadow queue, used by the signal handlers.
+# Modifying the kernel's queue from a signal handler is not safe.
+# So any new events go into this queue, and get copied to the main queue
+# by _data_sig_enqueue().
+my @shadow_queue;
+
+sub _data_shadow_is_empty {
+  return 0==@shadow_queue;
+}
+
+sub _data_shadow_enqueue {
+  my $self = shift;
+  if (TRACE_SIGNALS) {
+    _warn("$$: <sg> shadow queue signal = ", $_[2],
+      ( 0==@{$_[4]} ? '' : " SIG$_[4][0]" )
+    );
+  }
+  push @shadow_queue, [ @_ ];
+}
+
+sub _data_shadow_requeue {
+  my( $self ) = @_;
+  return unless @shadow_queue;
+  my @temp;
+  # I HOPE this is signal-safe and atomic. If a bit excessive.
+  @shadow_queue = grep { push @temp, $_; 0 } @shadow_queue;
+  if (TRACE_SIGNALS) {
+    _warn("$$: <sg> shadow queue -> main queue ", (0+@temp), " signals." );
+  }
+  foreach my $todo ( @temp ) {
+    $self->_data_ev_enqueue( @$todo );
+  }
+}
+
 1;
 __END__
Index: lib/POE/Kernel.pm
===================================================================
--- lib/POE/Kernel.pm (revision 2571)
+++ lib/POE/Kernel.pm (working copy)
@@ -635,6 +635,7 @@
     $kr_queue->get_item_count() > $idle_queue_size or
     $self->_data_handle_count() or
     $self->_data_extref_count() or
+    $self->_data_shadow_is_idle() or
     $self->_data_sig_child_procs()
   ) {
     $self->_data_ev_enqueue(
Index: lib/POE/Loop/Gtk.pm
===================================================================
--- lib/POE/Loop/Gtk.pm (revision 2571)
+++ lib/POE/Loop/Gtk.pm (working copy)
@@ -241,6 +241,7 @@
     $self->_data_stat_add('idle_seconds', time() - $last_time);
   }
+  $self->_data_shadow_requeue();
   $self->_data_ev_dispatch_due();
   $self->_test_if_kernel_is_idle();
Index: lib/POE/Loop/TkCommon.pm
===================================================================
--- lib/POE/Loop/TkCommon.pm (revision 2571)
+++ lib/POE/Loop/TkCommon.pm (working copy)
@@ -132,6 +132,7 @@
 sub loop_do_timeslice {
   my $self = shift;
+  $self->_data_shadow_requeue();
   # Check for a hung kernel.
   $self->_test_if_kernel_is_idle();
   my $now;
Index: lib/POE/Loop/IO_Poll.pm
===================================================================
--- lib/POE/Loop/IO_Poll.pm (revision 2571)
+++ lib/POE/Loop/IO_Poll.pm (working copy)
@@ -221,6 +221,7 @@
 sub loop_do_timeslice {
   my $self = shift;
+  $self->_data_shadow_requeue();
   # Check for a hung kernel.
   $self->_test_if_kernel_is_idle();
Index: lib/POE/Loop/Tk.pm
===================================================================
--- lib/POE/Loop/Tk.pm (revision 2571)
+++ lib/POE/Loop/Tk.pm (working copy)
@@ -197,6 +197,7 @@
 # Tk filehandle callback to dispatch selects.
 sub _loop_select_callback {
   my ($fileno, $mode) = @_;
+  $poe_kernel->_data_shadow_requeue();
   $poe_kernel->_data_handle_enqueue_ready($mode, $fileno);
   $poe_kernel->_test_if_kernel_is_idle();
 }
Index: lib/POE/Loop/Select.pm
===================================================================
--- lib/POE/Loop/Select.pm (revision 2571)
+++ lib/POE/Loop/Select.pm (working copy)
@@ -143,6 +143,7 @@
 sub loop_do_timeslice {
   my $self = shift;
+  $self->_data_shadow_requeue();
   # Check for a hung kernel.
   $self->_test_if_kernel_is_idle();
Index: lib/POE/Loop/Event.pm
===================================================================
--- lib/POE/Loop/Event.pm (revision 2571)
+++ lib/POE/Loop/Event.pm (working copy)
@@ -162,6 +162,7 @@
     $self->_data_stat_add('idle_seconds', time() - $last_time);
   }
+  $self->_data_shadow_requeue();
   $self->_data_ev_dispatch_due();
   $self->_test_if_kernel_is_idle();
@@ -191,6 +192,7 @@
   );
   $self->_data_handle_enqueue_ready($mode, $fileno);
+  $self->_data_shadow_requeue();
   $self->_test_if_kernel_is_idle();
 }
Index: lib/POE/Loop/PerlSignals.pm
===================================================================
--- lib/POE/Loop/PerlSignals.pm (revision 2571)
+++ lib/POE/Loop/PerlSignals.pm (working copy)
@@ -30,7 +30,7 @@
     POE::Kernel::_warn "<sg> Enqueuing generic SIG$_[0] event";
   }
-  $poe_kernel->_data_ev_enqueue(
+  $poe_kernel->_data_shadow_enqueue(
     $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
     __FILE__, __LINE__, undef, time()
   );
@@ -42,7 +42,7 @@
     POE::Kernel::_warn "<sg> Enqueuing PIPE-like SIG$_[0] event";
   }
-  $poe_kernel->_data_ev_enqueue(
+  $poe_kernel->_data_shadow_enqueue(
     $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
     __FILE__, __LINE__, undef, time()
   );
This other patch uses the signal-pipe. It can be turned off with POE_USE_SIGNAL_PIPE=0.
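Since the signal-pipe patch itself is not shown on this page, here is a minimal sketch of the general idea, not the patch's actual code. It assumes a Unix-like system with USR1 and USR2, uses a plain pipe() rather than POE::Pipe::OneWay, and @main_queue and drain_signal_pipe() are hypothetical names standing in for the kernel's queue and loop.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical stand-in for the kernel's main event queue.
my @main_queue;

# One-way pipe: signal handlers write, the main loop reads.
pipe( my $sig_read, my $sig_write ) or die "pipe failed: $!";

# Map signal names to one-byte codes and back (illustrative subset).
my %SIG2NUM = ( USR1 => POSIX::SIGUSR1(), USR2 => POSIX::SIGUSR2() );
my %NUM2SIG = reverse %SIG2NUM;

# Handler side: never touches @main_queue, only writes a single byte.
foreach my $name ( keys %SIG2NUM ) {
  $SIG{$name} = sub { syswrite( $sig_write, chr( $SIG2NUM{ $_[0] } ), 1 ) };
}

# Main-loop side: wait until the pipe is readable, then enqueue for real.
sub drain_signal_pipe {
  my ($timeout) = @_;
  my $rin = '';
  vec( $rin, fileno($sig_read), 1 ) = 1;
  my $ready = select( my $rout = $rin, undef, undef, $timeout );
  return 0 unless defined $ready and $ready > 0;
  my $got = sysread( $sig_read, my $buf, 512 );
  return 0 unless $got;
  push @main_queue, map { $NUM2SIG{ ord $_ } } split //, $buf;
  return $got;
}

kill 'USR1', $$;    # simulate an incoming signal

# select() may itself be interrupted by a signal, so retry a few times.
for ( 1 .. 5 ) {
  last if @main_queue;
  drain_signal_pipe(1);
}
print "queued: @main_queue\n";
```

The handler only performs one small write, so the queue is modified from a single place: the code that reads the pipe in the main loop.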

Message body is not shown because it is too large.

Further possible refinement of the signal-pipe: http://www.xs4all.nl/~evbergen/unix-signals.html
Apparent size of the buffers for various POE::Pipe::OneWay conduits:

Max req:
    pipe = 4096 (/proc/sys/msgmax ??)
    socketpair = 364
    socket = 139561 (/proc/sys/net/ipv4/tcp_max_tw_buckets ??)

Values are number of 1 byte requests.
Perl 5.8.8, CentOS 5, Linux 2.6.9-78.0.13.ELvm-smp

Pipe::OneWay has the following precedence: socketpair, pipe, socket.
...and socketpair is the smallest, socket probably the slowest. Hence a pipe is requested for the signal-pipe.
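One way such a figure could be measured is sketched below. This is only an assumption about the method: it counts 1-byte writes to a plain, non-blocking pipe() until the write would block, and does not go through POE::Pipe::OneWay. The name pipe_capacity_in_requests() is made up for the illustration.

```perl
use strict;
use warnings;
use IO::Handle;
use Errno qw(EAGAIN EWOULDBLOCK);

# Count how many 1-byte writes a plain pipe accepts before it would block.
sub pipe_capacity_in_requests {
  pipe( my $read, my $write ) or die "pipe failed: $!";
  $write->blocking(0);    # a full pipe now fails instead of hanging

  my $count = 0;
  while (1) {
    my $n = syswrite( $write, "x", 1 );
    if ( defined $n ) {
      $count += $n;
      next;
    }
    last if $! == EAGAIN or $! == EWOULDBLOCK;
    die "syswrite failed: $!";
  }
  return $count;
}

print "pipe = ", pipe_capacity_in_requests(), "\n";
```

The result depends on the operating system and kernel version, which is why the numbers differ between the platforms tested.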
Test results for various loops.

Perl 5.8.8, CentOS 4, Linux 2.6.9-78.0.13.ELvm-smp

POE::Loop::Select - works
POE::Loop::Event - works
POE::Loop::IO_Poll - works
POE::Loop::Tk - Tk needs X
POE::Loop::Gtk - Gtk needs X
POE::Loop::TkActiveState - needs Win32 and X

Other loops on the CPAN:

POE::Loop::EV - works
POE::Loop::Event::Lib - fails, even w/o the patch
POE::Loop::Glib - Can't install Glib
POE::Loop::Kqueue - fails to build
POE::Loop::Prima - Prima needs X
POE::Loop::Wx - Wx needs X
POE::XS::Loop::EPoll - works
POE::XS::Loop::Poll - works
Perl 5.8.8, FreeBSD 6.2-RELEASE

POE::Loop::Select - works
POE::Loop::Event - works
POE::Loop::IO_Poll - works
POE::Loop::Tk - Tk needs X
POE::Loop::Gtk - Gtk needs X
POE::Loop::TkActiveState - needs Win32 and X
POE::Loop::EV - t/30_loops/60_ev_kqueue/wheel_run.t failure of one check
POE::Loop::Event::Lib - fails anyway
POE::Loop::Glib -
POE::Loop::Kqueue - Works. Must generate tests by hand.
POE::Loop::Prima - no X
POE::Loop::Wx - no X
POE::XS::Loop::EPoll - Only supports Linux
POE::XS::Loop::Poll - works

Max bytes:
    pipe = 65536
    socketpair = 8192
    socket = 114688
Perl 5.10.0, FC11, Linux 2.6.29.5-191.fc11.i686.PAE

POE::Loop::Select - works
POE::Loop::Event - works
POE::Loop::IO_Poll - works
POE::Loop::Tk - no X
POE::Loop::Gtk - no X
POE::Loop::TkActiveState - no X nor Win32
POE::Loop::EV - works
POE::Loop::Event::Lib - fails w/o the patch. segfaults w/ patch
POE::Loop::Glib -
POE::Loop::Kqueue - Not linux
POE::Loop::Prima - no X
POE::Loop::Wx - no X
POE::XS::Loop::EPoll - works
POE::XS::Loop::Poll - works

Max bytes:
    pipe = 65536
    socketpair = 363
    socket = 121576
Patch applied as revision 2576. Thank you very much!

Release is blocked on feedback from mailing list users. has_forked() also needs documentation before the POD tests will pass.

A recap of my feedback from IRC:

pack("C") and unpack("C") may be rewritten as chr() and ord() depending on your intent.

has_forked() may be able to recalculate POE::Kernel's ID in the child process. I believe the kernel ID includes a PID.

It's late, so I'm not sure what the "top" means in the _loop_signal_handler_generic_top() method name. It might be clearer in the morning.

Thank you very much for providing a "make test"-clean patch.
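To illustrate the pack("C")/chr() point from the IRC recap: for unsigned byte values the two spellings produce and decode the same one-character string. The sample values below are arbitrary, and whether the swap is appropriate still depends on the intent, as noted above.

```perl
use strict;
use warnings;

# For unsigned byte values 0..255 the two spellings agree.
foreach my $signum ( 1, 17, 127, 255 ) {
  my $packed = pack( "C", $signum );
  die "encode mismatch for $signum" unless $packed eq chr($signum);
  die "decode mismatch for $signum"
    unless unpack( "C", $packed ) == ord($packed);
}
print "pack/unpack and chr/ord agree\n";
```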
> has_forked() may be able to recalculate POE::Kernel's ID in the child
> process. I believe the kernel ID includes a PID.
Yes. Very. However, IKC currently assumes the Kernel ID won't change after a fork.
> It's late, so I'm not sure what the "top" means in the
> _loop_signal_handler_generic_top() method name. It might be clearer in
> the morning.
Term borrowed from interrupt handlers, which are split into bottom and top halves. Bottom half records the interrupt, cleans up and makes sure the top half gets called at a better time. Like the _top methods here.
http://en.wikipedia.org/wiki/Interrupt_handler

I seem to have gotten the top and bottom switched.
This patch is an add-on to the previous one.

- Add ->_data_sig_has_forked, which rebuilds the signal-pipe and clears $kr_child_procs
- Blocks signals while rebuilding the signal-pipe
- Documents ->has_forked
- Documents USE_SIGNAL_PIPE
- Rename the _top methods to _bottom, to follow the interrupt handler usage.
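As an illustration of blocking signals around a critical section such as rebuilding the pipe, here is a small sketch using POSIX::sigprocmask. It is not the patch's code: with_signals_blocked() is a hypothetical helper, and unlike the patch it restores whatever mask was in effect before, rather than installing an empty mask afterwards. It assumes a Unix-like system with USR1.

```perl
use strict;
use warnings;
use POSIX qw(sigprocmask SIG_BLOCK SIG_SETMASK);

my $seen = 0;
$SIG{USR1} = sub { $seen++ };

# Run a code ref with every signal blocked, then restore the previous mask.
sub with_signals_blocked {
  my ($critical) = @_;
  my $all = POSIX::SigSet->new;
  $all->fillset;
  my $old = POSIX::SigSet->new;
  sigprocmask( SIG_BLOCK, $all, $old ) or die "cannot block signals: $!";
  my @result = eval { $critical->() };
  my $err = $@;
  sigprocmask( SIG_SETMASK, $old ) or die "cannot restore signal mask: $!";
  die $err if $err;
  return @result;
}

my ($during) = with_signals_blocked( sub {
  kill 'USR1', $$;    # stays pending while signals are blocked
  return $seen;       # stand-in for the real critical work
} );
print "seen during: $during, seen after: $seen\n";
```

The signal sent inside the critical section is held by the operating system and only reaches the handler once the mask is restored.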
Index: lib/POE/Resource/Signals.pm
===================================================================
--- lib/POE/Resource/Signals.pm (revision 2584)
+++ lib/POE/Resource/Signals.pm (working copy)
@@ -15,7 +15,7 @@
 use POE::Pipe::OneWay;
 use POE::Resource::FileHandles;
-use POSIX qw(:sys_wait_h);
+use POSIX qw(:sys_wait_h sigprocmask SIG_SETMASK);
 ### Map watched signal names to the sessions that are watching them
 ### and the events that must be delivered when they occur.
@@ -101,10 +101,7 @@
 sub _data_sig_initialize {
   my $self = shift;
-  # Initialize this to a true value so our waitpid() loop can run at
-  # least once. Starts false when running in an Apache handler so our
-  # SIGCHLD hijinks don't interfere with the web server.
-  $kr_child_procs = exists($INC{'Apache.pm'}) ? 0 : ( USE_SIGCHLD ? 0 : 1 );
+  $self->_data_sig_reset_procs;
   $poe_kernel->[KR_SIGNALS] = \%kr_signals;
   $poe_kernel->[KR_PIDS] = \%kr_pids_to_events;
@@ -147,6 +144,25 @@
   }
 }
+sub _data_sig_has_forked {
+  my( $self ) = @_;
+  $self->_data_sig_reset_procs;
+  if( USE_SIGNAL_PIPE ) {
+    $self->_data_sig_mask_all;
+    $self->_data_sig_pipe_finalize;
+    $self->_data_sig_pipe_build;
+    $self->_data_sig_unmask_all;
+  }
+}
+
+sub _data_sig_reset_procs {
+  # Initialize this to a true value so our waitpid() loop can run at
+  # least once. Starts false when running in an Apache handler so our
+  # SIGCHLD hijinks don't interfere with the web server.
+  $kr_child_procs = exists($INC{'Apache.pm'}) ? 0 : ( USE_SIGCHLD ? 0 : 1 );
+}
+
+
 ### Return signals that are safe to manipulate.
 sub _data_sig_get_safe_signals {
@@ -628,12 +644,13 @@
 ## implement them as shared-nothing threads.
 ##
 ## The signal handlers are split in 2 :
-## - a bottom handler, which sends the signal number over a one-way pipe.
-## - a top handler, which is called when this number is received.
+## - a top handler, which sends the signal number over a one-way pipe.
+## - a bottom handler, which is called when this number is received in the
+## main loop.
 use vars qw( $signal_pipe_read_fd );
 my( $signal_pipe_write, $signal_pipe_read, $signal_pipe_pid,
-    %SIG2NUM, %NUM2SIG );
+    $signal_mask_none, $signal_mask_all, %SIG2NUM, %NUM2SIG );
 sub _data_sig_pipe_build {
   my( $self ) = @_;
@@ -659,6 +676,9 @@
   # Associate the pipe with this PID
   $signal_pipe_pid = $$;
+  # Mess with the signal mask
+  $self->_data_sig_mask_all;
+
   # Open the signal pipe
   if (RUNNING_IN_HELL) {
     ( $signal_pipe_read, $signal_pipe_write ) = POE::Pipe::OneWay->new('inet');
@@ -679,8 +699,40 @@
   # Add to the select list
   $self->_data_handle_condition( $signal_pipe_read );
   $self->loop_watch_filehandle( $signal_pipe_read, MODE_RD );
+  $self->_data_sig_unmask_all;
 }
+sub _data_sig_mask_build {
+  $signal_mask_none = POSIX::SigSet->new();
+  $signal_mask_none->emptyset();
+  $signal_mask_all = POSIX::SigSet->new();
+  $signal_mask_all->fillset();
+}
+
+### Mask all signals
+sub _data_sig_mask_all {
+  my $self = $poe_kernel;
+  unless( $signal_mask_all ) {
+    $self->_data_sig_mask_build;
+  }
+  my $mask_temp;
+  sigprocmask( SIG_SETMASK, $signal_mask_all, $mask_temp )
+    or _trap "<sg> Unable to mask all signals: $!";
+}
+
+### Unmask all signals
+sub _data_sig_unmask_all {
+  my $self = $poe_kernel;
+  unless( $signal_mask_none ) {
+    $self->_data_sig_mask_build;
+  }
+  my $mask_temp;
+  sigprocmask( SIG_SETMASK, $signal_mask_none, $mask_temp )
+    or _trap "<sg> Unable to unmask all signals: $!";
+}
+
+
+
 sub _data_sig_pipe_finalize {
   my( $self ) = @_;
   if( $signal_pipe_read ) {
@@ -761,13 +813,13 @@
     }
     my $sig = $NUM2SIG{ $n };
     if( $sig eq 'CHLD' ) {
-      _loop_signal_handler_chld_top( $sig );
+      _loop_signal_handler_chld_bottom( $sig );
     }
     elsif( $sig eq 'PIPE' ) {
-      _loop_signal_handler_pipe_top( $sig );
+      _loop_signal_handler_pipe_bottom( $sig );
     }
     else {
-      _loop_signal_handler_generic_top( $sig );
+      _loop_signal_handler_generic_bottom( $sig );
     }
   }
 }
Index: lib/POE/Kernel.pm
===================================================================
--- lib/POE/Kernel.pm (revision 2584)
+++ lib/POE/Kernel.pm (working copy)
@@ -165,6 +165,9 @@
 # The Kernel's master queue.
 my $kr_queue;
+# The current PID, to detect when it changes
+my $kr_pid;
+
 # Filehandle activity modes. They are often used as list indexes.
 sub MODE_RD () { 0 } # read
 sub MODE_WR () { 1 } # write
@@ -492,6 +495,7 @@
   local *STDERR = *TRACE_FILE;
   _trap_death();
+  $message =~ s/^/$$: /mg;
   die $message;
   _release_death();
 }
@@ -641,6 +645,12 @@
     );
   }
+  if( ASSERT_DATA ) {
+    if( $kr_pid != $$ ) {
+      _trap "New process detected. You must call ->has_forked() in the child process."
+    }
+  }
+
   unless (
     $kr_queue->get_item_count() > $idle_queue_size or
     $self->_data_handle_count() or
@@ -812,6 +822,9 @@
   # Create our master queue.
   $kr_queue = $queue_class->new();
+  # Remember the PID
+  $kr_pid = $$;
+
   # TODO - Should KR_ACTIVE_SESSIONS and KR_ACTIVE_EVENT be handled
   # by POE::Resource::Sessions?
   # TODO - Should the subsystems be split off into separate real
@@ -1346,12 +1359,20 @@
 # Less invasive form of ->stop() + ->run()
 sub has_forked {
+  if( $kr_pid == $$ ) {
+    _croak "You should only call ->has_forked() from the child process.";
+  }
+
   # So has_forked() can be called as a class method.
   my $self = $poe_kernel;
-  if( USE_SIGNAL_PIPE ) {
-    $self->_data_sig_pipe_finalize;
-    $self->_data_sig_pipe_build;
-  }
+
+  # Undefine the kernel ID so it will be recalculated on the next
+  # ID() call.
+  $self->[KR_ID] = undef;
+  $kr_pid = $$;
+
+  # reset some stuff for the signals
+  $poe_kernel->_data_sig_has_forked;
 }
 #------------------------------------------------------------------------------
@@ -3064,6 +3085,22 @@
   $kernel->run_while(\$job_count);
+=head3 has_forked
+
+  my $pid = fork();
+  die "Unable to fork" unless defined $pid;
+  unless( $pid ) {
+    $poe_kernel->has_forked;
+  }
+
+Inform the kernel that it is now running in a new process. This allows the
+kernel to reset some internal data to adjust to the new situation.
+
+has_forked() must be called in the child process if you wish to run the same
+kernel. However, if you want the child process to have new kernel, you must
+call L</stop> instead.
+
+
 =head3 stop
 stop() causes POE::Kernel->run() to return early. It does this by
@@ -5281,7 +5318,19 @@
 Defaults to 1 second.
-=head2 CATCH_EXCEPTIONS
+=head2 USE_SIGNAL_PIPE
+
+The only safe way to handle signals is to implement a shared-nothing model.
+POE builds a I<signal pipe> that communicates between the the signal
+handlers and the POE kernel loop in a safe and atomic manner. The signal
+pipe is implemented with L<POE::Pipe::OneWay>, using a C<pipe> conduit on
+Unix, and C<inet> on Windows.
+
+If you wish to revert to the previous unsafe signal behaviour, you must set
+C<USE_SIGNAL_PIPE> to 0, or the environment vairable C<POE_USE_SIGNAL_PIPE>.
+
+
+=head1 CATCH_EXCEPTIONS
 Whether or not POE should run event handler code in an eval { } and
 deliver the C<DIE> signal on errors.
Index: lib/POE/Loop/PerlSignals.pm
===================================================================
--- lib/POE/Loop/PerlSignals.pm (revision 2584)
+++ lib/POE/Loop/PerlSignals.pm (working copy)
@@ -30,11 +30,11 @@
     POE::Kernel->_data_sig_pipe_send( $_[0] );
   }
   else {
-    _loop_signal_handler_generic_top( $_[0] );
+    _loop_signal_handler_generic_bottom( $_[0] );
   }
 }
-sub _loop_signal_handler_generic_top {
+sub _loop_signal_handler_generic_bottom {
   if (TRACE_SIGNALS) {
     POE::Kernel::_warn "<sg> Enqueuing generic SIG$_[0] event";
   }
@@ -52,11 +52,11 @@
     POE::Kernel->_data_sig_pipe_send( $_[0] );
   }
   else {
-    _loop_signal_handler_pipe_top( $_[0] );
+    _loop_signal_handler_pipe_bottom( $_[0] );
   }
 }
-sub _loop_signal_handler_pipe_top {
+sub _loop_signal_handler_pipe_bottom {
   if (TRACE_SIGNALS) {
     POE::Kernel::_warn "<sg> Enqueuing PIPE-like SIG$_[0] event";
   }
@@ -74,11 +74,11 @@
     POE::Kernel->_data_sig_pipe_send( 'CHLD' );
   }
   else {
-    _loop_signal_handler_chld_top( $_[0] );
+    _loop_signal_handler_chld_bottom( $_[0] );
   }
 }
-sub _loop_signal_handler_chld_top {
+sub _loop_signal_handler_chld_bottom {
   if (TRACE_SIGNALS) {
     POE::Kernel::_warn "<sg> Enqueuing CHLD-like SIG$_[0] event";
   }
Index: lib/POE.pm
===================================================================
--- lib/POE.pm (revision 2584)
+++ lib/POE.pm (working copy)
@@ -7,7 +7,7 @@
 use Carp qw( croak );
 use vars qw($VERSION $REVISION);
-$VERSION = '1.006'; # NOTE - Should be #.### (three decimal places)
+$VERSION = '1.007-rc2'; # NOTE - Should be #.### (three decimal places)
 $REVISION = do {my($r)=(q$Revision$=~/(\d+)/);sprintf"1.%04d",$r};
 sub import {
This is a patch against r2587 that:

- Assigns fake numbers (128 and up) to signals if SIGname isn't defined
- Does not mess with sigprocmask() if RUNNING_IN_HELL.
Index: lib/POE/Resource/Signals.pm
===================================================================
--- lib/POE/Resource/Signals.pm (revision 2587)
+++ lib/POE/Resource/Signals.pm (working copy)
@@ -655,21 +655,30 @@
 sub _data_sig_pipe_build {
   my( $self ) = @_;
   return unless USE_SIGNAL_PIPE;
-
+  my $fake = 128;
+
   unless( %SIG2NUM ) {
     foreach my $sig ( keys %_safe_signals ) {
       my $n = eval "POSIX::SIG$sig()";
-      next if $@;
-      # paranoid check
-      _trap "<sg> SIG$sig is out of range ($n)" if $n > 255;
+      # warn $@;
+      if( $@ ) { # AKA : RUNNING_IN_HELL
+        # The number used is less important then the fact that it has
+        # a unique number assigned to it
+        $n = $fake++;
+        _trap "<sg> SIG$sig not defined and $n > 255" if $n > 255;
+      }
+      else {
+        # paranoid check
+        _trap "<sg> SIG$sig is out of range ($n)" if $n > 127;
+      }
       $SIG2NUM{ $sig } = $n;
       $NUM2SIG{ $n } = $sig;
     }
+    # warn join "\n", map { "$_: $SIG2NUM{$_}" } sort keys %SIG2NUM;
     # we need CLD to be named CHLD
     $SIG2NUM{ CLD } = $SIG2NUM{ CHLD };
     $NUM2SIG{ $SIG2NUM{ CHLD } } = 'CHLD';
     $NUM2SIG{ $SIG2NUM{ CLD } } = 'CHLD' if $SIG2NUM{ CLD };
-    # warn join "\n", map { "$_: $SIG2NUM{$_}" } sort keys %SIG2NUM;
     # warn join "\n", map { "$_: $_safe_signals{$_}" } sort keys %_safe_signals;
   }
@@ -703,6 +712,7 @@
 }
 sub _data_sig_mask_build {
+  return if RUNNING_IN_HELL;
   $signal_mask_none = POSIX::SigSet->new();
   $signal_mask_none->emptyset();
   $signal_mask_all = POSIX::SigSet->new();
@@ -711,6 +721,7 @@
 ### Mask all signals
 sub _data_sig_mask_all {
+  return if RUNNING_IN_HELL;
   my $self = $poe_kernel;
   unless( $signal_mask_all ) {
     $self->_data_sig_mask_build;
@@ -722,6 +733,7 @@
 ### Unmask all signals
 sub _data_sig_unmask_all {
+  return if RUNNING_IN_HELL;
   my $self = $poe_kernel;
   unless( $signal_mask_none ) {
     $self->_data_sig_mask_build;
Patches applied. Also, you re-acquired your commit bit and worked on the code directly. Thank you very much for championing this fix. POE 1.007 will be released shortly including it.