On Tue Mar 06 07:10:39 2012, PEVANS wrote:
> $function->configure( max_worker_calls => 123 ) # stop a worker after
> that many calls
Done.
--
Paul Evans
=== modified file 'lib/IO/Async/Function.pm'
--- lib/IO/Async/Function.pm 2012-05-11 22:20:59 +0000
+++ lib/IO/Async/Function.pm 2012-05-18 16:51:55 +0000
@@ -121,6 +121,12 @@
actual number running at any time will be kept somewhere between these bounds
according to load.
+=item max_worker_calls => INT
+
+Optional. If provided, stop a worker process after it has processed this
+number of calls. (New workers may be started to replace stopped ones, within
+the bounds given above.)
+
=item idle_timeout => NUM
Optional. If provided, idle worker processes will be shut down after this
@@ -161,7 +167,7 @@
my %params = @_;
my %worker_params;
- foreach (qw( exit_on_die )) {
+ foreach (qw( exit_on_die max_worker_calls )) {
$self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
}
@@ -426,6 +432,7 @@
my $worker = IO::Async::Function::Worker->new(
( map { $_ => $self->{$_} } qw( code setup exit_on_die ) ),
+ max_calls => $self->{max_worker_calls},
on_finish => $self->_capture_weakself( sub {
my $self = shift or return;
@@ -497,8 +504,6 @@
my $arg_channel = IO::Async::Channel->new;
my $ret_channel = IO::Async::Channel->new;
- my $exit_on_die = delete $params{exit_on_die};
-
my $code = delete $params{code};
$params{code} = sub {
while( my $args = $arg_channel->recv ) {
@@ -522,11 +527,20 @@
$worker->{arg_channel} = $arg_channel;
$worker->{ret_channel} = $ret_channel;
- $worker->{exit_on_die} = $exit_on_die;
return $worker;
}
+sub configure
+{
+ my $self = shift;
+ my %params = @_;
+
+ exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );
+
+ $self->SUPER::configure( %params );
+}
+
sub stop
{
my $worker = shift;
@@ -572,6 +586,8 @@
die "Unrecognised type from worker - $type\n";
}
+ $worker->stop if !$worker->{max_calls};
+
$function->_dispatch_pending if $function;
} ),
on_eof => $worker->_capture_weakself( sub {
@@ -589,6 +605,7 @@
);
$worker->{busy} = 1;
+ $worker->{max_calls}--;
}
=head1 NOTES
=== modified file 't/42function.t'
--- t/42function.t 2012-05-11 23:29:23 +0000
+++ t/42function.t 2012-05-18 16:51:55 +0000
@@ -4,7 +4,7 @@
use IO::Async::Test;
-use Test::More tests => 37;
+use Test::More tests => 40;
use Test::Fatal;
use Test::Refcount;
@@ -474,3 +474,44 @@
$loop->remove( $function );
}
+
+# max_worker_calls
+{
+ my $counter;
+ my $function = IO::Async::Function->new(
+ max_workers => 1,
+ max_worker_calls => 2,
+ code => sub { return ++$counter; }
+ );
+
+ $loop->add( $function );
+
+ my $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ wait_for { defined $result };
+ is( $result, 1, '$result from first call' );
+
+ undef $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ wait_for { defined $result };
+ is( $result, 2, '$result from second call' );
+
+ undef $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ wait_for { defined $result };
+ is( $result, 1, '$result from third call' );
+
+ $loop->remove( $function );
+}