Skip Menu |

This queue is for tickets about the IO-Async CPAN distribution.

Report information
The Basics
Id: 97433
Status: resolved
Priority: 0/
Queue: IO-Async

People
Owner: leonerd-cpan [...] leonerd.org.uk
Requestors: DAKKAR [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: (no value)
Broken in: 0.63
Fixed in: 0.64



Subject: Stream's write futures are not failed on close
$stream->write(...) returns a Future, that gets done when the write has completed. If the stream closes before the write has completed, the future is orphaned. The attached patch fails the future in case of write failure or stream closure. It also adds a on_fail callback to write.
Subject: fail-write-futures.patch
diff --git i/lib/IO/Async/Stream.pm w/lib/IO/Async/Stream.pm index 4762845..ffecb6e 100644 --- i/lib/IO/Async/Stream.pm +++ w/lib/IO/Async/Stream.pm @@ -30,7 +30,8 @@ use constant WQ_DATA => 0; use constant WQ_WRITELEN => 1; use constant WQ_ON_WRITE => 2; use constant WQ_ON_FLUSH => 3; -use constant WQ_WATCHING => 4; +use constant WQ_ON_FAIL => 4; +use constant WQ_WATCHING => 5; # Indicies into readqueue elements use constant RQ_ONREAD => 0; use constant RQ_FUTURE => 1; @@ -575,6 +576,10 @@ sub close_now { my $self = shift; + for my $queue_element (@{ $self->{writequeue} }) { + # invoke failure handlers + $queue_element->[WQ_ON_FAIL]->("stream closing") if $queue_element->[WQ_ON_FAIL]; + } undef @{ $self->{writequeue} }; undef $self->{stream_closing}; @@ -704,7 +709,7 @@ sub _flush_one_write my $writequeue = $self->{writequeue}; my $head; - while( $head = $writequeue->[0] and ref $head->[WQ_DATA] ) { + while( $head = $writequeue->[WQ_DATA] and ref $head->[WQ_DATA] ) { if( ref $head->[WQ_DATA] eq "CODE" ) { my $data = $head->[WQ_DATA]->( $self ); if( !defined $data ) { @@ -740,7 +745,7 @@ sub _flush_one_write } my $second; - while( $second = $writequeue->[1] and + while( $second = $writequeue->[WQ_WRITELEN] and !ref $second->[WQ_DATA] and $head->[WQ_WRITELEN] == $second->[WQ_WRITELEN] and !$head->[WQ_ON_WRITE] and !$second->[WQ_ON_WRITE] and @@ -771,6 +776,7 @@ sub _flush_one_write $self->maybe_invoke_event( on_write_eof => ); } + $head->[4]->("write error: $errno") if $head->[4]; # invoke failure handler $self->maybe_invoke_event( on_write_error => $errno ) or $self->close_now; @@ -808,18 +814,24 @@ sub write my $on_write = delete $params{on_write}; my $on_flush = delete $params{on_flush}; + my $on_fail = delete $params{on_close}; my $f; if( defined wantarray ) { my $orig_on_flush = $on_flush; + my $orig_on_fail = $on_fail; $f = $self->loop->new_future; $on_flush = sub { $f->done; $orig_on_flush->( @_ ) if $orig_on_flush; }; + $on_fail = sub { + $f->fail("write failed: @_") unless $f->is_ready; + $orig_on_fail->( @_ ) if $orig_on_fail; + }; } - push @{ $self->{writequeue} }, [ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush ]; + push @{ $self->{writequeue} }, [ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush, $on_fail ]; keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params ); diff --git i/t/21stream-2write.t w/t/21stream-2write.t index f110df4..61bb7e6 100644 --- i/t/21stream-2write.t +++ w/t/21stream-2write.t @@ -257,10 +257,10 @@ SKIP: { on_write_eof => sub { $eof++ }, ); - $stream->write( "Junk" ); - $loop->add( $stream ); + my $write_future = $stream->write( "Junk" ); + $rd->close; ok( !$stream->is_write_eof, '$stream->is_write_eof before wait' ); @@ -272,6 +272,9 @@ SKIP: { is( $eof, 1, 'EOF indication after wait' ); ok( !defined $stream->loop, 'EOF stream no longer member of Loop' ); + + ok($write_future->is_ready,'write future ready after stream closed'); + ok($write_future->is_failed,'write future failed after stream closed'); } # Close @@ -456,12 +459,15 @@ SKIP: { $loop->add( $stream ); - $stream->write( "hello" ); + my $write_future = $stream->write( "hello" ); wait_for { defined $write_errno }; cmp_ok( $write_errno, "==", ECONNRESET, 'errno after failed write' ); + ok($write_future->is_ready,'write future ready after failed write'); + ok($write_future->is_failed,'write future failed after failed write'); + $loop->remove( $stream ); }
Yes, that bug seems valid. I've adjusted the code slightly, renamed a few bits and changed formatting here and there, and also decided it would be neater to use Struct::Dumb for these queues. Revised patch attached -- Paul Evans
Subject: rt97433.patch
=== modified file 'Build.PL' --- Build.PL 2013-10-10 17:13:29 +0000 +++ Build.PL 2014-07-24 07:21:22 +0000 @@ -13,6 +13,7 @@ 'IO::Poll' => 0, 'Socket' => '2.007', 'Storable' => 0, + 'Struct::Dumb' => 0, 'Time::HiRes' => 0, # Fails on perl 5.8.3 for unknown reasons === modified file 'lib/IO/Async/Stream.pm' --- lib/IO/Async/Stream.pm 2014-07-11 14:16:53 +0000 +++ lib/IO/Async/Stream.pm 2014-07-24 07:39:51 +0000 @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2006-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2006-2014 -- leonerd@leonerd.org.uk package IO::Async::Stream; @@ -25,15 +25,14 @@ our $READLEN = 8192; our $WRITELEN = 8192; -# Indicies in writequeue elements -use constant WQ_DATA => 0; -use constant WQ_WRITELEN => 1; -use constant WQ_ON_WRITE => 2; -use constant WQ_ON_FLUSH => 3; -use constant WQ_WATCHING => 4; -# Indicies into readqueue elements -use constant RQ_ONREAD => 0; -use constant RQ_FUTURE => 1; +use Struct::Dumb; + +# Element of the writequeue +struct Writer => [qw( data writelen on_write on_flush on_error watching )]; + +# Element of the readqueue +struct Reader => [qw( on_read future )]; + # Bitfields in the want flags use constant WANT_READ_FOR_READ => 0x01; use constant WANT_READ_FOR_WRITE => 0x02; @@ -199,8 +198,8 @@ { my $self = shift; - $self->{writequeue} = []; # Queue of ARRAYs of [ $data, $on_write, $on_flush ] - $self->{readqueue} = []; # Queue of ARRAYs of [ CODE, $readfuture ] + $self->{writequeue} = []; # Queue of Writers + $self->{readqueue} = []; # Queue of Readers $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN) $self->{readbuff} = ""; @@ -575,6 +574,10 @@ { my $self = shift; + foreach ( @{ $self->{writequeue} } ) { + $_->on_error->( "stream closing" ) if $_->on_error; + } + undef @{ $self->{writequeue} }; undef $self->{stream_closing}; @@ -666,6 +669,13 @@ $on_flush->( $stream ) +=item on_error => CODE + +A CODE reference which will be invoked if a C<syswrite> error happens while +performing this write. Invoked as for the C<Stream>'s C<on_write_error> event. + + $on_error->( $stream, $errno ) + =back If the object is not yet a member of a loop and doesn't yet have a @@ -704,57 +714,58 @@ my $writequeue = $self->{writequeue}; my $head; - while( $head = $writequeue->[0] and ref $head->[WQ_DATA] ) { - if( ref $head->[WQ_DATA] eq "CODE" ) { - my $data = $head->[WQ_DATA]->( $self ); + while( $head = $writequeue->[0] and ref $head->data ) { + if( ref $head->data eq "CODE" ) { + my $data = $head->data->( $self ); if( !defined $data ) { - $head->[WQ_ON_FLUSH]->( $self ) if $head->[WQ_ON_FLUSH]; + $head->on_flush->( $self ) if $head->on_flush; shift @$writequeue; return 1; } if( !ref $data and my $encoding = $self->{encoding} ) { $data = $encoding->encode( $data ); } - unshift @$writequeue, my $new = [ $data ]; - $new->[$_] = $head->[$_] for WQ_WRITELEN, WQ_ON_WRITE; # not ON_FLUSH + unshift @$writequeue, my $new = Writer( + $data, $head->writelen, $head->on_write, undef, undef, 0 + ); next; } - elsif( blessed $head->[WQ_DATA] and $head->[WQ_DATA]->isa( "Future" ) ) { - my $f = $head->[WQ_DATA]; + elsif( blessed $head->data and $head->data->isa( "Future" ) ) { + my $f = $head->data; if( !$f->is_ready ) { - return 0 if $head->[WQ_WATCHING]; + return 0 if $head->watching; $f->on_ready( sub { $self->_flush_one_write } ); - $head->[WQ_WATCHING]++; + $head->watching++; return 0; } my $data = $f->get; if( !ref $data and my $encoding = $self->{encoding} ) { $data = $encoding->encode( $data ); } - $head->[WQ_DATA] = $data; + $head->data = $data; next; } else { - die "Unsure what to do with reference ".ref($head->[WQ_DATA])." in write queue"; + die "Unsure what to do with reference ".ref($head->data)." in write queue"; } } my $second; while( $second = $writequeue->[1] and - !ref $second->[WQ_DATA] and - $head->[WQ_WRITELEN] == $second->[WQ_WRITELEN] and - !$head->[WQ_ON_WRITE] and !$second->[WQ_ON_WRITE] and - !$head->[WQ_ON_FLUSH] ) { - $head->[WQ_DATA] .= $second->[WQ_DATA]; - $head->[WQ_ON_WRITE] = $second->[WQ_ON_WRITE]; - $head->[WQ_ON_FLUSH] = $second->[WQ_ON_FLUSH]; + !ref $second->data and + $head->writelen == $second->writelen and + !$head->on_write and !$second->on_write and + !$head->on_flush ) { + $head->data .= $second->data; + $head->on_write = $second->on_write; + $head->on_flush = $second->on_flush; splice @$writequeue, 1, 1, (); } - die "TODO: head data does not contain a plain string" if ref $head->[WQ_DATA]; + die "TODO: head data does not contain a plain string" if ref $head->data; my $writer = $self->{writer}; - my $len = $self->$writer( $self->write_handle, $head->[WQ_DATA], $head->[WQ_WRITELEN] ); + my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen ); if( !defined $len ) { my $errno = $!; @@ -771,18 +782,19 @@ $self->maybe_invoke_event( on_write_eof => ); } + $head->on_error->( $self, $errno ) if $head->on_error; $self->maybe_invoke_event( on_write_error => $errno ) or $self->close_now; return 0; } - if( my $on_write = $head->[WQ_ON_WRITE] ) { + if( my $on_write = $head->on_write ) { $on_write->( $self, $len ); } - if( !length $head->[WQ_DATA] ) { - $head->[WQ_ON_FLUSH]->( $self ) if $head->[WQ_ON_FLUSH]; + if( !length $head->data ) { + $head->on_flush->( $self ) if $head->on_flush; shift @{ $self->{writequeue} }; } @@ -808,18 +820,30 @@ my $on_write = delete $params{on_write}; my $on_flush = delete $params{on_flush}; + my $on_error = delete $params{on_error}; my $f; if( defined wantarray ) { my $orig_on_flush = $on_flush; + my $orig_on_error = $on_error; $f = $self->loop->new_future; $on_flush = sub { $f->done; $orig_on_flush->( @_ ) if $orig_on_flush; }; + $on_error = sub { + my $self = shift; + my ( $errno ) = @_; + + $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready; + + $orig_on_error->( $self, @_ ) if $orig_on_error; + }; } - push @{ $self->{writequeue} }, [ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush ]; + push @{ $self->{writequeue} }, Writer( + $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush, $on_error, 0 + ); keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params ); @@ -875,7 +899,7 @@ my $readqueue = $self->{readqueue}; my $ret; - if( $readqueue->[0] and my $on_read = $readqueue->[0][RQ_ONREAD] ) { + if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) { $ret = $on_read->( $self, \$self->{readbuff}, $eof ); } else { @@ -890,7 +914,7 @@ if( ref $ret eq "CODE" ) { # Replace the top CODE, or add it if there was none - $readqueue->[0] = [ $ret ]; + $readqueue->[0] = Reader( $ret, undef ); return 1; } elsif( @$readqueue and !defined $ret ) { @@ -937,7 +961,7 @@ or $self->close_now; foreach ( @{ $self->{readqueue} } ) { - $_->[RQ_FUTURE]->fail( "read failed: $errno", sysread => $errno ) if $_->[RQ_FUTURE]; + $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future; } undef @{ $self->{readqueue} }; @@ -960,7 +984,7 @@ $self->maybe_invoke_event( on_read_eof => ); $self->close_now if $self->{close_on_read_eof}; foreach ( @{ $self->{readqueue} } ) { - $_->[RQ_FUTURE]->done( undef ) if $_->[RQ_FUTURE]; + $_->future->done( undef ) if $_->future; } undef @{ $self->{readqueue} }; return; @@ -1012,7 +1036,7 @@ my ( $on_read, %args ) = @_; # %args undocumented for internal use - push @{ $self->{readqueue} }, [ $on_read, $args{future} ]; + push @{ $self->{readqueue} }, Reader( $on_read, $args{future} ); # TODO: Should this always defer? 1 while length $self->{readbuff} and $self->_flush_one_read( 0 ); === modified file 't/21stream-2write.t' --- t/21stream-2write.t 2013-10-10 17:31:29 +0000 +++ t/21stream-2write.t 2014-07-24 07:32:41 +0000 @@ -257,10 +257,10 @@ on_write_eof => sub { $eof++ }, ); - $stream->write( "Junk" ); - $loop->add( $stream ); + my $write_future = $stream->write( "Junk" ); + $rd->close; ok( !$stream->is_write_eof, '$stream->is_write_eof before wait' ); @@ -272,6 +272,9 @@ is( $eof, 1, 'EOF indication after wait' ); ok( !defined $stream->loop, 'EOF stream no longer member of Loop' ); + + ok( $write_future->is_ready,'write future ready after stream closed' ); + ok( $write_future->is_failed,'write future failed after stream closed' ); } # Close @@ -456,12 +459,15 @@ $loop->add( $stream ); - $stream->write( "hello" ); + my $write_future = $stream->write( "hello" ); wait_for { defined $write_errno }; cmp_ok( $write_errno, "==", ECONNRESET, 'errno after failed write' ); + ok( $write_future->is_ready,'write future ready after failed write' ); + ok( $write_future->is_failed,'write future failed after failed write' ); + $loop->remove( $stream ); }
Subject: Re: [rt.cpan.org #97433] Stream's write futures are not failed on close
Date: Thu, 24 Jul 2014 10:09:16 +0100
To: bug-IO-Async [...] rt.cpan.org
From: Gianni Ceccarelli <dakkar [...] cpan.org>
Probably just minor issues, but: while( $second = $writequeue->[1] and - !ref $second->[WQ_DATA] and - $head->[WQ_WRITELEN] == $second->[WQ_WRITELEN] and - !$head->[WQ_ON_WRITE] and !$second->[WQ_ON_WRITE] and - !$head->[WQ_ON_FLUSH] ) { - $head->[WQ_DATA] .= $second->[WQ_DATA]; - $head->[WQ_ON_WRITE] = $second->[WQ_ON_WRITE]; - $head->[WQ_ON_FLUSH] = $second->[WQ_ON_FLUSH]; + !ref $second->data and + $head->writelen == $second->writelen and + !$head->on_write and !$second->on_write and + !$head->on_flush ) { + $head->data .= $second->data; + $head->on_write = $second->on_write; + $head->on_flush = $second->on_flush; splice @$writequeue, 1, 1, (); What about on_error? Should we copy that one as well? Also, why are we checking !$second->on_write ? + $on_error = sub { + my $self = shift; + my ( $errno ) = @_; + + $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready; + + $orig_on_error->( $self, @_ ) if $orig_on_error; + }; 'syswrite'? That failure will also be used on stream close, where the problem (I think) is not syswrite-related. -- Dakkar - <Mobilis in mobile> GPG public key fingerprint = A071 E618 DD2C 5901 9574 6FE2 40EA 9883 7519 3F88 key id = 0x75193F88 Tonight's special, blackened blackened leftovers
Released -- Paul Evans