Skip Menu |

This queue is for tickets about the Net-Async-HTTP CPAN distribution.

Report information
The Basics
Id: 92904
Status: resolved
Priority: 0/
Queue: Net-Async-HTTP

People
Owner: leonerd-cpan [...] leonerd.org.uk
Requestors: dakkar [...] thenautilus.net
Cc:
AdminCc:

Bug Information
Severity: Important
Broken in: 0.33
Fixed in: 0.34



Subject: Futures in ready-queue are double-failed when max_connections_per_host!=1
First attachment: minimal test case. Second attachment: a patch that seems to fix the issue I'm not sure what goes wrong in the released version, but I've made the failure code work the same way as the success one: grab a non-cancelled future from the ready-queue, and fail it. Problem: passing a future to get_connection does not work anymore, and this is probably a bug. On the other hand, it was never documented…
Subject: client-test.pl
#!/usr/bin/env perl use strict; use warnings; use 5.10.0; use Data::Printer; use Net::Async::HTTP; use IO::Async::Loop; my $client = Net::Async::HTTP->new( max_connections_per_host => 2, ); my $loop = IO::Async::Loop->new; $loop->add($client); my @f = map { $client->do_request(uri=>'http://localhost:5000/') ->then( sub{say "OK:".p(@_);Future->wrap()}, sub{say "Fail:".p(@_);Future->wrap()}, ); } 1..2; $loop->run;
Subject: maybe-fix-double-fail.patch
diff --git c/lib/Net/Async/HTTP.pm w/lib/Net/Async/HTTP.pm index a42b018..60ccac5 100644 --- c/lib/Net/Async/HTTP.pm +++ w/lib/Net/Async/HTTP.pm @@ -388,10 +388,15 @@ sub get_connection on_error => sub { my $conn = shift; - $f->fail( @_ ) unless $f->is_cancelled; + while( @$ready_queue ) { + my $f = shift @$ready_queue; + next if $f->is_cancelled; + + $f->fail( @_ ); + last; + } @$conns = grep { $_ != $conn } @$conns; - @$ready_queue = grep { $_ != $f } @$ready_queue; if( @$ready_queue ) { # Requeue another connection attempt as there's still more to do
On Mon Feb 10 09:48:24 2014, DAKKAR wrote: Show quoted text
> First attachment: minimal test case.
Indeed; (minus the redundant use Data::Printer) I get: $ perl rt92904.pl IO::Async::Future=HASH(0x1fc7558) is already failed and cannot be ->fail'ed at /usr/share/perl5/Net/Async/HTTP.pm line 391. -- Paul Evans
On Mon Feb 10 09:48:24 2014, DAKKAR wrote: Show quoted text
> Second attachment: a patch that seems to fix the issue > > I'm not sure what goes wrong in the released version, but I've made > the failure code work the same way as the success one: grab a non- > cancelled future from the ready-queue, and fail it.
I don't think that's the right solution; I think the problem is in fact somewhat earlier than this. Despite there being only two ->GET calls and two Futures, in total three connection attempts are made: $ IO_ASYNC_DEBUG=1 perl rt92904.pl [Na:HTTP::Connection{localhost:5000,connecting}<-Na:HTTP] CONNECT localhost:5000 [Na:HTTP::Connection{localhost:5000,connecting}<-Na:HTTP] CONNECT localhost:5000 [Ia:Stream{r=8}<-Ia:Channel<-Ia:Function::Worker<-Ia:Resolver] EVENT on_read [Ia:Stream{r=5}<-Ia:Channel<-Ia:Function::Worker<-Ia:Resolver] EVENT on_read [Na:HTTP::Connection{localhost:5000,connecting}<-Na:HTTP] CONNECT localhost:5000 [Ia:Stream{r=5}<-Ia:Channel<-Ia:Function::Worker<-Ia:Resolver] EVENT on_read IO::Async::Future=HASH(0x1522f10) is already failed and cannot be ->fail'ed at /usr/share/perl5/Net/Async/HTTP.pm line 391. I suspect getting to the bottom of this will solve it. -- Paul Evans
Found it. This was due to a second attempt to connect() to the host for the second request after the first request failed, due to second request still living in the ready queue. Solved by keeping track of which ready queue items are already connect-pending, and skipping over those. -- Paul Evans
Subject: rt92904.patch
=== modified file 'Build.PL' --- Build.PL 2013-12-30 23:22:21 +0000 +++ Build.PL 2014-03-27 12:34:15 +0000 @@ -14,6 +14,7 @@ 'IO::Async::Loop' => '0.59', 'IO::Async::Stream' => '0.59', 'IO::Async::Timer::Countdown' => 0, + 'Struct::Dumb' => 0, 'URI' => 0, }, recommends => { === modified file 'lib/Net/Async/HTTP.pm' --- lib/Net/Async/HTTP.pm 2014-02-20 14:39:05 +0000 +++ lib/Net/Async/HTTP.pm 2014-03-27 12:45:58 +0000 @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2008-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2008-2014 -- leonerd@leonerd.org.uk package Net::Async::HTTP; @@ -53,6 +53,9 @@ use constant READ_LEN => 64*1024; # 64 KiB use constant WRITE_LEN => 64*1024; # 64 KiB +use Struct::Dumb; +struct Ready => [qw( future connecting )]; + =head1 NAME C<Net::Async::HTTP> - use HTTP with C<IO::Async> @@ -323,7 +326,7 @@ push @{ $args{extensions} }, "SSL"; } - $conn->connect( + my $f = $conn->connect( host => $host, service => $port, ( map { defined $self->{$_} ? ( $_ => $self->{$_} ) : () } qw( local_host local_port local_addrs local_addr ) ), @@ -340,6 +343,8 @@ })->on_fail( sub { $on_error->( $conn, "$host:$port - $_[0] failed [$_[-1]]" ); }); + + $f->on_ready( sub { undef $f } ); # intentionally cycle } sub get_connection @@ -356,50 +361,56 @@ my $conns = $self->{connections}{$key} ||= []; my $ready_queue = $self->{ready_queue}{$key} ||= []; - my $f = $args{future} || $self->loop->new_future; - # Have a look to see if there are any idle connected ones first foreach my $conn ( @$conns ) { - $conn->is_idle and $conn->read_handle and return $f->done( $conn ); - } - - push @$ready_queue, $f unless $args{future}; - - if( !$self->{max_connections_per_host} or @$conns < $self->{max_connections_per_host} ) { - my $conn = Net::Async::HTTP::Connection->new( - notifier_name => "$host:$port,connecting", - ready_queue => $ready_queue, - ( map { $_ => $self->{$_} } - qw( max_in_flight pipeline read_len write_len decode_content ) ), - - on_closed => sub { - my $conn = shift; - - $conn->remove_from_parent; - @$conns = grep { $_ != $conn } @$conns; - }, - ); - - $self->add_child( $conn ); - push @$conns, $conn; - - $self->connect_connection( %args, - conn => $conn, - on_error => sub { - my $conn = shift; - - $f->fail( @_ ) unless $f->is_cancelled; - - @$conns = grep { $_ != $conn } @$conns; - @$ready_queue = grep { $_ != $f } @$ready_queue; - - if( @$ready_queue ) { - # Requeue another connection attempt as there's still more to do - $self->get_connection( %args, future => $ready_queue->[0] ); - } - }, - ); - } + $conn->is_idle and $conn->read_handle and return Future->new->done( $conn ); + } + + my $ready = $args{ready}; + $ready or push @$ready_queue, $ready = Ready( $self->loop->new_future, 0 ); + + my $f = $ready->future; + + my $max = $self->{max_connections_per_host}; + if( $max and @$conns >= $max ) { + return $f; + } + + $ready->connecting++; + + my $conn = Net::Async::HTTP::Connection->new( + notifier_name => "$host:$port,connecting", + ready_queue => $ready_queue, + ( map { $_ => $self->{$_} } + qw( max_in_flight pipeline read_len write_len decode_content ) ), + + on_closed => sub { + my $conn = shift; + + $conn->remove_from_parent; + @$conns = grep { $_ != $conn } @$conns; + }, + ); + + $self->add_child( $conn ); + push @$conns, $conn; + + $self->connect_connection( %args, + conn => $conn, + on_error => sub { + my $conn = shift; + + $f->fail( @_ ) unless $f->is_cancelled; + + @$conns = grep { $_ != $conn } @$conns; + @$ready_queue = grep { $_ != $ready } @$ready_queue; + + if( my ( $next ) = grep { !$_->connecting } @$ready_queue ) { + # Requeue another connection attempt as there's still more to do + $self->get_connection( %args, ready => $next ); + } + }, + ); return $f; } === modified file 'lib/Net/Async/HTTP/Connection.pm' --- lib/Net/Async/HTTP/Connection.pm 2014-02-20 14:39:05 +0000 +++ lib/Net/Async/HTTP/Connection.pm 2014-03-27 12:45:58 +0000 @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2008-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2008-2014 -- leonerd@leonerd.org.uk package Net::Async::HTTP::Connection; @@ -20,10 +20,8 @@ my $CRLF = "\x0d\x0a"; # More portable than \r\n -# Indices into responder/ready queue elements -use constant ON_READ => 0; -use constant ON_ERROR => 1; -use constant IS_DONE => 2; +use Struct::Dumb; +struct Responder => [qw( on_read on_error is_done )]; # Detect whether HTTP::Message properly trims whitespace in header values. If # it doesn't, we have to deploy a workaround to fix them up. @@ -111,7 +109,7 @@ if( $self->should_pipeline ) { $self->debug_printf( "READY pipelined" ); while( @$queue && $self->should_pipeline ) { - my $f = shift @$queue; + my $f = ( shift @$queue )->future; next if $f->is_cancelled; $f->done( $self ); @@ -120,7 +118,7 @@ elsif( @$queue and $self->is_idle ) { $self->debug_printf( "READY non-pipelined" ); while( @$queue ) { - my $f = shift @$queue; + my $f = ( shift @$queue )->future; next if $f->is_cancelled; $f->done( $self ); @@ -153,12 +151,12 @@ my ( $buffref, $closed ) = @_; if( my $head = $self->{responder_queue}[0] ) { - my $ret = $head->[ON_READ]->( $self, $buffref, $closed, $head ); + my $ret = $head->on_read->( $self, $buffref, $closed, $head ); if( defined $ret ) { return $ret if !ref $ret; - $head->[ON_READ] = $ret; + $head->on_read = $ret; return 1; } @@ -184,7 +182,7 @@ my $self = shift; while( my $head = shift @{ $self->{responder_queue} } ) { - $head->[ON_ERROR]->( @_ ) unless $head->[IS_DONE]; + $head->on_error->( @_ ) unless $head->is_done; } } @@ -337,7 +335,7 @@ # 204 (No Content) nor 304 (Not Modified) if( $method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) { $self->debug_printf( "BODY done" ); - $responder->[IS_DONE]++; + $responder->is_done++; $self->close if $connection_close; $f->done( $on_body_chunk->() ) unless $f->is_cancelled; $self->_request_done; @@ -390,7 +388,7 @@ # TODO: Actually use the trailer $self->debug_printf( "BODY done" ); - $responder->[IS_DONE]++; + $responder->is_done++; my $final; if( $decoder and not eval { $final = $decoder->(); 1 } ) { @@ -443,7 +441,7 @@ if( $content_length == 0 ) { $self->debug_printf( "BODY done" ); - $responder->[IS_DONE]++; + $responder->is_done++; $f->done( $on_body_chunk->() ) unless $f->is_cancelled; $self->_request_done; return undef; # Finished @@ -470,7 +468,7 @@ if( $content_length == 0 ) { $self->debug_printf( "BODY done" ); - $responder->[IS_DONE]++; + $responder->is_done++; $self->close if $connection_close; my $final; @@ -520,7 +518,7 @@ # TODO: IO::Async probably ought to do this. We need to fire the # on_closed event _before_ calling on_body_chunk, to clear the # connection cache in case another request comes - e.g. HEAD->GET - $responder->[IS_DONE]++; + $responder->is_done++; $self->close; $self->debug_printf( "BODY done" ); @@ -590,10 +588,11 @@ $self->{requests_in_flight}++; - push @{ $self->{responder_queue} }, [ $on_read, sub { - # Protect against double-fail during ->error_all - $f->fail( @_ ) unless $f->is_ready; - } ]; + push @{ $self->{responder_queue} }, Responder( + $on_read, + sub { $f->fail( @_ ) unless $f->is_ready; }, # on_error + 0, # is_done + ); return $f; } === added file 't/90rt92904.t' --- t/90rt92904.t 1970-01-01 00:00:00 +0000 +++ t/90rt92904.t 2014-03-27 12:45:58 +0000 @@ -0,0 +1,44 @@ +#!/usr/bin/perl + +use strict; +use warnings; + +use Test::More; +use IO::Async::Test; +use IO::Async::Loop; + +use Net::Async::HTTP; + +my $loop = IO::Async::Loop->new; +testing_loop( $loop ); + +my $http = Net::Async::HTTP->new( + max_connections_per_host => 2, +); + +$loop->add( $http ); + +my @conn_f; +no warnings 'redefine'; +local *IO::Async::Loop::connect = sub { + shift; + my %args = @_; + $args{host} eq "localhost" or die "Cannot fake connect - expected host 'localhost'"; + $args{service} eq "5000" or die "Cannot fake connect - expected service '5000'"; + + push @conn_f, my $f = $loop->new_future; + return $f; +}; + +my @f = map { $http->do_request(uri=>'http://localhost:5000/') } 0 .. 1; + +is( scalar @conn_f, 2, 'Two pending connect() attempts after two concurrent ->do_request' ); + +# Fail them both +( shift @conn_f )->fail( "Connection refused", connect => ) for 0 .. 1; + +ok( $f[$_]->is_ready && $f[$_]->failure, "Request [$_] Future fails after connect failure" ) for 0 .. 1; + +ok( !@conn_f, 'No more pending connect() attempts' ); + +done_testing;
Released in 0.34 -- Paul Evans