=== modified file 'Build.PL'
--- Build.PL 2013-12-30 23:22:21 +0000
+++ Build.PL 2014-03-27 12:34:15 +0000
@@ -14,6 +14,7 @@
'IO::Async::Loop' => '0.59',
'IO::Async::Stream' => '0.59',
'IO::Async::Timer::Countdown' => 0,
+ 'Struct::Dumb' => 0,
'URI' => 0,
},
recommends => {
=== modified file 'lib/Net/Async/HTTP.pm'
--- lib/Net/Async/HTTP.pm 2014-02-20 14:39:05 +0000
+++ lib/Net/Async/HTTP.pm 2014-03-27 12:45:58 +0000
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2008-2013 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2008-2014 -- leonerd@leonerd.org.uk
package Net::Async::HTTP;
@@ -53,6 +53,9 @@
use constant READ_LEN => 64*1024; # 64 KiB
use constant WRITE_LEN => 64*1024; # 64 KiB
+use Struct::Dumb;
+struct Ready => [qw( future connecting )];
+
=head1 NAME
C<Net::Async::HTTP> - use HTTP with C<IO::Async>
@@ -323,7 +326,7 @@
push @{ $args{extensions} }, "SSL";
}
- $conn->connect(
+ my $f = $conn->connect(
host => $host,
service => $port,
( map { defined $self->{$_} ? ( $_ => $self->{$_} ) : () } qw( local_host local_port local_addrs local_addr ) ),
@@ -340,6 +343,8 @@
})->on_fail( sub {
$on_error->( $conn, "$host:$port - $_[0] failed [$_[-1]]" );
});
+
+ $f->on_ready( sub { undef $f } ); # intentional reference cycle: the closure holds $f until it is ready, then releases it
}
sub get_connection
@@ -356,50 +361,56 @@
my $conns = $self->{connections}{$key} ||= [];
my $ready_queue = $self->{ready_queue}{$key} ||= [];
- my $f = $args{future} || $self->loop->new_future;
-
# Have a look to see if there are any idle connected ones first
foreach my $conn ( @$conns ) {
- $conn->is_idle and $conn->read_handle and return $f->done( $conn );
- }
-
- push @$ready_queue, $f unless $args{future};
-
- if( !$self->{max_connections_per_host} or @$conns < $self->{max_connections_per_host} ) {
- my $conn = Net::Async::HTTP::Connection->new(
- notifier_name => "$host:$port,connecting",
- ready_queue => $ready_queue,
- ( map { $_ => $self->{$_} }
- qw( max_in_flight pipeline read_len write_len decode_content ) ),
-
- on_closed => sub {
- my $conn = shift;
-
- $conn->remove_from_parent;
- @$conns = grep { $_ != $conn } @$conns;
- },
- );
-
- $self->add_child( $conn );
- push @$conns, $conn;
-
- $self->connect_connection( %args,
- conn => $conn,
- on_error => sub {
- my $conn = shift;
-
- $f->fail( @_ ) unless $f->is_cancelled;
-
- @$conns = grep { $_ != $conn } @$conns;
- @$ready_queue = grep { $_ != $f } @$ready_queue;
-
- if( @$ready_queue ) {
- # Requeue another connection attempt as there's still more to do
- $self->get_connection( %args, future => $ready_queue->[0] );
- }
- },
- );
- }
+ $conn->is_idle and $conn->read_handle and return Future->new->done( $conn );
+ }
+
+ my $ready = $args{ready};
+ $ready or push @$ready_queue, $ready = Ready( $self->loop->new_future, 0 );
+
+ my $f = $ready->future;
+
+ my $max = $self->{max_connections_per_host};
+ if( $max and @$conns >= $max ) {
+ return $f;
+ }
+
+ $ready->connecting++;
+
+ my $conn = Net::Async::HTTP::Connection->new(
+ notifier_name => "$host:$port,connecting",
+ ready_queue => $ready_queue,
+ ( map { $_ => $self->{$_} }
+ qw( max_in_flight pipeline read_len write_len decode_content ) ),
+
+ on_closed => sub {
+ my $conn = shift;
+
+ $conn->remove_from_parent;
+ @$conns = grep { $_ != $conn } @$conns;
+ },
+ );
+
+ $self->add_child( $conn );
+ push @$conns, $conn;
+
+ $self->connect_connection( %args,
+ conn => $conn,
+ on_error => sub {
+ my $conn = shift;
+
+ $f->fail( @_ ) unless $f->is_cancelled;
+
+ @$conns = grep { $_ != $conn } @$conns;
+ @$ready_queue = grep { $_ != $ready } @$ready_queue;
+
+ if( my ( $next ) = grep { !$_->connecting } @$ready_queue ) {
+ # Requeue another connection attempt as there's still more to do
+ $self->get_connection( %args, ready => $next );
+ }
+ },
+ );
return $f;
}
=== modified file 'lib/Net/Async/HTTP/Connection.pm'
--- lib/Net/Async/HTTP/Connection.pm 2014-02-20 14:39:05 +0000
+++ lib/Net/Async/HTTP/Connection.pm 2014-03-27 12:45:58 +0000
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2008-2013 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2008-2014 -- leonerd@leonerd.org.uk
package Net::Async::HTTP::Connection;
@@ -20,10 +20,8 @@
my $CRLF = "\x0d\x0a"; # More portable than \r\n
-# Indices into responder/ready queue elements
-use constant ON_READ => 0;
-use constant ON_ERROR => 1;
-use constant IS_DONE => 2;
+use Struct::Dumb;
+struct Responder => [qw( on_read on_error is_done )];
# Detect whether HTTP::Message properly trims whitespace in header values. If
# it doesn't, we have to deploy a workaround to fix them up.
@@ -111,7 +109,7 @@
if( $self->should_pipeline ) {
$self->debug_printf( "READY pipelined" );
while( @$queue && $self->should_pipeline ) {
- my $f = shift @$queue;
+ my $f = ( shift @$queue )->future;
next if $f->is_cancelled;
$f->done( $self );
@@ -120,7 +118,7 @@
elsif( @$queue and $self->is_idle ) {
$self->debug_printf( "READY non-pipelined" );
while( @$queue ) {
- my $f = shift @$queue;
+ my $f = ( shift @$queue )->future;
next if $f->is_cancelled;
$f->done( $self );
@@ -153,12 +151,12 @@
my ( $buffref, $closed ) = @_;
if( my $head = $self->{responder_queue}[0] ) {
- my $ret = $head->[ON_READ]->( $self, $buffref, $closed, $head );
+ my $ret = $head->on_read->( $self, $buffref, $closed, $head );
if( defined $ret ) {
return $ret if !ref $ret;
- $head->[ON_READ] = $ret;
+ $head->on_read = $ret;
return 1;
}
@@ -184,7 +182,7 @@
my $self = shift;
while( my $head = shift @{ $self->{responder_queue} } ) {
- $head->[ON_ERROR]->( @_ ) unless $head->[IS_DONE];
+ $head->on_error->( @_ ) unless $head->is_done;
}
}
@@ -337,7 +335,7 @@
# 204 (No Content) nor 304 (Not Modified)
if( $method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
$self->debug_printf( "BODY done" );
- $responder->[IS_DONE]++;
+ $responder->is_done++;
$self->close if $connection_close;
$f->done( $on_body_chunk->() ) unless $f->is_cancelled;
$self->_request_done;
@@ -390,7 +388,7 @@
# TODO: Actually use the trailer
$self->debug_printf( "BODY done" );
- $responder->[IS_DONE]++;
+ $responder->is_done++;
my $final;
if( $decoder and not eval { $final = $decoder->(); 1 } ) {
@@ -443,7 +441,7 @@
if( $content_length == 0 ) {
$self->debug_printf( "BODY done" );
- $responder->[IS_DONE]++;
+ $responder->is_done++;
$f->done( $on_body_chunk->() ) unless $f->is_cancelled;
$self->_request_done;
return undef; # Finished
@@ -470,7 +468,7 @@
if( $content_length == 0 ) {
$self->debug_printf( "BODY done" );
- $responder->[IS_DONE]++;
+ $responder->is_done++;
$self->close if $connection_close;
my $final;
@@ -520,7 +518,7 @@
# TODO: IO::Async probably ought to do this. We need to fire the
# on_closed event _before_ calling on_body_chunk, to clear the
# connection cache in case another request comes - e.g. HEAD->GET
- $responder->[IS_DONE]++;
+ $responder->is_done++;
$self->close;
$self->debug_printf( "BODY done" );
@@ -590,10 +588,11 @@
$self->{requests_in_flight}++;
- push @{ $self->{responder_queue} }, [ $on_read, sub {
- # Protect against double-fail during ->error_all
- $f->fail( @_ ) unless $f->is_ready;
- } ];
+ push @{ $self->{responder_queue} }, Responder(
+ $on_read,
+ sub { $f->fail( @_ ) unless $f->is_ready; }, # on_error
+ 0, # is_done
+ );
return $f;
}
=== added file 't/90rt92904.t'
--- t/90rt92904.t 1970-01-01 00:00:00 +0000
+++ t/90rt92904.t 2014-03-27 12:45:58 +0000
@@ -0,0 +1,51 @@
+#!/usr/bin/perl
+
+# Checks connect-failure handling with max_connections_per_host => 2: two
+# concurrent requests should start two connect() attempts, and failing both
+# should fail both request Futures without starting any further attempts.
+# (Presumably the regression test for RT ticket 92904, going by the filename.)
+
+use strict;
+use warnings;
+
+use Test::More;
+use IO::Async::Test;
+use IO::Async::Loop;
+
+use Net::Async::HTTP;
+
+my $loop = IO::Async::Loop->new;
+testing_loop( $loop );
+
+my $http = Net::Async::HTTP->new(
+ max_connections_per_host => 2,
+);
+
+$loop->add( $http );
+
+# Replace the loop's connect() with a fake that records one pending Future per
+# attempt, so the test decides when and how each attempt completes.
+my @conn_f;
+no warnings 'redefine';
+local *IO::Async::Loop::connect = sub {
+ shift;
+ my %args = @_;
+ $args{host} eq "localhost" or die "Cannot fake connect - expected host 'localhost'";
+ $args{service} eq "5000" or die "Cannot fake connect - expected service '5000'";
+
+ push @conn_f, my $f = $loop->new_future;
+ return $f;
+};
+
+my @f = map { $http->do_request(uri=>'http://localhost:5000/') } 0 .. 1;
+
+is( scalar @conn_f, 2, 'Two pending connect() attempts after two concurrent ->do_request' );
+
+# Fail them both
+( shift @conn_f )->fail( "Connection refused", connect => ) for 0 .. 1;
+
+ok( $f[$_]->is_ready && $f[$_]->failure, "Request [$_] Future fails after connect failure" ) for 0 .. 1;
+
+ok( !@conn_f, 'No more pending connect() attempts' );
+
+done_testing;