Yes, that bug seems valid.
I've adjusted the code slightly, renamed a few bits and changed formatting here and there, and also decided it would be neater to use Struct::Dumb for these queues.
Revised patch attached.
--
Paul Evans
=== modified file 'Build.PL'
--- Build.PL 2013-10-10 17:13:29 +0000
+++ Build.PL 2014-07-24 07:21:22 +0000
@@ -13,6 +13,7 @@
'IO::Poll' => 0,
'Socket' => '2.007',
'Storable' => 0,
+ 'Struct::Dumb' => 0,
'Time::HiRes' => 0,
# Fails on perl 5.8.3 for unknown reasons
=== modified file 'lib/IO/Async/Stream.pm'
--- lib/IO/Async/Stream.pm 2014-07-11 14:16:53 +0000
+++ lib/IO/Async/Stream.pm 2014-07-24 07:39:51 +0000
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2006-2013 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2006-2014 -- leonerd@leonerd.org.uk
package IO::Async::Stream;
@@ -25,15 +25,14 @@
our $READLEN = 8192;
our $WRITELEN = 8192;
-# Indicies in writequeue elements
-use constant WQ_DATA => 0;
-use constant WQ_WRITELEN => 1;
-use constant WQ_ON_WRITE => 2;
-use constant WQ_ON_FLUSH => 3;
-use constant WQ_WATCHING => 4;
-# Indicies into readqueue elements
-use constant RQ_ONREAD => 0;
-use constant RQ_FUTURE => 1;
+use Struct::Dumb;
+
+# Element of the writequeue
+struct Writer => [qw( data writelen on_write on_flush on_error watching )];
+
+# Element of the readqueue
+struct Reader => [qw( on_read future )];
+
# Bitfields in the want flags
use constant WANT_READ_FOR_READ => 0x01;
use constant WANT_READ_FOR_WRITE => 0x02;
@@ -199,8 +198,8 @@
{
my $self = shift;
- $self->{writequeue} = []; # Queue of ARRAYs of [ $data, $on_write, $on_flush ]
- $self->{readqueue} = []; # Queue of ARRAYs of [ CODE, $readfuture ]
+ $self->{writequeue} = []; # Queue of Writers
+ $self->{readqueue} = []; # Queue of Readers
$self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
$self->{readbuff} = "";
@@ -575,6 +574,10 @@
{
my $self = shift;
+ foreach ( @{ $self->{writequeue} } ) {
+ # Pass the stream first, so callbacks see the documented ( $stream, $errno ) shape
+ $_->on_error->( $self, "stream closing" ) if $_->on_error;
+ }
+
undef @{ $self->{writequeue} };
undef $self->{stream_closing};
@@ -666,6 +669,13 @@
$on_flush->( $stream )
+=item on_error => CODE
+
+A CODE reference which will be invoked if a C<syswrite> error happens while
+performing this write, or if the stream is closed while this write is still
+queued. Invoked as for the C<Stream>'s C<on_write_error> event.
+
+ $on_error->( $stream, $errno )
+
=back
If the object is not yet a member of a loop and doesn't yet have a
@@ -704,57 +714,58 @@
my $writequeue = $self->{writequeue};
my $head;
- while( $head = $writequeue->[0] and ref $head->[WQ_DATA] ) {
- if( ref $head->[WQ_DATA] eq "CODE" ) {
- my $data = $head->[WQ_DATA]->( $self );
+ while( $head = $writequeue->[0] and ref $head->data ) {
+ if( ref $head->data eq "CODE" ) {
+ my $data = $head->data->( $self );
if( !defined $data ) {
- $head->[WQ_ON_FLUSH]->( $self ) if $head->[WQ_ON_FLUSH];
+ $head->on_flush->( $self ) if $head->on_flush;
shift @$writequeue;
return 1;
}
if( !ref $data and my $encoding = $self->{encoding} ) {
$data = $encoding->encode( $data );
}
- unshift @$writequeue, my $new = [ $data ];
- $new->[$_] = $head->[$_] for WQ_WRITELEN, WQ_ON_WRITE; # not ON_FLUSH
+ unshift @$writequeue, my $new = Writer(
+ $data, $head->writelen, $head->on_write, undef, undef, 0
+ );
next;
}
- elsif( blessed $head->[WQ_DATA] and $head->[WQ_DATA]->isa( "Future" ) ) {
- my $f = $head->[WQ_DATA];
+ elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
+ my $f = $head->data;
if( !$f->is_ready ) {
- return 0 if $head->[WQ_WATCHING];
+ return 0 if $head->watching;
$f->on_ready( sub { $self->_flush_one_write } );
- $head->[WQ_WATCHING]++;
+ $head->watching++;
return 0;
}
my $data = $f->get;
if( !ref $data and my $encoding = $self->{encoding} ) {
$data = $encoding->encode( $data );
}
- $head->[WQ_DATA] = $data;
+ $head->data = $data;
next;
}
else {
- die "Unsure what to do with reference ".ref($head->[WQ_DATA])." in write queue";
+ die "Unsure what to do with reference ".ref($head->data)." in write queue";
}
}
my $second;
while( $second = $writequeue->[1] and
- !ref $second->[WQ_DATA] and
- $head->[WQ_WRITELEN] == $second->[WQ_WRITELEN] and
- !$head->[WQ_ON_WRITE] and !$second->[WQ_ON_WRITE] and
- !$head->[WQ_ON_FLUSH] ) {
- $head->[WQ_DATA] .= $second->[WQ_DATA];
- $head->[WQ_ON_WRITE] = $second->[WQ_ON_WRITE];
- $head->[WQ_ON_FLUSH] = $second->[WQ_ON_FLUSH];
+ !ref $second->data and
+ $head->writelen == $second->writelen and
+ !$head->on_write and !$second->on_write and
+ !$head->on_flush ) {
+ $head->data .= $second->data;
+ $head->on_write = $second->on_write;
+ $head->on_flush = $second->on_flush;
splice @$writequeue, 1, 1, ();
}
- die "TODO: head data does not contain a plain string" if ref $head->[WQ_DATA];
+ die "TODO: head data does not contain a plain string" if ref $head->data;
my $writer = $self->{writer};
- my $len = $self->$writer( $self->write_handle, $head->[WQ_DATA], $head->[WQ_WRITELEN] );
+ my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );
if( !defined $len ) {
my $errno = $!;
@@ -771,18 +782,19 @@
$self->maybe_invoke_event( on_write_eof => );
}
+ $head->on_error->( $self, $errno ) if $head->on_error;
$self->maybe_invoke_event( on_write_error => $errno )
or $self->close_now;
return 0;
}
- if( my $on_write = $head->[WQ_ON_WRITE] ) {
+ if( my $on_write = $head->on_write ) {
$on_write->( $self, $len );
}
- if( !length $head->[WQ_DATA] ) {
- $head->[WQ_ON_FLUSH]->( $self ) if $head->[WQ_ON_FLUSH];
+ if( !length $head->data ) {
+ $head->on_flush->( $self ) if $head->on_flush;
shift @{ $self->{writequeue} };
}
@@ -808,18 +820,30 @@
my $on_write = delete $params{on_write};
my $on_flush = delete $params{on_flush};
+ my $on_error = delete $params{on_error};
my $f;
if( defined wantarray ) {
my $orig_on_flush = $on_flush;
+ my $orig_on_error = $on_error;
$f = $self->loop->new_future;
$on_flush = sub {
$f->done;
$orig_on_flush->( @_ ) if $orig_on_flush;
};
+ $on_error = sub {
+ my $self = shift;
+ my ( $errno ) = @_;
+
+ $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;
+
+ $orig_on_error->( $self, @_ ) if $orig_on_error;
+ };
}
- push @{ $self->{writequeue} }, [ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush ];
+ push @{ $self->{writequeue} }, Writer(
+ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush, $on_error, 0
+ );
keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );
@@ -875,7 +899,7 @@
my $readqueue = $self->{readqueue};
my $ret;
- if( $readqueue->[0] and my $on_read = $readqueue->[0][RQ_ONREAD] ) {
+ if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
$ret = $on_read->( $self, \$self->{readbuff}, $eof );
}
else {
@@ -890,7 +914,7 @@
if( ref $ret eq "CODE" ) {
# Replace the top CODE, or add it if there was none
- $readqueue->[0] = [ $ret ];
+ $readqueue->[0] = Reader( $ret, undef );
return 1;
}
elsif( @$readqueue and !defined $ret ) {
@@ -937,7 +961,7 @@
or $self->close_now;
foreach ( @{ $self->{readqueue} } ) {
- $_->[RQ_FUTURE]->fail( "read failed: $errno", sysread => $errno ) if $_->[RQ_FUTURE];
+ $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
}
undef @{ $self->{readqueue} };
@@ -960,7 +984,7 @@
$self->maybe_invoke_event( on_read_eof => );
$self->close_now if $self->{close_on_read_eof};
foreach ( @{ $self->{readqueue} } ) {
- $_->[RQ_FUTURE]->done( undef ) if $_->[RQ_FUTURE];
+ $_->future->done( undef ) if $_->future;
}
undef @{ $self->{readqueue} };
return;
@@ -1012,7 +1036,7 @@
my ( $on_read, %args ) = @_;
# %args undocumented for internal use
- push @{ $self->{readqueue} }, [ $on_read, $args{future} ];
+ push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );
# TODO: Should this always defer?
1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
=== modified file 't/21stream-2write.t'
--- t/21stream-2write.t 2013-10-10 17:31:29 +0000
+++ t/21stream-2write.t 2014-07-24 07:32:41 +0000
@@ -257,10 +257,10 @@
on_write_eof => sub { $eof++ },
);
- $stream->write( "Junk" );
-
$loop->add( $stream );
+ my $write_future = $stream->write( "Junk" );
+
$rd->close;
ok( !$stream->is_write_eof, '$stream->is_write_eof before wait' );
@@ -272,6 +272,9 @@
is( $eof, 1, 'EOF indication after wait' );
ok( !defined $stream->loop, 'EOF stream no longer member of Loop' );
+
+ ok( $write_future->is_ready,'write future ready after stream closed' );
+ ok( $write_future->is_failed,'write future failed after stream closed' );
}
# Close
@@ -456,12 +459,15 @@
$loop->add( $stream );
- $stream->write( "hello" );
+ my $write_future = $stream->write( "hello" );
wait_for { defined $write_errno };
cmp_ok( $write_errno, "==", ECONNRESET, 'errno after failed write' );
+ ok( $write_future->is_ready,'write future ready after failed write' );
+ ok( $write_future->is_failed,'write future failed after failed write' );
+
$loop->remove( $stream );
}