Fixed by attached patch (minus unit tests).
Will be in next version.
--
Paul Evans
=== modified file 'lib/Net/Async/HTTP.pm'
--- lib/Net/Async/HTTP.pm 2012-02-23 19:08:44 +0000
+++ lib/Net/Async/HTTP.pm 2012-02-29 21:12:08 +0000
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2008-2011 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2008-2012 -- leonerd@leonerd.org.uk
package Net::Async::HTTP;
@@ -116,6 +116,10 @@
Optional. A reference to a L<HTTP::Cookies> object. Will be used to set
cookies in requests and store them from responses.
+=item pipeline => BOOL
+
+Optional. If false, disables HTTP/1.1-style request pipelining. Defaults to true.
+
=back
=cut
@@ -125,14 +129,15 @@
my $self = shift;
my %params = @_;
- foreach (qw( user_agent max_redirects timeout proxy_host proxy_port cookie_jar )) {
+ foreach (qw( user_agent max_redirects timeout proxy_host proxy_port cookie_jar pipeline )) {
$self->{$_} = delete $params{$_} if exists $params{$_};
}
$self->SUPER::configure( %params );
- defined $self->{user_agent} or $self->{user_agent} = $DEFAULT_UA;
+ defined $self->{user_agent} or $self->{user_agent} = $DEFAULT_UA;
defined $self->{max_redirects} or $self->{max_redirects} = $DEFAULT_MAXREDIR;
+ defined $self->{pipeline} or $self->{pipeline} = 1;
}
=head1 METHODS
@@ -156,12 +161,13 @@
my $key = "$host:$port";
if( my $conn = $connections->{$key} ) {
- $conn->run_when_ready( $on_ready );
+ $conn->run_when_ready( $on_ready, $on_error );
return;
}
my $conn = Net::Async::HTTP::Protocol->new(
notifier_name => $key,
+ pipeline => $self->{pipeline},
on_closed => sub {
delete $connections->{$key};
},
@@ -199,7 +205,7 @@
%args,
);
- $conn->run_when_ready( $on_ready );
+ $conn->run_when_ready( $on_ready, $on_error );
}
=head2 $http->do_request( %args )
=== modified file 'lib/Net/Async/HTTP/Protocol.pm'
--- lib/Net/Async/HTTP/Protocol.pm 2012-02-23 19:08:44 +0000
+++ lib/Net/Async/HTTP/Protocol.pm 2012-02-29 21:12:08 +0000
@@ -18,8 +18,10 @@
my $CRLF = "\x0d\x0a"; # More portable than \r\n
-# Indices into responder_queue elements
+# Indices into responder_queue and on_ready_queue elements
+# Honestly, these would be much neater with Futures...
use constant ON_READ => 0;
+use constant ON_READY => 0;
use constant ON_ERROR => 1;
# Detect whether HTTP::Message properly trims whitespace in header values. If
@@ -38,32 +40,81 @@
=cut
+sub _init
+{
+ my $self = shift;
+
+ $self->{outstanding_requests} = 0;
+}
+
+sub configure
+{
+ my $self = shift;
+ my %params = @_;
+
+ foreach (qw( pipeline )) {
+ $self->{$_} = delete $params{$_} if exists $params{$_};
+ }
+
+ $self->SUPER::configure( %params );
+}
+
+sub should_pipeline
+{
+ my $self = shift;
+ return $self->{pipeline} && $self->{can_pipeline};
+}
+
sub connect
{
my $self = shift;
$self->SUPER::connect(
@_,
- on_connected => sub {
- my $self = shift;
-
- if( my $queue = delete $self->{on_ready_queue} ) {
- $_->( $self ) for @$queue;
- }
- },
+ on_connected => $self->can('ready'),
);
}
+sub ready
+{
+ my $self = shift;
+
+ my $queue = $self->{on_ready_queue} or return;
+
+ if( $self->should_pipeline ) {
+ $self->debug_printf( "READY pipelined" );
+ $self->{on_ready_queue} = [];
+ $_->[ON_READY]->( $self ) for @$queue;
+ }
+ elsif( @$queue ) {
+ $self->debug_printf( "READY non-pipelined" );
+ ( shift @$queue )->[ON_READY]->( $self );
+ }
+}
+
+sub is_idle
+{
+ my $self = shift;
+ return $self->{outstanding_requests} == 0;
+}
+
+sub _request_done
+{
+ my $self = shift;
+ $self->{outstanding_requests}--;
+ $self->ready;
+}
+
sub run_when_ready
{
my $self = shift;
- my ( $on_ready ) = @_;
+ my ( $on_ready, $on_error ) = @_;
- if( $self->transport ) {
+ if( $self->transport and ( $self->should_pipeline or $self->is_idle ) ) {
$on_ready->( $self );
}
else {
- push @{ $self->{on_ready_queue} }, $on_ready;
+ push @{ $self->{on_ready_queue} }, [ $on_ready, $on_error ];
}
}
@@ -100,6 +151,10 @@
while( my $head = shift @{ $self->{responder_queue} } ) {
$head->[ON_ERROR]->( @_ );
}
+
+ while( my $head = shift @{ $self->{on_ready_queue} } ) {
+ $head->[ON_ERROR]->( @_ );
+ }
}
sub request
@@ -144,6 +199,11 @@
$header->header( @headers );
}
+ my $protocol = $header->protocol;
+ if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
+ $self->{can_pipeline} = 1;
+ }
+
$header->request( $req );
$header->previous( $args{previous_response} ) if $args{previous_response};
@@ -160,6 +220,7 @@
$self->debug_printf( "BODY done" );
$self->close if $connection_close;
$on_body_chunk->();
+ $self->_request_done;
return undef; # Finished
}
@@ -199,6 +260,7 @@
$self->debug_printf( "BODY done" );
$on_body_chunk->();
+ $self->_request_done;
return undef; # Finished
}
}
@@ -250,6 +312,7 @@
$self->debug_printf( "BODY done" );
$self->close if $connection_close;
$on_body_chunk->();
+ $self->_request_done;
return undef;
}
@@ -279,6 +342,7 @@
$self->debug_printf( "BODY done" );
$on_body_chunk->();
# $self already closed
+ $self->_request_done;
return undef;
};
}
@@ -308,6 +372,8 @@
$self->write( $request_body ) if $request_body;
+ $self->{outstanding_requests}++;
+
push @{ $self->{responder_queue} }, [ $on_read, $on_error ];
}