Skip Menu |

This queue is for tickets about the Net-Async-HTTP CPAN distribution.

Report information
The Basics
Id: 75382
Status: resolved
Priority: 0/
Queue: Net-Async-HTTP

People
Owner: Nobody in particular
Requestors: leonerd-cpan [...] leonerd.org.uk
Cc: TEAM [...] cpan.org
AdminCc:

Bug Information
Severity: (no value)
Broken in: 0.16
Fixed in: 0.17



Subject: Too aggressive in HTTP/1.1 pipelining
Net::Async::HTTP ought to wait for the first response from a server, to check that it supports HTTP/1.1, before trying to pipeline more requests. Until that first response comes back indicating that the server can handle pipelining, all pending requests should be queued. -- Paul Evans
Fixed by attached patch (minus unit tests). Will be in next version. -- Paul Evans
Subject: rt75382.patch
=== modified file 'lib/Net/Async/HTTP.pm' --- lib/Net/Async/HTTP.pm 2012-02-23 19:08:44 +0000 +++ lib/Net/Async/HTTP.pm 2012-02-29 21:12:08 +0000 @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2008-2011 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2008-2012 -- leonerd@leonerd.org.uk package Net::Async::HTTP; @@ -116,6 +116,10 @@ Optional. A reference to a L<HTTP::Cookies> object. Will be used to set cookies in requests and store them from responses. +=item pipeline => BOOL + +Optional. If false, disables HTTP/1.1-style request pipelining. + =back =cut @@ -125,14 +129,15 @@ my $self = shift; my %params = @_; - foreach (qw( user_agent max_redirects timeout proxy_host proxy_port cookie_jar )) { + foreach (qw( user_agent max_redirects timeout proxy_host proxy_port cookie_jar pipeline )) { $self->{$_} = delete $params{$_} if exists $params{$_}; } $self->SUPER::configure( %params ); - defined $self->{user_agent} or $self->{user_agent} = $DEFAULT_UA; + defined $self->{user_agent} or $self->{user_agent} = $DEFAULT_UA; defined $self->{max_redirects} or $self->{max_redirects} = $DEFAULT_MAXREDIR; + defined $self->{pipeline} or $self->{pipeline} = 1; } =head1 METHODS @@ -156,12 +161,13 @@ my $key = "$host:$port"; if( my $conn = $connections->{$key} ) { - $conn->run_when_ready( $on_ready ); + $conn->run_when_ready( $on_ready, $on_error ); return; } my $conn = Net::Async::HTTP::Protocol->new( notifier_name => $key, + pipeline => $self->{pipeline}, on_closed => sub { delete $connections->{$key}; }, @@ -199,7 +205,7 @@ %args, ); - $conn->run_when_ready( $on_ready ); + $conn->run_when_ready( $on_ready, $on_error ); } =head2 $http->do_request( %args ) === modified file 'lib/Net/Async/HTTP/Protocol.pm' --- lib/Net/Async/HTTP/Protocol.pm 2012-02-23 19:08:44 +0000 +++ lib/Net/Async/HTTP/Protocol.pm 2012-02-29 21:12:08 +0000 @@ -18,8 +18,10 @@ my $CRLF = 
"\x0d\x0a"; # More portable than \r\n -# Indices into responder_queue elements +# Indices into responder/ready queue elements +# Honestly, these would be much neater with Futures... use constant ON_READ => 0; +use constant ON_READY => 0; use constant ON_ERROR => 1; # Detect whether HTTP::Message properly trims whitespace in header values. If @@ -38,32 +40,81 @@ =cut +sub _init +{ + my $self = shift; + + $self->{outstanding_requests} = 0; +} + +sub configure +{ + my $self = shift; + my %params = @_; + + foreach (qw( pipeline )) { + $self->{$_} = delete $params{$_} if exists $params{$_}; + } + + $self->SUPER::configure( %params ); +} + +sub should_pipeline +{ + my $self = shift; + return $self->{pipeline} && $self->{can_pipeline}; +} + sub connect { my $self = shift; $self->SUPER::connect( @_, - on_connected => sub { - my $self = shift; - - if( my $queue = delete $self->{on_ready_queue} ) { - $_->( $self ) for @$queue; - } - }, + on_connected => $self->can('ready'), ); } +sub ready +{ + my $self = shift; + + my $queue = $self->{on_ready_queue} or return; + + if( $self->should_pipeline ) { + $self->debug_printf( "READY pipelined" ); + $self->{on_ready_queue} = []; + $_->[ON_READY]->( $self ) for @$queue; + } + elsif( @$queue ) { + $self->debug_printf( "READY non-pipelined" ); + ( shift @$queue )->[ON_READY]->( $self ); + } +} + +sub is_idle +{ + my $self = shift; + return $self->{outstanding_requests} == 0; +} + +sub _request_done +{ + my $self = shift; + $self->{outstanding_requests}--; + $self->ready; +} + sub run_when_ready { my $self = shift; - my ( $on_ready ) = @_; + my ( $on_ready, $on_error ) = @_; - if( $self->transport ) { + if( $self->transport and ( $self->should_pipeline or $self->is_idle ) ) { $on_ready->( $self ); } else { - push @{ $self->{on_ready_queue} }, $on_ready; + push @{ $self->{on_ready_queue} }, [ $on_ready, $on_error ]; } } @@ -100,6 +151,10 @@ while( my $head = shift @{ $self->{responder_queue} } ) { $head->[ON_ERROR]->( @_ ); } + + while( 
my $head = shift @{ $self->{on_ready_queue} } ) { + $head->[ON_ERROR]->( @_ ); + } } sub request @@ -144,6 +199,11 @@ $header->header( @headers ); } + my $protocol = $header->protocol; + if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) { + $self->{can_pipeline} = 1; + } + $header->request( $req ); $header->previous( $args{previous_response} ) if $args{previous_response}; @@ -160,6 +220,7 @@ $self->debug_printf( "BODY done" ); $self->close if $connection_close; $on_body_chunk->(); + $self->_request_done; return undef; # Finished } @@ -199,6 +260,7 @@ $self->debug_printf( "BODY done" ); $on_body_chunk->(); + $self->_request_done; return undef; # Finished } } @@ -250,6 +312,7 @@ $self->debug_printf( "BODY done" ); $self->close if $connection_close; $on_body_chunk->(); + $self->_request_done; return undef; } @@ -279,6 +342,7 @@ $self->debug_printf( "BODY done" ); $on_body_chunk->(); # $self already closed + $self->_request_done; return undef; }; } @@ -308,6 +372,8 @@ $self->write( $request_body ) if $request_body; + $self->{outstanding_requests}++; + push @{ $self->{responder_queue} }, [ $on_read, $on_error ]; }
This was released in 0.17 but I forgot to update the ticket. -- Paul Evans