use strict;
use warnings FATAL => 'all';
use POE;
use Test::More tests => 2;
{
package WheelWrapper;
use strict;
use warnings FATAL => 'all';
use Carp 'croak';
use POE;
use POE::Filter::Stream;
use POE::Wheel::Run;
require Exporter;
use base 'Exporter';
# Function names handed to Exporter as the default export list.
our @EXPORT = qw(quickie_run quickie quickie_merged quickie_tee quickie_tee_merged);
# The same names are also available on request ...
our @EXPORT_OK = @EXPORT;
# ... and as a group through the ":ALL" tag.
our %EXPORT_TAGS = (ALL => [@EXPORT]);
# Instance cache used by new(): maps the ID of the session that was active
# when new() was called to the object created for it.
our %OBJECTS;
# Return the instance that belongs to the currently active POE session,
# creating and caching it in %OBJECTS on first use.
#
# %args becomes the object's initial attributes, but only when no instance
# is cached for the active session yet; otherwise the cached instance is
# returned unchanged and %args is ignored.
sub new {
    my ($package, %args) = @_;

    my $parent_id = $poe_kernel->get_active_session->ID;
    my $cached    = $OBJECTS{$parent_id};
    return $cached if $cached;

    my $self = bless \%args, $package;
    $self->{parent_id}   = $parent_id;
    $OBJECTS{$parent_id} = $self;
    return $self;
}
# Create the POE session whose events are handled by methods of this object.
# Each of the object's debug/default/trace attributes that is true is
# forwarded to the session as an option with the value 1.
sub _create_session {
    my ($self) = @_;

    my @states = qw(
        _start
        _stop
        _exception
        _create_wheel
        _child_signal
        _child_timeout
        _child_stdin
        _child_stdout
        _child_stderr
        _killall
    );

    my %options;
    for my $flag (qw(debug default trace)) {
        $options{$flag} = 1 if $self->{$flag};
    }

    POE::Session->create(
        object_states => [ $self, \@states ],
        options       => \%options,
    );
    return;
}
# _start handler of the wrapper session: record the session's ID on the
# object and route the DIE signal to the _exception handler.
sub _start {
    my $self   = $_[OBJECT];
    my $kernel = $_[KERNEL];

    $self->{session_id} = $_[SESSION]->ID;
    $kernel->sig(DIE => '_exception');
    return;
}
# _stop handler of the wrapper session: forget the stored session ID, so
# that run() creates a fresh session the next time it is called.
sub _stop {
    delete $_[OBJECT]{session_id};
    return;
}
# Start a child process and return its PID.
#
# Arguments (key/value pairs):
#   Program - required; what to run.  Must be a plain string when AltFork
#             is enabled.
#   AltFork - optional; not available on Windows.
#   Any other pair is passed on to the _create_wheel handler.
#
# Croaks when Program is missing, when AltFork is combined with a reference
# Program or used on Windows, and when the _create_wheel handler reports an
# exception from constructing the wheel.
sub run {
    my ($self, %args) = @_;

    croak 'Program parameter not supplied' if !defined $args{Program};

    if ($args{AltFork}) {
        croak 'Program must be a string when AltFork is enabled'
            if ref $args{Program};

        # perl reports native Windows as 'MSWin32' in $^O (never 'Win32').
        croak 'AltFork does not currently work on Win32'
            if $^O eq 'MSWin32';
    }

    # The session is created lazily, on the first run() call.
    $self->_create_session() if !defined $self->{session_id};

    my ($exception, $wheel)
        = $poe_kernel->call($self->{session_id}, '_create_wheel', \%args);

    # propagate possible exception from POE::Wheel::Run->new()
    croak $exception if $exception;

    return $wheel->PID;
}
# Event handler that builds the POE::Wheel::Run object for one child.
#
# ARG0 is a hash ref of options.  The options listed below are removed from
# it and kept in the per-wheel record; whatever remains is handed to
# POE::Wheel::Run->new() and also stored as "extra_args".
#
# Returns (undef, $wheel) on success.  If the constructor throws, returns
# the chomped exception text as the only value.
sub _create_wheel {
    my ($kernel, $self, $args) = @_[KERNEL, OBJECT, ARG0];

    my %data;
    for my $arg (qw(AltFork Timeout Input Program Context ProgramArgs
                    StdoutEvent StderrEvent ExitEvent ResultEvent Tee Merged)) {
        next if !exists $args->{$arg};
        $data{$arg} = delete $args->{$arg};
    }

    if ($data{AltFork}) {
        # Run the program text in a new perl ($^X) with the same @INC.
        my @inc = map { ('-I', $_) } @INC;
        $data{Program} = [$^X, @inc, '-e', $data{Program}];
    }

    my $wheel;
    eval {
        $wheel = POE::Wheel::Run->new(
            StdinFilter => POE::Filter::Stream->new(),
            StdinEvent  => '_child_stdin',
            StdoutEvent => '_child_stdout',
            StderrEvent => '_child_stderr',
            Program     => $data{Program},
            (defined $data{ProgramArgs}
                ? (ProgramArgs => $data{ProgramArgs})
                : ()
            ),
            # perl reports native Windows as 'MSWin32' in $^O (never
            # 'Win32'); CloseOnCall is only requested elsewhere.
            ($^O ne 'MSWin32'
                ? (CloseOnCall => 1)
                : ()
            ),
            %$args,
        );
    };
    if ($@) {
        chomp $@;
        return $@;
    }

    $data{obj}        = $wheel;
    $data{extra_args} = $args;
    $self->{wheels}{$wheel->ID} = \%data;

    # Either feed the child its input or close its stdin right away.
    if (defined $data{Input}) {
        $wheel->put($data{Input});
    }
    else {
        $wheel->shutdown_stdin();
    }

    if ($data{Timeout}) {
        # Keep the alarm ID so _child_signal can cancel it.
        $data{alrm} =
            $kernel->delay_set('_child_timeout', $data{Timeout}, $wheel->ID);
    }

    $kernel->sig_child($wheel->PID, '_child_signal');
    return (undef, $wheel);
}
# DIE signal handler: report the exception described by ARG1 (event name,
# destination session and error text) with warn(), then mark the signal as
# handled.
sub _exception {
    my ($kernel, $ex) = @_[KERNEL, ARG1];

    chomp $ex->{error_str};
    my $session_id = $ex->{dest_session}->ID;
    warn __PACKAGE__ . ": Event $ex->{event} in session $session_id"
        . " raised exception:\n $ex->{error_str}\n";

    $kernel->sig_handled();
    return;
}
# Child-exit handler registered through sig_child().  ARG1 is the child's
# PID and ARG2 its wait status.
#
# Steps, in order:
#   - warn about a nonzero exit code ($status >> 8) when the caller gave
#     neither an ExitEvent nor a ResultEvent key
#   - cancel the timeout alarm if a Timeout was set
#   - for wheels marked "lazy", stash the collected output and status in
#     $self->{lazy} for _lazy_run to pick up
#   - drop the wheel record
#   - call the ExitEvent and/or ResultEvent in the parent session
sub _child_signal {
    my ($kernel, $self, $pid, $status) = @_[KERNEL, OBJECT, ARG1, ARG2];

    my $wheel_id = $self->_pid_to_id($pid);
    my $data     = $self->{wheels}{$wheel_id};

    my $exit_code = $status >> 8;
    if ($exit_code != 0
        && !(exists $data->{ExitEvent} || exists $data->{ResultEvent})) {
        warn "Child $pid exited with nonzero status $exit_code\n";
    }

    if ($data->{Timeout}) {
        $kernel->alarm_remove($data->{alrm});
    }

    if ($data->{lazy}) {
        my %result = (
            merged => $data->{merged},
            stdout => $data->{stdout},
            stderr => $data->{stderr},
            status => $status,
        );
        $self->{lazy}{$wheel_id} = \%result;
    }

    delete $self->{wheels}{$wheel_id};

    # The Context value is appended to the event arguments only if defined.
    my @context = defined $data->{Context} ? ($data->{Context}) : ();
    my $parent  = $self->{parent_id};

    if (defined $data->{ExitEvent}) {
        $kernel->call($parent, $data->{ExitEvent}, $status, $pid, @context);
    }

    if (defined $data->{ResultEvent}) {
        $kernel->call(
            $parent,
            $data->{ResultEvent},
            $data->{stdout},
            $data->{stderr},
            $data->{merged},
            $status,
            $pid,
            @context,
        );
    }

    return;
}
# Timeout alarm handler (scheduled by _create_wheel with the wheel ID as
# ARG0): kill the wheel's child process.
sub _child_timeout {
    my ($self, $id) = @_[OBJECT, ARG0];

    my $wheel = $self->{wheels}{$id}{obj};
    $wheel->kill();
    return;
}
# StdinEvent handler of the wheel identified by ARG0: shut down the
# child's stdin.
sub _child_stdin {
    my ($self, $id) = @_[OBJECT, ARG0];

    my $wheel = $self->{wheels}{$id}{obj};
    $wheel->shutdown_stdin();
    return;
}
# Handler for output the child wrote to stdout.  ARG0 is the output, ARG1
# the wheel ID.
#
# - Lazy wheels and wheels with a ResultEvent: the output is appended to
#   the "merged" and "stdout" buffers; lazy wheels with Tee set also print
#   it.
# - No StdoutEvent key at all: the output is printed.
# - A defined StdoutEvent: that event is called in the parent session with
#   the output, the child's PID and, if defined, the Context.
# - A StdoutEvent key whose value is undef: the output is discarded.
sub _child_stdout {
    my ($kernel, $self, $output, $id) = @_[KERNEL, OBJECT, ARG0, ARG1];
    my $data = $self->{wheels}{$id};
    my $lazy = $data->{lazy};

    if ($lazy || defined $data->{ResultEvent}) {
        push @{ $data->{merged} }, $output;
        push @{ $data->{stdout} }, $output;

        # Check $lazy itself before looking inside it.  Reading
        # $data->{lazy}{Tee} directly would autovivify $data->{lazy} for
        # ResultEvent-only wheels; _child_signal would then treat them as
        # lazy and leave an entry behind in $self->{lazy}.
        if ($lazy && $lazy->{Tee}) {
            print $output, "\n";
        }
    }
    elsif (!exists $data->{StdoutEvent}) {
        print "$output\n";
    }
    elsif (defined (my $event = $data->{StdoutEvent})) {
        my $context = $data->{Context};
        $kernel->call(
            $self->{parent_id},
            $event,
            $output,
            $data->{obj}->PID,
            (defined $context ? $context : ()),
        );
    }
    return;
}
# Handler for output the child wrote to stderr.  ARG0 is the output, ARG1
# the wheel ID.
#
# - Lazy wheels and wheels with a ResultEvent: the output is appended to
#   the "merged" and "stderr" buffers; lazy wheels with Tee set also emit
#   it, with print when Merged is set and with warn otherwise.
# - No StderrEvent key at all: the output is emitted with warn.
# - A defined StderrEvent: that event is called in the parent session with
#   the output, the child's PID and, if defined, the Context.
# - A StderrEvent key whose value is undef: the output is discarded.
sub _child_stderr {
    my ($kernel, $self, $error, $id) = @_[KERNEL, OBJECT, ARG0, ARG1];
    my $data = $self->{wheels}{$id};
    my $lazy = $data->{lazy};

    if ($lazy || defined $data->{ResultEvent}) {
        push @{ $data->{merged} }, $error;
        push @{ $data->{stderr} }, $error;

        # Check $lazy itself before looking inside it.  Reading
        # $data->{lazy}{Tee} directly would autovivify $data->{lazy} for
        # ResultEvent-only wheels; _child_signal would then treat them as
        # lazy and leave an entry behind in $self->{lazy}.
        if ($lazy && $lazy->{Tee}) {
            if ($lazy->{Merged}) {
                print $error, "\n";
            }
            else {
                warn $error, "\n";
            }
        }
    }
    elsif (!exists $data->{StderrEvent}) {
        warn "$error\n";
    }
    elsif (defined (my $event = $data->{StderrEvent})) {
        my $context = $data->{Context};
        $kernel->call(
            $self->{parent_id},
            $event,
            $error,
            $data->{obj}->PID,
            (defined $context ? $context : ()),
        );
    }
    return;
}
# Look up the ID of the tracked wheel whose child has the given PID.
# Returns that ID, or nothing (undef in scalar context) when no tracked
# wheel has that PID.
sub _pid_to_id {
    my ($self, $pid) = @_;

    my $wheels = $self->{wheels};
    my ($found) = grep { $wheels->{$_}{obj}->PID == $pid } keys %{$wheels};

    return $found if defined $found;
    return;
}
# Send a signal to every child currently tracked by this object.
#
# The optional argument is the signal, passed through to the _killall
# handler.  When the invocant is not an instance of this package (for
# example when called as a class method), the instance belonging to the
# active session is used instead.
#
# Always returns nothing.
sub killall {
    my $self = shift;

    # Compare against this package's own name; the literal 'POE::Quickie'
    # used here before names a class this file neither loads nor defines.
    $self = __PACKAGE__->new() if ref $self ne __PACKAGE__;

    # Wheels are only created inside the session, so without a session
    # there is nothing to kill.
    return if !defined $self->{session_id};

    $poe_kernel->call($self->{session_id}, '_killall', @_);
    return;
}
# Event handler behind killall(): remove all of this session's alarms, then
# pass the signal in ARG0 to kill() on every tracked wheel.
sub _killall {
    my ($kernel, $self, $signal) = @_[KERNEL, OBJECT, ARG0];

    $kernel->alarm_remove_all();

    my $wheels = $self->{wheels};
    for my $wheel_id (keys %{$wheels}) {
        my $wheel = $wheels->{$wheel_id}{obj};
        $wheel->kill($signal);
    }
    return;
}
# Return a hash ref describing the children currently tracked: each key is
# a child's PID and each value the Context given for it (undef if none).
#
# When the invocant is not an instance of this package (for example when
# called as a class method), the instance belonging to the active session
# is used instead.
sub processes {
    my ($self) = @_;

    # Compare against this package's own name; the literal 'POE::Quickie'
    # used here before names a class this file neither loads nor defines.
    $self = __PACKAGE__->new() if ref $self ne __PACKAGE__;

    my %context_for;
    for my $entry (values %{ $self->{wheels} }) {
        $context_for{ $entry->{obj}->PID } = $entry->{Context};
    }
    return \%context_for;
}
# Run a child and block, by spinning the POE kernel one timeslice at a
# time, until its wheel record has been removed; then return what it
# produced.
#
# Arguments (key/value pairs):
#   RunArgs - array ref.  A single element that is a plain scalar, an
#             array ref or a code ref is taken as the Program; anything
#             else is treated as a key/value list for run().
#   Tee     - when true, no undef Stdout/StderrEvent is passed to run(), and
#             the output handlers also echo the output.
#   Merged  - when true, return (merged, status) instead of
#             (stdout, stderr, status).
sub _lazy_run {
    my ($self, %args) = @_;

    my $given = $args{RunArgs};
    my $is_bare_program = @{$given} == 1
        && (!ref $given->[0] || ref ($given->[0]) =~ /^(?:ARRAY|CODE)$/);
    my %run_args = $is_bare_program ? (Program => $given->[0]) : @{$given};

    # Without Tee, explicitly undefined output events are passed to run().
    my @silence = $args{Tee}
        ? ()
        : (StderrEvent => undef, StdoutEvent => undef);

    my $pid = $self->run(%run_args, ExitEvent => undef, @silence);
    my $id  = $self->_pid_to_id($pid);

    # Mark the wheel so the output and exit handlers collect its results.
    $self->{wheels}{$id}{lazy} = {
        Tee    => $args{Tee},
        Merged => $args{Merged},
    };

    # Hold a reference on the calling session while waiting.
    my $parent_id = $poe_kernel->get_active_session->ID;
    $poe_kernel->refcount_increment($parent_id, __PACKAGE__);
    while ($self->{wheels}{$id}) {
        $poe_kernel->run_one_timeslice();
    }
    $poe_kernel->refcount_decrement($parent_id, __PACKAGE__);

    my $data = delete $self->{lazy}{$id};
    if ($args{Merged}) {
        return $data->{merged}, $data->{status};
    }
    return $data->{stdout}, $data->{stderr}, $data->{status};
}
# Functional front end to run(): pass the given key/value arguments to
# run() on the instance belonging to the active session and return its
# result.
sub quickie_run {
    my %args = @_;

    # Construct this package's own class; 'POE::Quickie', used here before,
    # is neither loaded nor defined by this file.
    my $self = __PACKAGE__->new();
    return $self->run(%args);
}
# Run a child to completion via _lazy_run() on the instance belonging to
# the active session, with neither Tee nor Merged set, and return
# _lazy_run()'s result.  The arguments are handed over as RunArgs.
sub quickie {
    my @args = @_;

    # Construct this package's own class; 'POE::Quickie', used here before,
    # is neither loaded nor defined by this file.
    my $self = __PACKAGE__->new();
    return $self->_lazy_run(
        RunArgs => \@args,
    );
}
# Like quickie(), but with Tee set: run a child to completion via
# _lazy_run() on the instance belonging to the active session and return
# _lazy_run()'s result.  The arguments are handed over as RunArgs.
sub quickie_tee {
    my @args = @_;

    # Construct this package's own class; 'POE::Quickie', used here before,
    # is neither loaded nor defined by this file.
    my $self = __PACKAGE__->new();
    return $self->_lazy_run(
        RunArgs => \@args,
        Tee     => 1,
    );
}
# Like quickie(), but with Merged set: run a child to completion via
# _lazy_run() on the instance belonging to the active session and return
# _lazy_run()'s result.  The arguments are handed over as RunArgs.
sub quickie_merged {
    my @args = @_;

    # Construct this package's own class; 'POE::Quickie', used here before,
    # is neither loaded nor defined by this file.
    my $self = __PACKAGE__->new();
    return $self->_lazy_run(
        RunArgs => \@args,
        Merged  => 1,
    );
}
# Like quickie(), but with both Tee and Merged set: run a child to
# completion via _lazy_run() on the instance belonging to the active
# session and return _lazy_run()'s result.  The arguments are handed over
# as RunArgs.
sub quickie_tee_merged {
    my @args = @_;

    # Construct this package's own class; 'POE::Quickie', used here before,
    # is neither loaded nor defined by this file.
    my $self = __PACKAGE__->new();
    return $self->_lazy_run(
        RunArgs => \@args,
        Tee     => 1,
        Merged  => 1,
    );
}
}
# Create the test's own session, handled by the _start, stdout and stderr
# subs of the current package, then start the POE kernel.
my @test_states = qw(
    _start
    stdout
    stderr
);
POE::Session->create(
    package_states => [ __PACKAGE__, \@test_states ],
);
POE::Kernel->run();
# _start handler of the test session: keep a WheelWrapper (created with
# trace => 1) on the heap and run a child that prints "foo", asking for its
# stdout to be delivered to the 'stdout' event.
sub _start {
    my $heap    = $_[HEAP];
    my $wrapper = WheelWrapper->new(trace => 1);

    $heap->{wrapper} = $wrapper;
    $wrapper->run(
        StdoutEvent => 'stdout',
        Program     => sub { print "foo\n" },
    );
}
# 'stdout' event handler: check that the first child's output (ARG0) is
# "foo", then run a second child that warns "bar", asking for its stderr to
# be delivered to the 'stderr' event.
sub stdout {
    my $heap = $_[HEAP];
    my $line = $_[ARG0];

    is($line, 'foo', 'Got stdout');

    my $wrapper = $heap->{wrapper};
    $wrapper->run(
        StderrEvent => 'stderr',
        Program     => sub { warn "bar\n" },
    );
}
# 'stderr' event handler: check that the second child's error output
# (ARG0) is "bar".
sub stderr {
    my $line = $_[ARG0];
    is($line, 'bar', 'Got stderr');
}