Skip Menu |

This queue is for tickets about the IO-Async CPAN distribution.

Report information
The Basics
Id: 75572
Status: resolved
Priority: 0/
Queue: IO-Async

People
Owner: Nobody in particular
Requestors: leonerd-cpan [...] leonerd.org.uk
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: 0.48



Subject: Function improvements: ->restart, max_calls
$function->restart # gracefully stop workers and restart them, to pick up code changes, etc... $function->configure( max_worker_calls => 123 ) # stop a worker after that many calls -- Paul Evans
On Tue Mar 06 07:10:39 2012, PEVANS wrote: Show quoted text
> $function->restart # gracefully stop workers and restart them, to pick > up code changes, etc...
Done this part -- Paul Evans
Subject: rt75572-1.patch
=== modified file 'lib/IO/Async/Channel.pm' --- lib/IO/Async/Channel.pm 2012-04-25 22:23:45 +0000 +++ lib/IO/Async/Channel.pm 2012-05-11 23:25:33 +0000 @@ -341,7 +341,7 @@ sub _close_async { my $self = shift; - $self->{stream}->close_when_empty; + $self->{stream}->close_when_empty if $self->{stream}; } sub _on_stream_read === modified file 'lib/IO/Async/Function.pm' --- lib/IO/Async/Function.pm 2012-03-24 21:08:34 +0000 +++ lib/IO/Async/Function.pm 2012-05-11 23:27:20 +0000 @@ -273,6 +273,20 @@ } } +=head2 $function->restart + +Gracefully stop and restart all the worker processes. + +=cut + +sub restart +{ + my $self = shift; + + $self->stop; + $self->start; +} + =head2 $function->call( %params ) Schedules an invocation of the contained function to be executed on one of the === modified file 't/42function.t' --- t/42function.t 2012-05-11 18:32:41 +0000 +++ t/42function.t 2012-05-11 23:27:35 +0000 @@ -4,7 +4,7 @@ use IO::Async::Test; -use Test::More tests => 34; +use Test::More tests => 37; use Test::Fatal; use Test::Refcount; @@ -144,6 +144,7 @@ $loop->remove( $function ); } +# max_workers { my $count = 0; @@ -175,6 +176,7 @@ $loop->remove( $function ); } +# exit_on_die { my $count = 0; @@ -206,6 +208,7 @@ $loop->remove( $function ); } +# restart after exit { my $function = IO::Async::Function->new( min_workers => 0, @@ -420,3 +423,54 @@ is( $result, "return", 'Write-to-STD{OUT+ERR} function returned' ); is( $buffer, "A line to STDOUT\nA line to STDERR\n", 'Write-to-STD{OUT+ERR} wrote to pipe' ); } + +# Restart +{ + my $value = 1; + + my $function = IO::Async::Function->new( + code => sub { return $value }, + ); + + $loop->add( $function ); + + my $result; + $function->call( + args => [], + on_return => sub { $result = shift }, + on_error => sub { die "Test failed early - @_" }, + ); + + wait_for { defined $result }; + + is( $result, 1, '$result before restart' ); + + $value = 2; + $function->restart; + + undef $result; + $function->call( + args => [], + on_return => sub { $result = shift }, + on_error => sub { die "Test failed early - @_" }, + ); + + wait_for { defined $result }; + + is( $result, 2, '$result after restart' ); + + undef $result; + $function->call( + args => [], + on_return => sub { $result = shift }, + on_error => sub { die "Test failed early - @_" }, + ); + + $function->restart; + + wait_for { defined $result }; + + is( $result, 2, 'call before restart still returns result' ); + + $loop->remove( $function ); +}
On Tue Mar 06 07:10:39 2012, PEVANS wrote: Show quoted text
> $function->configure( max_worker_calls => 123 ) # stop a worker after > that many calls
Done. -- Paul Evans
Subject: rt75572-2.patch
=== modified file 'lib/IO/Async/Function.pm' --- lib/IO/Async/Function.pm 2012-05-11 22:20:59 +0000 +++ lib/IO/Async/Function.pm 2012-05-18 16:51:55 +0000 @@ -121,6 +121,12 @@ actual number running at any time will be kept somewhere between these bounds according to load. +=item max_worker_calls => INT + +Optional. If provided, stop a worker process after it has processed this +number of calls. (New workers may be started to replace stopped ones, within +the bounds given above). + =item idle_timeout => NUM Optional. If provided, idle worker processes will be shut down after this @@ -161,7 +167,7 @@ my %params = @_; my %worker_params; - foreach (qw( exit_on_die )) { + foreach (qw( exit_on_die max_worker_calls )) { $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_}; } @@ -426,6 +432,7 @@ my $worker = IO::Async::Function::Worker->new( ( map { $_ => $self->{$_} } qw( code setup exit_on_die ) ), + max_calls => $self->{max_worker_calls}, on_finish => $self->_capture_weakself( sub { my $self = shift or return; @@ -497,8 +504,6 @@ my $arg_channel = IO::Async::Channel->new; my $ret_channel = IO::Async::Channel->new; - my $exit_on_die = delete $params{exit_on_die}; - my $code = delete $params{code}; $params{code} = sub { while( my $args = $arg_channel->recv ) { @@ -522,11 +527,20 @@ $worker->{arg_channel} = $arg_channel; $worker->{ret_channel} = $ret_channel; - $worker->{exit_on_die} = $exit_on_die; return $worker; } +sub configure +{ + my $self = shift; + my %params = @_; + + exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls ); + + $self->SUPER::configure( %params ); +} + sub stop { my $worker = shift; @@ -572,6 +586,8 @@ die "Unrecognised type from worker - $type\n"; } + $worker->stop if !$worker->{max_calls}; + $function->_dispatch_pending if $function; } ), on_eof => $worker->_capture_weakself( sub { @@ -589,6 +605,7 @@ ); $worker->{busy} = 1; + $worker->{max_calls}--; } =head1 NOTES === modified file 't/42function.t' --- t/42function.t 2012-05-11 23:29:23 +0000 +++ t/42function.t 2012-05-18 16:51:55 +0000 @@ -4,7 +4,7 @@ use IO::Async::Test; -use Test::More tests => 37; +use Test::More tests => 40; use Test::Fatal; use Test::Refcount; @@ -474,3 +474,44 @@ $loop->remove( $function ); } + +# max_worker_calls +{ + my $counter; + my $function = IO::Async::Function->new( + max_workers => 1, + max_worker_calls => 2, + code => sub { return ++$counter; } + ); + + $loop->add( $function ); + + my $result; + $function->call( + args => [], + on_return => sub { $result = shift }, + on_error => sub { die "Test failed early - @_" }, + ); + wait_for { defined $result }; + is( $result, 1, '$result from first call' ); + + undef $result; + $function->call( + args => [], + on_return => sub { $result = shift }, + on_error => sub { die "Test failed early - @_" }, + ); + wait_for { defined $result }; + is( $result, 2, '$result from second call' ); + + undef $result; + $function->call( + args => [], + on_return => sub { $result = shift }, + on_error => sub { die "Test failed early - @_" }, + ); + wait_for { defined $result }; + is( $result, 1, '$result from third call' ); + + $loop->remove( $function ); +}
Released in 0.48 -- Paul Evans