Skip Menu |

This queue is for tickets about the IO-Async CPAN distribution.

Report information
The Basics
Id: 76676
Status: resolved
Priority: 0/
Queue: IO-Async

People
Owner: Nobody in particular
Requestors: leonerd-cpan [...] leonerd.org.uk
Cc: cpan [...] prather.org
AdminCc:

Bug Information
Severity: Wishlist
Broken in: 0.47
Fixed in: 0.48



Subject: Channel improvemements - longrunning on_recv; sync-to-sync
Two features: 1) Allow setting up a longrunning on_recv for a Channel, rather than the oneshot of the ->recv method. E.g. $ch->configure( on_recv => sub { ... } ); 2) Allow passing both ends of a Channel to two different Routines, thus making a synchronous channel to pass data between them. -- Paul Evans
On Wed Apr 18 17:41:44 2012, PEVANS wrote: Show quoted text
> 1) Allow setting up a longrunning on_recv for a Channel, rather than the > oneshot of the ->recv method. E.g. > > $ch->configure( > on_recv => sub { ... } > );
Attached is a patch for this. The other is harder; I'll take a look tomorrow maybe. -- Paul Evans
Subject: rt76676-1.patch
=== modified file 'lib/IO/Async/Channel.pm' --- lib/IO/Async/Channel.pm 2012-03-24 21:08:34 +0000 +++ lib/IO/Async/Channel.pm 2012-04-18 23:05:18 +0000 @@ -59,8 +59,8 @@ While this object does in fact inherit from L<IO::Async::Notifier> for implementation reasons it is not intended that this object be used as a -Notifier. The C<configure> method should not be called, and it should not be -added to a Loop object. +Notifier. It should not be added to a Loop object directly; event management +will be handled by its containing C<IO::Async::Routine> object. =cut @@ -76,6 +76,39 @@ =cut +=head2 $channel->configure( %params ) + +Similar to the standard C<configure> method on C<IO::Async::Notifier>, this is +used to change details of the Channel's operation. + +=over 4 + +=item on_recv => CODE + +May only be set on an async mode channel. If present, will be invoked whenever +a new value is received, rather than using the C<recv> method. + + $on_recv->( $channel, $data ) + +=back + +=cut + +sub configure +{ + my $self = shift; + my %params = @_; + + if( exists $params{on_recv} ) { + $self->{mode} and $self->{mode} eq "async" or + croak "Can only configure on_recv in async mode"; + + $self->{on_recv} = delete $params{on_recv}; + } + + $self->SUPER::configure( %params ); +} + =head2 $channel->send( $data ) Pushes the data stored in the given Perl reference into the FIFO of the @@ -245,24 +278,12 @@ $self->{on_recv} = $on_recv; $self->{on_eof} = delete $args{on_eof}; } - else { - $self->{on_result_queue} = \my @on_result_queue; - $self->{on_recv} = sub { - my ( $self, $result ) = @_; - (shift @on_result_queue)->( $self, recv => $result ); - }; - $self->{on_eof} = sub { - my ( $self ) = @_; - while( @on_result_queue ) { - (shift @on_result_queue)->( $self, eof => ); - } - }; - } keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args ); $self->{stream} = $stream; $self->{mode} = "async"; + $self->{on_result_queue} = []; $stream->configure( autoflush => 1, @@ -307,7 +328,10 @@ my ( $stream, $buffref, $eof ) = @_; if( $eof ) { - $self->{on_eof}->( $self ); + while( my $on_result = shift @{ $self->{on_result_queue} } ) { + $on_result->( $self, eof => ); + } + $self->{on_eof}->( $self ) if $self->{on_eof}; return; } @@ -318,7 +342,12 @@ my $record = thaw( substr( $$buffref, 4, $len ) ); substr( $$buffref, 0, 4 + $len ) = ""; - $self->{on_recv}->( $self, $record ); + if( my $on_result = shift @{ $self->{on_result_queue} } ) { + $on_result->( $self, recv => $record ); + } + else { + $self->{on_recv}->( $self, $record ); + } return 1; } === modified file 't/40channel.t' --- t/40channel.t 2012-02-14 22:24:38 +0000 +++ t/40channel.t 2012-04-18 23:05:18 +0000 @@ -4,7 +4,7 @@ use IO::Async::Test; -use Test::More tests => 11; +use Test::More tests => 12; use Test::Identity; use IO::Async::Channel; @@ -133,6 +133,22 @@ is_deeply( $recved, [ data => "by sync" ], 'Async mode channel can ->recv on_recv' ); + my @recv_queue; + $channel_rd->configure( + on_recv => sub { push @recv_queue, $_[1] } + ); + + undef $recved; + + $channel_wr->send( [ first => "thing" ] ); + $channel_wr->send( [ second => "thing" ] ); + + wait_for { @recv_queue >= 2 }; + + is_deeply( \@recv_queue, + [ [ first => "thing" ], [ second => "thing" ] ], + 'Async mode channel can receive with ->configure on_recv' ); + $channel_wr->close; my $recv_eof;
On Wed Apr 18 17:41:44 2012, PEVANS wrote: Show quoted text
> 2) Allow passing both ends of a Channel to two different Routines,
thus Show quoted text
> making a synchronous channel to pass data between them.
This part is proving to be non-trivial, on part of tearing down the async-mode end of the channel inbetween. I wonder if it might be easier to add two new keys, channels_src and channels_sink, that configure the in-routine side of a channel but don't configure the main process side. That would allow two routines to share a channel quite easily. -- Paul Evans
Actually, via some cunning trickery and being lazy, this works too. Find attached a patch to implement sync->sync mode sharing of Channel between two Routines. It requires the previous patch as well. -- Paul Evans
Subject: rt76676-2.patch
=== modified file 'lib/IO/Async/Channel.pm' --- lib/IO/Async/Channel.pm 2012-04-18 23:16:23 +0000 +++ lib/IO/Async/Channel.pm 2012-04-25 22:58:35 +0000 @@ -14,6 +14,8 @@ use Carp; use Storable qw( freeze thaw ); +use IO::Async::Stream; + =head1 NAME C<IO::Async::Channel> - pass values into or out from an L<IO::Async::Routine> @@ -31,7 +33,9 @@ mode. In asynchronous mode all methods return immediately and use C<IO::Async>-style callback functions. In synchronous within the Routine process the methods block until they are ready and may be used for -flow-control within the routine. +flow-control within the routine. Alternatively, a Channel may be shared +between two different Routine objects, and not used directly by the +controlling program. The channel itself represents a FIFO of Perl reference values. New values may be put into the channel by the C<send> method in either mode. Values may be @@ -68,7 +72,7 @@ { my $class = shift; return bless { - mode => undef, + mode => "", }, $class; } @@ -90,6 +94,13 @@ $on_recv->( $channel, $data ) +=item on_eof => CODE + +May only be set on an async mode channel. If present, will be invoked when the +channel gets closed by the peer. + + $on_eof->( $channel ) + =back =cut @@ -99,11 +110,13 @@ my $self = shift; my %params = @_; - if( exists $params{on_recv} ) { + foreach (qw( on_recv on_eof )) { + next unless exists $params{$_}; $self->{mode} and $self->{mode} eq "async" or - croak "Can only configure on_recv in async mode"; + croak "Can only configure $_ in async mode"; - $self->{on_recv} = delete $params{on_recv}; + $self->{$_} = delete $params{$_}; + $self->_build_stream; } $self->SUPER::configure( %params ); @@ -272,30 +285,37 @@ my $self = shift; my %args = @_; - my $stream = delete $args{stream} or croak "Expected 'stream'"; - - if( my $on_recv = delete $args{on_recv} ) { - $self->{on_recv} = $on_recv; - $self->{on_eof} = delete $args{on_eof}; - } + exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle ); keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args ); - $self->{stream} = $stream; $self->{mode} = "async"; - $self->{on_result_queue} = []; - - $stream->configure( - autoflush => 1, - on_read => $self->_capture_weakself( '_on_stream_read' ) - ); +} + +sub _build_stream +{ + my $self = shift; + return $self->{stream} ||= do { + $self->{on_result_queue} = []; + + my $stream = IO::Async::Stream->new( + read_handle => $self->{read_handle}, + write_handle => $self->{write_handle}, + autoflush => 1, + on_read => $self->_capture_weakself( '_on_stream_read' ) + ); + + $self->add_child( $stream ); + + $stream; + }; } sub _send_async { my $self = shift; my ( $bytes ) = @_; - $self->{stream}->write( $bytes ); + $self->_build_stream->write( $bytes ); } sub _recv_async @@ -305,6 +325,8 @@ my $on_recv = $args{on_recv}; my $on_eof = $args{on_eof}; + $self->_build_stream; + push @{ $self->{on_result_queue} }, sub { my ( $self, $type, $result ) = @_; if( $type eq "recv" ) { @@ -352,6 +374,30 @@ return 1; } +sub _extract_read_handle +{ + my $self = shift; + + return undef if !$self->{mode}; + + croak "Cannot extract filehandle" if $self->{mode} ne "async"; + $self->{mode} = "dead"; + + return $self->{read_handle}; +} + +sub _extract_write_handle +{ + my $self = shift; + + return undef if !$self->{mode}; + + croak "Cannot extract filehandle" if $self->{mode} ne "async"; + $self->{mode} = "dead"; + + return $self->{write_handle}; +} + =head1 AUTHOR Paul Evans <leonerd@leonerd.org.uk> === modified file 'lib/IO/Async/Routine.pm' --- lib/IO/Async/Routine.pm 2012-04-12 15:40:20 +0000 +++ lib/IO/Async/Routine.pm 2012-04-25 22:58:35 +0000 @@ -12,8 +12,6 @@ use base qw( IO::Async::Process ); -use IO::Async::Stream; - =head1 NAME C<IO::Async::Routine> - execute code in an independent sub-process @@ -130,13 +128,19 @@ my @channels_out; foreach my $ch ( @{ $self->{channels_in} || [] } ) { - my ( $rd, $wr ) = $loop->pipepair; + my ( $rd, $wr ); + unless( $rd = $ch->_extract_read_handle ) { + ( $rd, $wr ) = $loop->pipepair; + } push @setup, $rd => "keep"; push @channels_in, [ $ch, $wr, $rd ]; } foreach my $ch ( @{ $self->{channels_out} || [] } ) { - my ( $rd, $wr ) = $loop->pipepair; + my ( $rd, $wr ); + unless( $wr = $ch->_extract_write_handle ) { + ( $rd, $wr ) = $loop->pipepair; + } push @setup, $wr => "keep"; push @channels_out, [ $ch, $rd, $wr ]; } @@ -173,19 +177,17 @@ foreach ( @channels_in ) { my ( $ch, $wr ) = @$_; - my $stream = IO::Async::Stream->new( write_handle => $wr ); - $ch->setup_async_mode( stream => $stream ); + $ch->setup_async_mode( write_handle => $wr ); - $self->add_child( $stream ); + $self->add_child( $ch ) unless $ch->parent; } foreach ( @channels_out ) { my ( $ch, $rd ) = @$_; - my $stream = IO::Async::Stream->new( read_handle => $rd ); - $ch->setup_async_mode( stream => $stream ); + $ch->setup_async_mode( read_handle => $rd ); - $self->add_child( $stream ); + $self->add_child( $ch ) unless $ch->parent; } $self->SUPER::_add_to_loop( $loop ); === modified file 't/40channel.t' --- t/40channel.t 2012-04-18 23:16:23 +0000 +++ t/40channel.t 2012-04-25 22:58:35 +0000 @@ -4,12 +4,11 @@ use IO::Async::Test; -use Test::More tests => 12; +use Test::More tests => 11; use Test::Identity; use IO::Async::Channel; -use IO::Async::Stream; use IO::Async::Loop::Poll; use Storable qw( freeze ); @@ -48,17 +47,15 @@ $channel_rd->setup_sync_mode( $pipe_rd ); my $channel_wr = IO::Async::Channel->new; - $channel_wr->setup_async_mode( - stream => my $stream_wr = IO::Async::Stream->new( write_handle => $pipe_wr ), - ); + $channel_wr->setup_async_mode( write_handle => $pipe_wr ); - $loop->add( $stream_wr ); + $loop->add( $channel_wr ); $channel_wr->send( [ data => "by async" ] ); # Cheat for semi-sync my $flushed; - $stream_wr->write( "", on_flush => sub { $flushed++ } ); + $channel_wr->{stream}->write( "", on_flush => sub { $flushed++ } ); wait_for { $flushed }; is_deeply( $channel_rd->recv, [ data => "by async" ], 'Async mode channel can send' ); @@ -68,7 +65,7 @@ is( $channel_rd->recv, undef, 'Sync mode can be closed' ); } -# sync->async +# sync->async configured on_recv { my ( $pipe_rd, $pipe_wr ) = $loop->pipepair; @@ -76,8 +73,11 @@ my $recv_eof; my $channel_rd = IO::Async::Channel->new; - $channel_rd->setup_async_mode( - stream => my $stream_rd = IO::Async::Stream->new( read_handle => $pipe_rd ), + $channel_rd->setup_async_mode( read_handle => $pipe_rd ); + + $loop->add( $channel_rd ); + + $channel_rd->configure( on_recv => sub { identical( $_[0], $channel_rd, 'Channel passed to on_recv' ); push @recv_queue, $_[1]; @@ -87,8 +87,6 @@ }, ); - $loop->add( $stream_rd ); - my $channel_wr = IO::Async::Channel->new; $channel_wr->setup_sync_mode( $pipe_wr ); @@ -104,16 +102,14 @@ is( $recv_eof, 1, 'Async mode channel can on_eof' ); } -# sync->async late ->recv +# sync->async oneshot ->recv { my ( $pipe_rd, $pipe_wr ) = $loop->pipepair; my $channel_rd = IO::Async::Channel->new; - $channel_rd->setup_async_mode( - stream => my $stream_rd = IO::Async::Stream->new( read_handle => $pipe_rd ), - ); + $channel_rd->setup_async_mode( read_handle => $pipe_rd ); - $loop->add( $stream_rd ); + $loop->add( $channel_rd ); my $channel_wr = IO::Async::Channel->new; $channel_wr->setup_sync_mode( $pipe_wr ); @@ -133,22 +129,6 @@ is_deeply( $recved, [ data => "by sync" ], 'Async mode channel can ->recv on_recv' ); - my @recv_queue; - $channel_rd->configure( - on_recv => sub { push @recv_queue, $_[1] } - ); - - undef $recved; - - $channel_wr->send( [ first => "thing" ] ); - $channel_wr->send( [ second => "thing" ] ); - - wait_for { @recv_queue >= 2 }; - - is_deeply( \@recv_queue, - [ [ first => "thing" ], [ second => "thing" ] ], - 'Async mode channel can receive with ->configure on_recv' ); - $channel_wr->close; my $recv_eof; === modified file 't/41routine.t' --- t/41routine.t 2012-02-21 21:53:57 +0000 +++ t/41routine.t 2012-04-25 22:58:35 +0000 @@ -4,7 +4,7 @@ use IO::Async::Test; -use Test::More tests => 11; +use Test::More tests => 12; use Test::Identity; use Test::Refcount; @@ -127,3 +127,35 @@ identical( $finishargs[0], $routine, 'on_finish passed self' ); is( $finishargs[1], 0, 'on_finish passed exit code' ); } + +{ + my $channel = IO::Async::Channel->new; + + my $src_finished; + my $src_routine = IO::Async::Routine->new( + channels_out => [ $channel ], + code => sub { + $channel->send( [ some => "data" ] ); + return 0; + }, + on_finish => sub { $src_finished++ }, + ); + + $loop->add( $src_routine ); + + my $sink_result; + my $sink_routine = IO::Async::Routine->new( + channels_in => [ $channel ], + code => sub { + my @data = @{ $channel->recv }; + return ( $data[0] eq "some" and $data[1] eq "data" ) ? 0 : 1; + }, + on_finish => sub { $sink_result = $_[1] }, + ); + + $loop->add( $sink_routine ); + + wait_for { $src_finished and defined $sink_result }; + + is( $sink_result, 0, 'synchronous src->sink can share a channel' ); +}
Released in 0.48 -- Paul Evans