This might do it.
--
Paul Evans
=== modified file 'lib/Future/Mutex.pm'
--- lib/Future/Mutex.pm 2017-12-15 01:20:49 +0000
+++ lib/Future/Mutex.pm 2017-12-15 01:54:37 +0000
@@ -51,6 +51,13 @@
(by its returned future providing a result or failing), the next queued code
is invoked.
+An instance may also be a counting mutex if initialised with a count greater
+than one. In this case, it can keep multiple blocks outstanding up to that
+limit, with subsequent requests queued as before. This allows it to act as a
+concurrency limit around some operation that can safely run concurrently, but
+to which an application wishes to apply an overall bound to stop it growing
+too much, such as communicating with external services or running programs.
+
=cut
=head1 CONSTRUCTOR
@@ -59,18 +66,31 @@
=head2 new
- $mutex = Future::Mutex->new
+ $mutex = Future::Mutex->new( count => $n )
Returns a new C<Future::Mutex> instance. It is initially unlocked.
+Takes the following named arguments:
+
+=over 8
+
+=item count => INT
+
+Optional. Maximum number of code blocks that may be outstanding at once.
+Defaults to 1 (an ordinary mutex) if not supplied.
+
+=back
+
=cut
sub new
{
my $class = shift;
+ my %params = @_; # named arguments; only `count` is read here
return bless {
- f => Future->done,
+ avail => $params{count} // 1, # free slots; NOTE(review): count => 0 is kept as-is, so enter would always queue - validate?
+ queue => [], # pending futures of callers waiting for a slot, oldest first
}, $class;
}
@@ -98,11 +118,25 @@
my $self = shift;
my ( $code ) = @_;
- my $old_f = $self->{f};
- $self->{f} = my $new_f = Future->new;
-
- $old_f->then( $code )
- ->on_ready( sub { my $f = shift; $new_f->done; $f; } );
+ my $down_f;
+ if( $self->{avail} ) {
+ $self->{avail}--;
+ $down_f = Future->done;
+ }
+ else {
+ push @{ $self->{queue} }, $down_f = Future->new;
+ }
+
+ my $up = sub {
+ if( my $next_f = shift @{ $self->{queue} } ) {
+ $next_f->done;
+ }
+ else {
+ $self->{avail}++;
+ }
+ };
+
+ $down_f->then( $code )->on_ready( $up );
}
=head2 available
@@ -116,7 +150,7 @@
sub available
{
my $self = shift;
- return $self->{f}->is_done;
+ return $self->{avail}; # count of free slots; 0 (false) while every slot is held
}
=head1 AUTHOR
=== modified file 't/40mutex.t'
--- t/40mutex.t 2017-12-15 01:20:49 +0000
+++ t/40mutex.t 2017-12-15 01:54:37 +0000
@@ -113,4 +113,53 @@
ok( $mutex->available, 'Mutex is available after cancel' );
}
+# queueing: with the default count of 1, entered blocks run one at a time, in order
+{
+ my $mutex = Future::Mutex->new;
+
+ my ( $f1, $f2, $f3 );
+ my $f = Future->needs_all(
+ $mutex->enter( sub { $f1 = Future->new } ),
+ $mutex->enter( sub { $f2 = Future->new } ),
+ $mutex->enter( sub { $f3 = Future->new } ),
+ );
+
+ ok( defined $f1, '$f1 defined' ); # first block is invoked immediately
+ $f1->done;
+
+ ok( defined $f2, '$f2 defined' ); # second block starts once the first completes
+ $f2->done;
+
+ ok( defined $f3, '$f3 defined' ); # third block starts once the second completes
+ $f3->done;
+
+ ok( $f->is_done, 'Chain is done' );
+ ok( $mutex->available, 'Mutex is available after chain done' );
+}
+
+# counting: count => 2 lets two blocks run at once; a third waits for a free slot
+{
+ my $mutex = Future::Mutex->new( count => 2 );
+
+ is( $mutex->available, 2, 'Mutex has 2 counts available' );
+
+ my ( $f1, $f2, $f3 );
+ my $f = Future->needs_all(
+ $mutex->enter( sub { $f1 = Future->new } ),
+ $mutex->enter( sub { $f2 = Future->new } ),
+ $mutex->enter( sub { $f3 = Future->new } ),
+ );
+
+ ok( defined $f1 && defined $f2, '$f1 and $f2 defined with count 2' );
+
+ $f1->done; # releases one slot, which is handed to the queued third block
+ ok( defined $f3, '$f3 defined after $f1 done' );
+
+ $f2->done;
+ $f3->done;
+
+ ok( $f->is_done, 'Chain is done' );
+ ok( $mutex->available, 'Mutex is available after chain done' ); # NOTE(review): is( ..., 2, ... ) would be a stricter check
+}
+
done_testing;