
This queue is for tickets about the future CPAN distribution.

Report information
The Basics
Id: 123876
Status: resolved
Priority: 0/
Queue: future

People
Owner: Nobody in particular
Requestors: leonerd-cpan [...] leonerd.org.uk
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: 0.38



Subject: Consider N-way concurrency in Future::Mutex
This could be either a new class or, more likely, just a constructor argument that allows N outstanding calls rather than only one. With a suitable implementation it could also be useful for concurrency-bounding in other places, such as the `fmap*` family. -- Paul Evans
This might do it. -- Paul Evans
Subject: rt123876.patch
=== modified file 'lib/Future/Mutex.pm'
--- lib/Future/Mutex.pm	2017-12-15 01:20:49 +0000
+++ lib/Future/Mutex.pm	2017-12-15 01:54:37 +0000
@@ -51,6 +51,13 @@
 (by its returned future providing a result or failing), the next queued code
 is invoked.
 
+An instance may also be a counting mutex if initialised with a count greater
+than one. In this case, it can keep multiple blocks outstanding up to that
+limit, with subsequent requests queued as before. This allows it to act as a
+concurrency-bounding limit around some operation that can run concurrently,
+but an application wishes to apply overall limits to stop it growing too much,
+such as communications with external services or executing other programs.
+
 =cut
 
 =head1 CONSTRUCTOR
@@ -59,18 +66,31 @@
 
 =head2 new
 
-   $mutex = Future::Mutex->new
+   $mutex = Future::Mutex->new( count => $n )
 
 Returns a new C<Future::Mutex> instance. It is initially unlocked.
 
+Takes the following named arguments:
+
+=over 8
+
+=item count => INT
+
+Optional number to limit outstanding concurrency. Will default to 1 if not
+supplied.
+
+=back
+
 =cut
 
 sub new
 {
    my $class = shift;
+   my %params = @_;
 
    return bless {
-      f => Future->done,
+      avail => $params{count} // 1,
+      queue => [],
    }, $class;
 }
 
@@ -98,11 +118,25 @@
    my $self = shift;
    my ( $code ) = @_;
 
-   my $old_f = $self->{f};
-   $self->{f} = my $new_f = Future->new;
-
-   $old_f->then( $code )
-      ->on_ready( sub { my $f = shift; $new_f->done; $f; } );
+   my $down_f;
+   if( $self->{avail} ) {
+      $self->{avail}--;
+      $down_f = Future->done;
+   }
+   else {
+      push @{ $self->{queue} }, $down_f = Future->new;
+   }
+
+   my $up = sub {
+      if( my $next_f = shift @{ $self->{queue} } ) {
+         $next_f->done;
+      }
+      else {
+         $self->{avail}++;
+      }
+   };
+
+   $down_f->then( $code )->on_ready( $up );
 }
 
 =head2 available
@@ -116,7 +150,7 @@
 sub available
 {
    my $self = shift;
-   return $self->{f}->is_done;
+   return $self->{avail};
 }
 
 =head1 AUTHOR

=== modified file 't/40mutex.t'
--- t/40mutex.t	2017-12-15 01:20:49 +0000
+++ t/40mutex.t	2017-12-15 01:54:37 +0000
@@ -113,4 +113,53 @@
    ok( $mutex->available, 'Mutex is available after cancel' );
 }
 
+# queueing
+{
+   my $mutex = Future::Mutex->new;
+
+   my ( $f1, $f2, $f3 );
+   my $f = Future->needs_all(
+      $mutex->enter( sub { $f1 = Future->new } ),
+      $mutex->enter( sub { $f2 = Future->new } ),
+      $mutex->enter( sub { $f3 = Future->new } ),
+   );
+
+   ok( defined $f1, '$f1 defined' );
+   $f1->done;
+
+   ok( defined $f2, '$f2 defined' );
+   $f2->done;
+
+   ok( defined $f3, '$f3 defined' );
+   $f3->done;
+
+   ok( $f->is_done, 'Chain is done' );
+   ok( $mutex->available, 'Mutex is available after chain done' );
+}
+
+# counting
+{
+   my $mutex = Future::Mutex->new( count => 2 );
+
+   is( $mutex->available, 2, 'Mutex has 2 counts available' );
+
+   my ( $f1, $f2, $f3 );
+   my $f = Future->needs_all(
+      $mutex->enter( sub { $f1 = Future->new } ),
+      $mutex->enter( sub { $f2 = Future->new } ),
+      $mutex->enter( sub { $f3 = Future->new } ),
+   );
+
+   ok( defined $f1 && defined $f2, '$f1 and $f2 defined with count 2' );
+
+   $f1->done;
+   ok( defined $f3, '$f3 defined after $f1 done' );
+
+   $f2->done;
+   $f3->done;
+
+   ok( $f->is_done, 'Chain is done' );
+   ok( $mutex->available, 'Mutex is available after chain done' );
+}
+
 done_testing;
Released in 0.38 -- Paul Evans
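
For illustration only, here is a rough sketch of how the counting behaviour requested in this ticket might be used to bound concurrency. It relies only on the `count` constructor argument, `enter` and `available` as they appear in the patch above. The `start_job` helper, the job names and the `%pending` hash are hypothetical stand-ins for real asynchronous work and are not part of Future::Mutex.

```perl
use strict;
use warnings;
use Future;
use Future::Mutex;   # the count argument is assumed to need Future 0.38 or later

# Allow at most two blocks to be outstanding at once.
my $mutex = Future::Mutex->new( count => 2 );

my @started;   # names of jobs whose code has begun running
my %pending;   # hypothetical stand-ins for real asynchronous operations

# start_job is a hypothetical helper for this sketch, not a Future::Mutex method
sub start_job
{
   my ( $name ) = @_;
   return $mutex->enter( sub {
      push @started, $name;
      return $pending{$name} = Future->new;
   } );
}

my $all = Future->needs_all( map { start_job( $_ ) } qw( a b c ) );

# With a count of 2, only "a" and "b" should have started; "c" is queued
print "started so far: @started\n";

# Completing "a" frees a slot, which lets the queued "c" start
$pending{a}->done;
print "started so far: @started\n";

$pending{$_}->done for qw( b c );
print "all done: ", ( $all->is_done ? "yes" : "no" ), "\n";
print "slots available: ", $mutex->available, "\n";
```

A plain `Future::Mutex->new` with no arguments keeps the original one-at-a-time behaviour, since the count defaults to 1.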