
This queue is for tickets about the POE CPAN distribution.

Report information
The Basics
Id: 38669
Status: rejected
Priority: 0/
Queue: POE

People
Owner: RCAPUTO [...] cpan.org
Requestors: martijn [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: (no value)



Subject: Data::Transform support
So, I uploaded a first version of Data::Transform to CPAN today, and I thought I'd add a ticket with the patches to POE I would like to see for it. (NOTE: these aren't final, ready-to-apply patches, but more a starting point for discussion.) There isn't very much to them.

One is a patch for POE::Wheel::ReadWrite to send special packets right back into the driver without a round-trip to the user (mostly needed for the SSL protocol in the upcoming Data::Transform::SSL). Patches like this one may be needed for some other Wheels, but I haven't looked into that yet.

The other two are patches to PoCo::*::TCP to send an EOF packet through the filter chain (which may or may not end up generating a network packet), so you can cleanly end the connection for protocols that send a 'goodbye' message.
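As a rough illustration of the "send right back into the driver" idea, here is a minimal sketch that splits filter output into records for the user and octets to be written back. The class `Sketch::Meta::SENDBACK` and the helper `route_records` are hypothetical stand-ins invented for this example; they are not the Data::Transform or POE API.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical stand-in for a "send this straight back" meta packet.
package Sketch::Meta::SENDBACK {
  sub new  { my ($class, $data) = @_; return bless { data => $data }, $class }
  sub data { return $_[0]{data} }
}

# Split filter output into records for the user's input handler and
# octets that should go straight to the output driver.
sub route_records {
  my ($records) = @_;
  my (@for_user, @for_driver);
  foreach my $record (@$records) {
    if (blessed($record) and $record->isa('Sketch::Meta::SENDBACK')) {
      push @for_driver, $record->data;
      next;
    }
    push @for_user, $record;
  }
  return (\@for_user, \@for_driver);
}

my ($user, $driver) = route_records([
  'hello', Sketch::Meta::SENDBACK->new('handshake-reply'), 'world',
]);
print "to user: @$user\n";
print "to driver: @$driver\n";
```

In the wheel itself the second list would presumably be handed to the driver and the write select resumed, as the patch below does.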
Subject: data-transform.patch
diff -ur ../poe/lib/POE/Component/Client/TCP.pm lib/POE/Component/Client/TCP.pm
--- ../poe/lib/POE/Component/Client/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ lib/POE/Component/Client/TCP.pm 2008-08-07 20:22:50.000000000 +0200
@@ -274,6 +274,7 @@
         if ($heap->{connected}) {
           $heap->{connected} = 0;
           if (defined $heap->{server}) {
+            $heap->{server}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{server}->get_driver_out_octets()
diff -ur ../poe/lib/POE/Component/Server/TCP.pm lib/POE/Component/Server/TCP.pm
--- ../poe/lib/POE/Component/Server/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ lib/POE/Component/Server/TCP.pm 2008-08-07 20:22:35.000000000 +0200
@@ -261,6 +261,7 @@
           my $heap = $_[HEAP];
           $heap->{shutdown} = 1;
           if (defined $heap->{client}) {
+            $heap->{client}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{client}->get_driver_out_octets()
Only in ../poe/lib/POE/: Component.pm
Only in ../poe/lib/POE/: Driver
Only in ../poe/lib/POE/: Driver.pm
Only in ../poe/lib/POE/Filter: Block.pm
Only in ../poe/lib/POE/Filter: Grep.pm
Only in ../poe/lib/POE/Filter: HTTPD.pm
Only in ../poe/lib/POE/Filter: Line.pm
Only in ../poe/lib/POE/Filter: Map.pm
Only in ../poe/lib/POE/Filter: RecordBlock.pm
Only in ../poe/lib/POE/Filter: Reference.pm
Only in ../poe/lib/POE/Filter: Stackable.pm
Only in ../poe/lib/POE/Filter: Stream.pm
Only in ../poe/lib/POE/: Filter.pm
Only in ../poe/lib/POE/: Kernel.pm
Only in ../poe/lib/POE/: Loader.pm
Only in ../poe/lib/POE/: Loop
Only in ../poe/lib/POE/: Loop.pm
Only in ../poe/lib/POE/: NFA.pm
Only in ../poe/lib/POE/: Pipe
Only in ../poe/lib/POE/: Pipe.pm
Only in ../poe/lib/POE/: Queue
Only in ../poe/lib/POE/: Queue.pm
Only in ../poe/lib/POE/: Resource
Only in ../poe/lib/POE/: Resource.pm
Only in ../poe/lib/POE/: Resources.pm
Only in ../poe/lib/POE/: Session.pm
Only in ../poe/lib/POE/Wheel: Curses.pm
Only in ../poe/lib/POE/Wheel: FollowTail.pm
Only in ../poe/lib/POE/Wheel: ListenAccept.pm
Only in ../poe/lib/POE/Wheel: ReadLine.pm
diff -ur ../poe/lib/POE/Wheel/ReadWrite.pm lib/POE/Wheel/ReadWrite.pm
--- ../poe/lib/POE/Wheel/ReadWrite.pm 2008-06-27 11:40:25.000000000 +0200
+++ lib/POE/Wheel/ReadWrite.pm 2008-08-22 21:36:37.000000000 +0200
@@ -8,6 +8,7 @@
 $VERSION = do {my($r)=(q$Revision$=~/(\d+)/);sprintf"1.%04d",$r};
 
 use Carp qw( croak carp );
+use Scalar::Util qw(blessed);
 use POE qw(Wheel Driver::SysRW Filter::Line);
 
 # Offsets into $self.
@@ -256,6 +257,7 @@
   my $driver = $self->[DRIVER_BOTH];
   my $input_filter = \$self->[FILTER_INPUT];
   my $event_input = \$self->[EVENT_INPUT];
+  my $handle_output = $self->[HANDLE_OUTPUT];
   my $event_error = \$self->[EVENT_ERROR];
   my $unique_id = $self->[UNIQUE_ID];
 
@@ -281,6 +283,13 @@
             my $next_rec = $$input_filter->get_one();
             last unless @$next_rec;
             foreach my $cooked_input (@$next_rec) {
+              if (blessed ($cooked_input)) {
+                if ($cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+                  $driver->put([$cooked_input->data]);
+                  $k->select_resume_write($handle_output);
+                  next;
+                }
+              }
               $k->call($me, $$event_input, $cooked_input, $unique_id);
             }
           }
@@ -473,6 +482,12 @@
       my $next_rec = $self->[FILTER_INPUT]->get_one();
       last unless @$next_rec;
       foreach my $cooked_input (@$next_rec) {
+        if (blessed ($cooked_input) and
+            $cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+          $self->[DRIVER_BOTH]->put([$cooked_input->{data}]);
+          $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT]);
+          next;
+        }
         $poe_kernel->call(
           $poe_kernel->get_active_session(),
           $self->[EVENT_INPUT],
Only in ../poe/lib/POE/Wheel: Run.pm
Only in ../poe/lib/POE/Wheel: SocketFactory.pm
Only in ../poe/lib/POE/: Wheel.pm
Updated the patch to Wheel::ReadWrite to filter out Data::Transform::Meta packets on put, either before putting filter output into the driver (for Data::Transform filters) or before putting data into the filter (for POE::Filter filters). It now passes wheel_readwrite.t, comp_tcp.t and comp_tcp_concurrent.t.
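The two filtering points described above can be sketched in isolation. Everything in the `Sketch::` namespace, along with `is_meta` and `octets_for_driver`, is a hypothetical stand-in written for this example. It only assumes, as the patch does, that a meta-aware filter can be recognised by a `meta` method.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical stand-ins for the meta packet classes.
package Sketch::Meta      { sub new { return bless {}, shift } }
package Sketch::Meta::EOF { our @ISA = ('Sketch::Meta') }

# Legacy-style filter: knows nothing about meta packets.
package Sketch::LegacyFilter {
  sub new { return bless {}, shift }
  sub put { my ($self, $chunks) = @_; return [ map { "$_\n" } @$chunks ] }
}

# Meta-aware filter: turns EOF into a goodbye line and passes the packet on.
package Sketch::AwareFilter {
  sub new  { return bless {}, shift }
  sub meta { return 1 }
  sub put  {
    my ($self, $chunks) = @_;
    return [
      map {
        Scalar::Util::blessed($_) && $_->isa('Sketch::Meta::EOF')
          ? ("BYE\n", $_)
          : "$_\n"
      } @$chunks
    ];
  }
}

sub is_meta { my ($x) = @_; return blessed($x) && $x->isa('Sketch::Meta') }

# Meta-aware filters see the packets and are cleaned up afterwards;
# legacy filters never see them at all.
sub octets_for_driver {
  my ($filter, @chunks) = @_;
  if ($filter->can('meta')) {
    return [ grep { !is_meta($_) } @{ $filter->put(\@chunks) } ];
  }
  return $filter->put([ grep { !is_meta($_) } @chunks ]);
}

my @chunks = ('a', Sketch::Meta::EOF->new);
print "legacy: ", join('', @{ octets_for_driver(Sketch::LegacyFilter->new, @chunks) });
print "aware:  ", join('', @{ octets_for_driver(Sketch::AwareFilter->new,  @chunks) });
```

Either way, only plain octets reach the driver; the difference is whether the filter gets a chance to react to the packet first.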
diff -ur poe/lib/POE/Component/Client/TCP.pm Data-Transform/lib/POE/Component/Client/TCP.pm
--- poe/lib/POE/Component/Client/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ Data-Transform/lib/POE/Component/Client/TCP.pm 2008-08-26 22:21:09.000000000 +0200
@@ -10,6 +10,8 @@
 use Carp qw(carp croak);
 use Errno qw(ETIMEDOUT ECONNRESET);
 
+use Data::Transform::Meta;
+
 # Explicit use to import the parameter constants;
 use POE::Session;
 use POE::Driver::SysRW;
@@ -274,6 +276,7 @@
         if ($heap->{connected}) {
           $heap->{connected} = 0;
           if (defined $heap->{server}) {
+            $heap->{server}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{server}->get_driver_out_octets()
diff -ur poe/lib/POE/Component/Server/TCP.pm Data-Transform/lib/POE/Component/Server/TCP.pm
--- poe/lib/POE/Component/Server/TCP.pm 2008-07-15 23:47:51.000000000 +0200
+++ Data-Transform/lib/POE/Component/Server/TCP.pm 2008-08-26 22:21:37.000000000 +0200
@@ -11,6 +11,8 @@
 use Socket qw(INADDR_ANY inet_ntoa inet_aton AF_INET AF_UNIX PF_UNIX);
 use Errno qw(ECONNABORTED ECONNRESET);
 
+use Data::Transform::Meta;
+
 # Explicit use to import the parameter constants.
 use POE::Session;
 use POE::Driver::SysRW;
@@ -261,6 +263,7 @@
           my $heap = $_[HEAP];
           $heap->{shutdown} = 1;
           if (defined $heap->{client}) {
+            $heap->{client}->put(Data::Transform::Meta::EOF->new());
             if (
               $heap->{got_an_error} or
               not $heap->{client}->get_driver_out_octets()
diff -ur poe/lib/POE/Wheel/ReadWrite.pm Data-Transform/lib/POE/Wheel/ReadWrite.pm
--- poe/lib/POE/Wheel/ReadWrite.pm 2008-06-27 11:40:25.000000000 +0200
+++ Data-Transform/lib/POE/Wheel/ReadWrite.pm 2008-08-26 22:20:19.000000000 +0200
@@ -8,6 +8,7 @@
 $VERSION = do {my($r)=(q$Revision$=~/(\d+)/);sprintf"1.%04d",$r};
 
 use Carp qw( croak carp );
+use Scalar::Util qw(blessed);
 use POE qw(Wheel Driver::SysRW Filter::Line);
 
 # Offsets into $self.
@@ -256,6 +257,7 @@
   my $driver = $self->[DRIVER_BOTH];
   my $input_filter = \$self->[FILTER_INPUT];
   my $event_input = \$self->[EVENT_INPUT];
+  my $handle_output = $self->[HANDLE_OUTPUT];
   my $event_error = \$self->[EVENT_ERROR];
   my $unique_id = $self->[UNIQUE_ID];
 
@@ -281,6 +283,13 @@
             my $next_rec = $$input_filter->get_one();
             last unless @$next_rec;
             foreach my $cooked_input (@$next_rec) {
+              if (blessed ($cooked_input)) {
+                if ($cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+                  $driver->put([$cooked_input->data]);
+                  $k->select_resume_write($handle_output);
+                  next;
+                }
+              }
               $k->call($me, $$event_input, $cooked_input, $unique_id);
             }
           }
@@ -419,11 +428,27 @@
 sub put {
   my ($self, @chunks) = @_;
 
-  my $old_buffered_out_octets = $self->[DRIVER_BUFFERED_OUT_OCTETS];
-  my $new_buffered_out_octets =
-    $self->[DRIVER_BUFFERED_OUT_OCTETS] =
-    $self->[DRIVER_BOTH]->put($self->[FILTER_OUTPUT]->put(\@chunks));
+  my $new_buffered_out_octets;
+
+  if ($self->[FILTER_OUTPUT]->can('meta')) {
+    my @filtered_chunks = grep {
+      not blessed $_ or not $_->isa('Data::Transform::Meta');
+    } @{$self->[FILTER_OUTPUT]->put(\@chunks)};
+    $new_buffered_out_octets =
+      $self->[DRIVER_BUFFERED_OUT_OCTETS] =
+      $self->[DRIVER_BOTH]->put(\@filtered_chunks);
+  } else {
+    $new_buffered_out_octets =
+      $self->[DRIVER_BUFFERED_OUT_OCTETS] =
+      $self->[DRIVER_BOTH]->put(
+        $self->[FILTER_OUTPUT]->put([
+          grep {
+            not (blessed $_ and $_->isa('Data::Transform::Meta::EOF'));
+          } @chunks
+        ])
+      );
+  }
 
   if (
     $self->[AUTOFLUSH] &&
@@ -473,6 +498,12 @@
       my $next_rec = $self->[FILTER_INPUT]->get_one();
       last unless @$next_rec;
       foreach my $cooked_input (@$next_rec) {
+        if (blessed ($cooked_input) and
+            $cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
+          $self->[DRIVER_BOTH]->put([$cooked_input->{data}]);
+          $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT]);
+          next;
+        }
         $poe_kernel->call(
           $poe_kernel->get_active_session(),
           $self->[EVENT_INPUT],
Are there any figures for the performance penalty to Data::Transform non-users? All the can()ning seems like a lot of work, but maybe it's not? Maybe it can be put behind a compile-time switch if it is.
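For reference, a compile-time switch of that kind might look roughly like the sketch below. The environment variable `POE_USE_DATA_TRANSFORM` and the helper `is_sendback` are assumptions made up for this example; the constant name follows the `USE_DATA_TRANSFORM` idea mentioned later in this ticket, and nothing here is existing POE code.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical switch, evaluated once at compile time because "use constant"
# runs in a BEGIN phase. It is off unless the environment asks for it.
use constant USE_DATA_TRANSFORM => $ENV{POE_USE_DATA_TRANSFORM} ? 1 : 0;

# Returns 1 for SENDBACK meta packets, 0 for everything else.
# With the constant false, the left side of the "and" chain is a
# compile-time false value, so the blessed()/isa() checks are never run.
sub is_sendback {
  my ($record) = @_;
  if (
    USE_DATA_TRANSFORM
    and blessed($record)
    and $record->isa('Data::Transform::Meta::SENDBACK')
  ) {
    return 1;
  }
  return 0;
}

printf "Data::Transform support compiled %s\n",
  USE_DATA_TRANSFORM ? 'in' : 'out';
```

Whether the saving is measurable is a separate question; the benchmark further down is the place to check that.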
On Thu Dec 15 15:24:43 2011, RCAPUTO wrote:
> Are there any figures for the performance penalty to Data::Transform
> non-users? All the can()ning seems like a lot of work, but maybe it's
> not? Maybe it can be put behind a compile-time switch if it is.
martijn@zzz:~$ time for (( i=10; i--; 1 )); do perl test.pl; done

with POE::Filter::Line and no Data::Transform loaded:
real 0m40.132s
user 0m39.814s
sys 0m0.284s

with POE::Filter::Line and Data::Transform loaded:
real 0m41.681s
user 0m41.307s
sys 0m0.348s

with Data::Transform::Line:
real 0m43.990s
user 0m43.723s
sys 0m0.248s
Subject: test.pl
use strict;
use warnings;

use POE;
use POE::Wheel::ReadWrite;
#use Data::Transform::POE;
#use Data::Transform::Line;

POE::Session->create(
  inline_states => {
    _start => sub {
      # Word list from /usr/share/ispell
      open( my $dict, '<', 'british.med+.mwl' )
        or die "cannot open british.med+.mwl: $!";
      $_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new(
        Handle => $dict,
        #Filter => Data::Transform::Line->new,
        InputEvent => 'line',
        ErrorEvent => 'end',
      );
    },
    line => sub {
      $_[HEAP]->{count}++;
    },
    end => sub {
      #warn delete $_[HEAP]->{count};
      delete $_[HEAP]->{wheel};
    }
  },
);

$poe_kernel->run;
I've attached a version of your benchmark that loops internally and reports basic processing rates excluding Perl and POE setup and teardown time. It doesn't report CPU times, which might be a problem.
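If CPU times turn out to matter, Perl's built-in `times` could be sampled alongside the wall clock. The sketch below is independent of POE; the `measure` helper is a hypothetical name, and in the benchmark the two samples would presumably be taken in `restart_test` and in the `end` handler instead.

```perl
use strict;
use warnings;
use Time::HiRes qw(time);

# Run a code ref and report wall-clock, user CPU and system CPU seconds.
sub measure {
  my ($work) = @_;
  my $wall_start = time();
  my ($user_start, $sys_start) = times();   # process CPU times so far
  $work->();
  my ($user_end, $sys_end) = times();
  return {
    wall => time() - $wall_start,
    user => $user_end - $user_start,
    sys  => $sys_end - $sys_start,
  };
}

# Stand-in workload; the real benchmark would read the word list here.
my $t = measure(sub { my $n = 0; $n += $_ for 1 .. 1_000_000; });
printf "wall %.3f  user %.3f  sys %.3f\n", @{$t}{qw(wall user sys)};
```

Note that the resolution of `times` depends on the platform's clock tick, so short runs may report zero CPU time.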
Subject: lotr-bench-wheel-readwrite.pl
use strict;
use warnings;

use POE;
use POE::Wheel::ReadWrite;
use Time::HiRes qw(time);

use constant ITERATIONS_PER_RUN => 10;

POE::Session->create(
  inline_states => {
    _start => sub {
      $_[HEAP]->{tests_to_go} = ITERATIONS_PER_RUN;
      restart_test($_[HEAP]);
    },
    line => sub {
      $_[HEAP]->{count}++;
    },
    end => sub {
      my $elapsed = time() - $_[HEAP]{start};
      my $count = $_[HEAP]{count};

      printf "%d / %.3f = %.3f lines/sec\n", $count, $elapsed, $count / $elapsed;

      delete $_[HEAP]->{wheel};

      restart_test($_[HEAP]) if --$_[HEAP]{tests_to_go} > 0;
    },
  },
);

POE::Kernel->run();
exit;

sub restart_test {
  my $heap = shift();

  open(my $dict, '<', '/usr/share/dict/words') or die $!;

  $heap->{count} = 0;
  $heap->{start} = time();

  $heap->{wheel} = POE::Wheel::ReadWrite->new(
    Handle => $dict,
    InputEvent => 'line',
    ErrorEvent => 'end',
  );
}
Subject: benchmark results
martijn@meh:~/devel/Perl/Data-Transform$ perl bench.pl
99156 / 8.988 = 11031.461 lines/sec
99156 / 9.110 = 10884.420 lines/sec
99156 / 9.042 = 10966.727 lines/sec
99156 / 9.029 = 10982.229 lines/sec
99156 / 8.950 = 11078.893 lines/sec
99156 / 8.953 = 11075.001 lines/sec
99156 / 8.968 = 11056.675 lines/sec
99156 / 8.930 = 11103.259 lines/sec
99156 / 8.903 = 11137.796 lines/sec
99156 / 8.924 = 11110.619 lines/sec

martijn@meh:~/devel/Perl/Data-Transform$ perl bench.pl # with Data::Transform loaded
99156 / 9.287 = 10677.037 lines/sec
99156 / 9.155 = 10830.577 lines/sec
99156 / 9.259 = 10709.368 lines/sec
99156 / 9.165 = 10819.160 lines/sec
99156 / 9.156 = 10829.699 lines/sec
99156 / 9.136 = 10853.406 lines/sec
99156 / 9.150 = 10837.063 lines/sec
99156 / 9.135 = 10854.154 lines/sec
99156 / 9.142 = 10846.662 lines/sec
99156 / 9.227 = 10746.417 lines/sec

martijn@meh:~/devel/Perl/Data-Transform$ perl -I lib bench.pl # with Data::Transform::Line used
99156 / 9.541 = 10393.082 lines/sec
99156 / 9.398 = 10550.579 lines/sec
99156 / 9.392 = 10557.869 lines/sec
99156 / 9.453 = 10488.941 lines/sec
99156 / 9.410 = 10536.966 lines/sec
99156 / 9.530 = 10404.112 lines/sec
99156 / 9.626 = 10300.745 lines/sec
99156 / 9.442 = 10501.358 lines/sec
99156 / 9.447 = 10495.481 lines/sec
99156 / 9.435 = 10509.591 lines/sec
martijn@meh:~/devel/Perl/Data-Transform$
On Tue Jan 03 14:11:14 2012, MARTIJN wrote:
[benchmarks]

The benchmarks average 2.2% slower with Data::Transform loaded (but not used). The average slowdown using Data::Transform is 5.2%.

Please commit your changes to POE::Wheel::ReadWrite. I may end up placing them behind a USE_DATA_TRANSFORM constant if someone can't tolerate the overhead.
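One way to arrive at averages like these is to compare mean throughput across the three runs. The sketch below does that with the lines/sec figures from the benchmark results; it assumes the percentages were computed from throughput and not from elapsed time, which the ticket does not state. The helper names `mean` and `slowdown_pct` are illustrative.

```perl
use strict;
use warnings;
use List::Util qw(sum);

sub mean { return sum(@_) / @_ }

# Percentage by which the candidate's mean throughput falls short of the baseline's.
sub slowdown_pct {
  my ($baseline, $candidate) = @_;
  return 100 * (1 - mean(@$candidate) / mean(@$baseline));
}

# lines/sec from the three benchmark runs in this ticket
my @plain = (11031.461, 10884.420, 10966.727, 10982.229, 11078.893,
             11075.001, 11056.675, 11103.259, 11137.796, 11110.619);
my @loaded = (10677.037, 10830.577, 10709.368, 10819.160, 10829.699,
              10853.406, 10837.063, 10854.154, 10846.662, 10746.417);
my @used = (10393.082, 10550.579, 10557.869, 10488.941, 10536.966,
            10404.112, 10300.745, 10501.358, 10495.481, 10509.591);

printf "Data::Transform loaded, not used: %.1f%% slower\n",
  slowdown_pct(\@plain, \@loaded);
printf "Data::Transform::Line used:       %.1f%% slower\n",
  slowdown_pct(\@plain, \@used);
```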
Requested changes to be committed two months ago. Marking as stalled.
Requested the submitter apply their patch 20 months ago.