Actually, via some cunning trickery and a little laziness, this works too.
Find attached a patch that implements sync->sync sharing of a Channel
between two Routines. It requires the previous patch as well.
--
Paul Evans
=== modified file 'lib/IO/Async/Channel.pm'
--- lib/IO/Async/Channel.pm 2012-04-18 23:16:23 +0000
+++ lib/IO/Async/Channel.pm 2012-04-25 22:58:35 +0000
@@ -14,6 +14,8 @@
use Carp;
use Storable qw( freeze thaw );
+use IO::Async::Stream;
+
=head1 NAME
C<IO::Async::Channel> - pass values into or out from an L<IO::Async::Routine>
@@ -31,7 +33,9 @@
mode. In asynchronous mode all methods return immediately and use
C<IO::Async>-style callback functions. In synchronous within the Routine
process the methods block until they are ready and may be used for
-flow-control within the routine.
+flow-control within the routine. Alternatively, a Channel may be shared
+between two different Routine objects, and not used directly by the
+controlling program.
The channel itself represents a FIFO of Perl reference values. New values may
be put into the channel by the C<send> method in either mode. Values may be
@@ -68,7 +72,7 @@
{
my $class = shift;
return bless {
- mode => undef,
+ mode => "",
}, $class;
}
@@ -90,6 +94,13 @@
$on_recv->( $channel, $data )
+=item on_eof => CODE
+
+May only be set on an async mode channel. If present, it will be invoked when
+the channel is closed by the peer.
+
+ $on_eof->( $channel )
+
=back
=cut
@@ -99,11 +110,13 @@
my $self = shift;
my %params = @_;
- if( exists $params{on_recv} ) {
+ foreach (qw( on_recv on_eof )) {
+ next unless exists $params{$_};
$self->{mode} and $self->{mode} eq "async" or
- croak "Can only configure on_recv in async mode";
+ croak "Can only configure $_ in async mode";
- $self->{on_recv} = delete $params{on_recv};
+ $self->{$_} = delete $params{$_};
+ $self->_build_stream;
}
$self->SUPER::configure( %params );
@@ -272,30 +285,37 @@
my $self = shift;
my %args = @_;
- my $stream = delete $args{stream} or croak "Expected 'stream'";
-
- if( my $on_recv = delete $args{on_recv} ) {
- $self->{on_recv} = $on_recv;
- $self->{on_eof} = delete $args{on_eof};
- }
+ exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
- $self->{stream} = $stream;
$self->{mode} = "async";
- $self->{on_result_queue} = [];
-
- $stream->configure(
- autoflush => 1,
- on_read => $self->_capture_weakself( '_on_stream_read' )
- );
+}
+
+sub _build_stream
+{
+ my $self = shift;
+ return $self->{stream} ||= do {
+ $self->{on_result_queue} = [];
+
+ my $stream = IO::Async::Stream->new(
+ read_handle => $self->{read_handle},
+ write_handle => $self->{write_handle},
+ autoflush => 1,
+ on_read => $self->_capture_weakself( '_on_stream_read' )
+ );
+
+ $self->add_child( $stream );
+
+ $stream;
+ };
}
sub _send_async
{
my $self = shift;
my ( $bytes ) = @_;
- $self->{stream}->write( $bytes );
+ $self->_build_stream->write( $bytes );
}
sub _recv_async
@@ -305,6 +325,8 @@
my $on_recv = $args{on_recv};
my $on_eof = $args{on_eof};
+ $self->_build_stream;
+
push @{ $self->{on_result_queue} }, sub {
my ( $self, $type, $result ) = @_;
if( $type eq "recv" ) {
@@ -352,6 +374,30 @@
return 1;
}
+sub _extract_read_handle
+{
+ my $self = shift;
+
+ return undef if !$self->{mode};
+
+ croak "Cannot extract filehandle" if $self->{mode} ne "async";
+ $self->{mode} = "dead";
+
+ return $self->{read_handle};
+}
+
+sub _extract_write_handle
+{
+ my $self = shift;
+
+ return undef if !$self->{mode};
+
+ croak "Cannot extract filehandle" if $self->{mode} ne "async";
+ $self->{mode} = "dead";
+
+ return $self->{write_handle};
+}
+
=head1 AUTHOR
Paul Evans <leonerd@leonerd.org.uk>
=== modified file 'lib/IO/Async/Routine.pm'
--- lib/IO/Async/Routine.pm 2012-04-12 15:40:20 +0000
+++ lib/IO/Async/Routine.pm 2012-04-25 22:58:35 +0000
@@ -12,8 +12,6 @@
use base qw( IO::Async::Process );
-use IO::Async::Stream;
-
=head1 NAME
C<IO::Async::Routine> - execute code in an independent sub-process
@@ -130,13 +128,19 @@
my @channels_out;
foreach my $ch ( @{ $self->{channels_in} || [] } ) {
- my ( $rd, $wr ) = $loop->pipepair;
+ my ( $rd, $wr );
+ unless( $rd = $ch->_extract_read_handle ) {
+ ( $rd, $wr ) = $loop->pipepair;
+ }
push @setup, $rd => "keep";
push @channels_in, [ $ch, $wr, $rd ];
}
foreach my $ch ( @{ $self->{channels_out} || [] } ) {
- my ( $rd, $wr ) = $loop->pipepair;
+ my ( $rd, $wr );
+ unless( $wr = $ch->_extract_write_handle ) {
+ ( $rd, $wr ) = $loop->pipepair;
+ }
push @setup, $wr => "keep";
push @channels_out, [ $ch, $rd, $wr ];
}
@@ -173,19 +177,17 @@
foreach ( @channels_in ) {
my ( $ch, $wr ) = @$_;
- my $stream = IO::Async::Stream->new( write_handle => $wr );
- $ch->setup_async_mode( stream => $stream );
+ $ch->setup_async_mode( write_handle => $wr );
- $self->add_child( $stream );
+ $self->add_child( $ch ) unless $ch->parent;
}
foreach ( @channels_out ) {
my ( $ch, $rd ) = @$_;
- my $stream = IO::Async::Stream->new( read_handle => $rd );
- $ch->setup_async_mode( stream => $stream );
+ $ch->setup_async_mode( read_handle => $rd );
- $self->add_child( $stream );
+ $self->add_child( $ch ) unless $ch->parent;
}
$self->SUPER::_add_to_loop( $loop );
=== modified file 't/40channel.t'
--- t/40channel.t 2012-04-18 23:16:23 +0000
+++ t/40channel.t 2012-04-25 22:58:35 +0000
@@ -4,12 +4,11 @@
use IO::Async::Test;
-use Test::More tests => 12;
+use Test::More tests => 11;
use Test::Identity;
use IO::Async::Channel;
-use IO::Async::Stream;
use IO::Async::Loop::Poll;
use Storable qw( freeze );
@@ -48,17 +47,15 @@
$channel_rd->setup_sync_mode( $pipe_rd );
my $channel_wr = IO::Async::Channel->new;
- $channel_wr->setup_async_mode(
- stream => my $stream_wr = IO::Async::Stream->new( write_handle => $pipe_wr ),
- );
+ $channel_wr->setup_async_mode( write_handle => $pipe_wr );
- $loop->add( $stream_wr );
+ $loop->add( $channel_wr );
$channel_wr->send( [ data => "by async" ] );
# Cheat for semi-sync
my $flushed;
- $stream_wr->write( "", on_flush => sub { $flushed++ } );
+ $channel_wr->{stream}->write( "", on_flush => sub { $flushed++ } );
wait_for { $flushed };
is_deeply( $channel_rd->recv, [ data => "by async" ], 'Async mode channel can send' );
@@ -68,7 +65,7 @@
is( $channel_rd->recv, undef, 'Sync mode can be closed' );
}
-# sync->async
+# sync->async configured on_recv
{
my ( $pipe_rd, $pipe_wr ) = $loop->pipepair;
@@ -76,8 +73,11 @@
my $recv_eof;
my $channel_rd = IO::Async::Channel->new;
- $channel_rd->setup_async_mode(
- stream => my $stream_rd = IO::Async::Stream->new( read_handle => $pipe_rd ),
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ $channel_rd->configure(
on_recv => sub {
identical( $_[0], $channel_rd, 'Channel passed to on_recv' );
push @recv_queue, $_[1];
@@ -87,8 +87,6 @@
},
);
- $loop->add( $stream_rd );
-
my $channel_wr = IO::Async::Channel->new;
$channel_wr->setup_sync_mode( $pipe_wr );
@@ -104,16 +102,14 @@
is( $recv_eof, 1, 'Async mode channel can on_eof' );
}
-# sync->async late ->recv
+# sync->async oneshot ->recv
{
my ( $pipe_rd, $pipe_wr ) = $loop->pipepair;
my $channel_rd = IO::Async::Channel->new;
- $channel_rd->setup_async_mode(
- stream => my $stream_rd = IO::Async::Stream->new( read_handle => $pipe_rd ),
- );
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
- $loop->add( $stream_rd );
+ $loop->add( $channel_rd );
my $channel_wr = IO::Async::Channel->new;
$channel_wr->setup_sync_mode( $pipe_wr );
@@ -133,22 +129,6 @@
is_deeply( $recved, [ data => "by sync" ], 'Async mode channel can ->recv on_recv' );
- my @recv_queue;
- $channel_rd->configure(
- on_recv => sub { push @recv_queue, $_[1] }
- );
-
- undef $recved;
-
- $channel_wr->send( [ first => "thing" ] );
- $channel_wr->send( [ second => "thing" ] );
-
- wait_for { @recv_queue >= 2 };
-
- is_deeply( \@recv_queue,
- [ [ first => "thing" ], [ second => "thing" ] ],
- 'Async mode channel can receive with ->configure on_recv' );
-
$channel_wr->close;
my $recv_eof;
=== modified file 't/41routine.t'
--- t/41routine.t 2012-02-21 21:53:57 +0000
+++ t/41routine.t 2012-04-25 22:58:35 +0000
@@ -4,7 +4,7 @@
use IO::Async::Test;
-use Test::More tests => 11;
+use Test::More tests => 12;
use Test::Identity;
use Test::Refcount;
@@ -127,3 +127,35 @@
identical( $finishargs[0], $routine, 'on_finish passed self' );
is( $finishargs[1], 0, 'on_finish passed exit code' );
}
+
+{
+ my $channel = IO::Async::Channel->new;
+
+ my $src_finished;
+ my $src_routine = IO::Async::Routine->new(
+ channels_out => [ $channel ],
+ code => sub {
+ $channel->send( [ some => "data" ] );
+ return 0;
+ },
+ on_finish => sub { $src_finished++ },
+ );
+
+ $loop->add( $src_routine );
+
+ my $sink_result;
+ my $sink_routine = IO::Async::Routine->new(
+ channels_in => [ $channel ],
+ code => sub {
+ my @data = @{ $channel->recv };
+ return ( $data[0] eq "some" and $data[1] eq "data" ) ? 0 : 1;
+ },
+ on_finish => sub { $sink_result = $_[1] },
+ );
+
+ $loop->add( $sink_routine );
+
+ wait_for { $src_finished and defined $sink_result };
+
+ is( $sink_result, 0, 'synchronous src->sink can share a channel' );
+}