Skip Menu |

This queue is for tickets about the Net-Stomp CPAN distribution.

Report information
The Basics
Id: 36629
Status: resolved
Priority: 0/
Queue: Net-Stomp

People
Owner: Nobody in particular
Requestors: FRODWITH [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Normal
Broken in: (no value)
Fixed in: (no value)



Subject: can_read and Frame->parse
-- This is perl 5.10.0 on darwin-2level, Net::Stomp 0.32. can_read doesn't behave correctly. The docs say it returns true iff there's a frame waiting to be read, but it's just a select() call to see if there's data on the socket, which can return both false positives (data has arrived, but not yet a complete frame) and false negatives (a complete frame has already been pulled into a buffer by an earlier read, so the socket itself looks idle). While investigating this, I noticed that Net::Stomp::Frame->parse was modified at some point to take a socket as its argument. This is inconsistent with the docs and probably not what was wanted anyway: parse and as_string should be symmetrical. Attached is a patch that fixes both issues and adds tests for them.
Subject: Net-Stomp-can_read.patch
=== modified file 'MANIFEST' --- MANIFEST 2008-06-06 14:41:14 +0000 +++ MANIFEST 2008-06-10 16:17:15 +0000 @@ -7,4 +7,7 @@ README t/pod.t t/pod_coverage.t +t/000-frame-serialization.t +t/001-bytes_message.t +t/101-can_read-buffer-bug.t META.yml === modified file 'lib/Net/Stomp.pm' --- lib/Net/Stomp.pm 2008-06-06 14:41:14 +0000 +++ lib/Net/Stomp.pm 2008-06-10 15:46:13 +0000 @@ -5,7 +5,11 @@ use IO::Select; use Net::Stomp::Frame; use base 'Class::Accessor::Fast'; -__PACKAGE__->mk_accessors(qw(hostname port select socket)); +__PACKAGE__->mk_accessors(qw( + hostname port + fdset socket + frames buffer +)); our $VERSION = '0.32'; sub new { @@ -19,10 +23,11 @@ die "Error connecting to " . $self->hostname . ':' . $self->port . ": $!" unless $socket; binmode($socket); + $self->socket($socket); - my $select = IO::Select->new(); - $select->add($socket); - $self->select($select); + $self->fdset(IO::Select->new($socket)); + $self->frames([]); + $self->buffer(''); return $self; } @@ -43,10 +48,46 @@ $self->socket->close; } +sub _parse_frames { + my $self = shift; + my $buf; + while ($self->fdset->can_read(0)) { + unless ($self->socket->sysread($buf, 1024)) { + $self->socket->close(); + die "End of file"; + } + $self->buffer($self->buffer . 
$buf); + } + while (1) { + my ($leftovers, $frame) = Net::Stomp::Frame->parse($self->buffer); + last unless $frame; + push(@{ $self->frames }, $frame); + $self->buffer($leftovers); + } +} + sub can_read { my ( $self, $conf ) = @_; - my $timeout = $conf->{timeout} || 0; - return $self->select->can_read($timeout) || 0; + my $timeout = $conf->{timeout}; + my $block = $conf->{block}; + + $self->_parse_frames(); + return @{ $self->frames } > 0 unless ($timeout || $block); + + while ($block || $timeout > 0) { + return 1 if @{ $self->frames } > 0; + if ($block) { + $self->fdset->can_read; + } + else { + my $start = time; + $self->fdset->can_read($timeout); + $timeout -= time - $start; + } + $self->_parse_frames(); + } + + return 0; } sub send { @@ -83,16 +124,16 @@ sub send_frame { my ( $self, $frame ) = @_; - # warn "send [" . $frame->as_string . "]\n"; +# warn "send [" . $frame->as_string . "]\n"; $self->socket->print( $frame->as_string ); } sub receive_frame { my $self = shift; - my $frame = Net::Stomp::Frame->parse( $self->socket ); - - # warn "receive [" . $frame->as_string . "]\n"; + $self->can_read({block => 1}); + my $frame = pop(@{ $self->frames }); +# warn "recv [" . $frame->as_string . "]\n"; return $frame; } @@ -268,11 +309,11 @@ =head2 can_read This returns whether a frame is waiting to be read. Optionally takes a -timeout in seconds: +timeout in seconds OR a directive to block until a frame is ready to read: my $can_read = $stomp->can_read; my $can_read = $stomp->can_read({ timeout => '0.1' }); - + $stomp->can_read({ block => 1 }); =head2 ack === modified file 'lib/Net/Stomp/Frame.pm' --- lib/Net/Stomp/Frame.pm 2008-06-06 14:41:14 +0000 +++ lib/Net/Stomp/Frame.pm 2008-06-10 15:44:36 +0000 @@ -22,52 +22,45 @@ while ( my ( $key, $value ) = each %{ $headers || {} } ) { $frame .= $key . ':' . $value . 
"\n"; } - $frame .= "\n"; - $frame .= $body || ''; - $frame .= "\000"; + + $body ||= ''; + return "$frame\n$body\000"; } sub parse { - my ( $package, $socket ) = @_; - local $/ = "\n"; - - # read the command - my $command; - while (1) { - $command = $socket->getline || die "Error reading command: $!"; - chop $command; - last if $command; - } - - # read headers - my $headers; - while (1) { - my $line = $socket->getline || die "Error reading header: $!"; - chop $line; - last if $line eq ""; - my ( $key, $value ) = split /: ?/, $line, 2; - $headers->{$key} = $value; - } - - # read the body - my $body; - if ( $headers->{"content-length"} ) { - $socket->read( $body, $headers->{"content-length"} ) - || die "Error reading body: $!"; - $socket->getc; # eat the trailing null + my ($class, $string) = @_; + my ($command, $headers, $body); + + my @failure = ($string, undef); + + ($command, $string) = split("\n", $string, 2); + return @failure unless $command; + + while (1) { + my ($header, $key, $val); + ($header, $string) = split("\n", $string, 2); + last unless $header; + ($key, $val) = split(/: */, $header, 2); + $headers->{$key} = $val; + } + + if (my $length = $headers->{'content-length'}) { + return @failure unless length($string) >= $length; $headers->{bytes_message} = 1; - } else { - while (1) { - my $byte = $socket->getc; - die "Error reading body: $!" 
unless defined $byte; - last if $byte eq "\000"; - $body .= $byte; - } + $body = substr($string, 0, $length); + $string = substr($string, $length + 1); # +1 to eat trailing null + } + else { + return @failure unless $string =~ "\000"; + ($body, $string) = split("\000", $string, 2); } - my $frame = Net::Stomp::Frame->new( - { command => $command, headers => $headers, body => $body } ); - return $frame; + my $frame = Net::Stomp::Frame->new({ + command => $command, + headers => $headers, + body => $body, + }); + return ($string, $frame); } 1; @@ -117,13 +110,22 @@ =head2 parse -Create a new L<Net::Somp::Frame> given a string containing the serialised frame: +Create a new L<Net::Stomp::Frame> given a string containing the serialised +frame: my $frame = Net::Stomp::Frame->parse($string); +If called in scalar context as above, the frame is returned (or undef if the +frame couldn't be parsed). + +In list context, the remainder of the string left over from parsing and the +frame are returned (or in failure mode, the original string and undef). 
+ + my ($remaining, $frame) = Net::Stomp::Frame->parse($string); + =head2 as_string -Create a string containing the serialised frame representing the frame: +Create a string representing the serialized frame: my $string = $frame->as_string; === added file 't/000-frame-serialization.t' --- t/000-frame-serialization.t 1970-01-01 00:00:00 +0000 +++ t/000-frame-serialization.t 2008-06-10 16:02:34 +0000 @@ -0,0 +1,16 @@ +use warnings; +use strict; + +use Net::Stomp::Frame; +use Test::More tests => 1; + +my $body = "Row, row, row your boat."; + +my $frame = Net::Stomp::Frame->new({ + command => 'MESSAGE', + body => $body, + headers => {'message-id' => '12345'}, +}); + +my $str = $frame->as_string; +is($str, $frame->parse($str)->as_string, 'parse/as_string symmetry'); === added file 't/001-bytes_message.t' --- t/001-bytes_message.t 1970-01-01 00:00:00 +0000 +++ t/001-bytes_message.t 2008-06-10 16:02:55 +0000 @@ -0,0 +1,21 @@ +use warnings; +use strict; + +use Net::Stomp::Frame; +use Test::More tests => 5; + +my $body = '0' . "\000" . '123456789'; +my $str = Net::Stomp::Frame->new({ + command => 'MESSAGE', + headers => { + 'content-length' => 11, + 'destination' => '/queue/whatever' + }, + body => $body, +})->as_string . 
'gibberish'; +my ($leftovers, $frame) = Net::Stomp::Frame->parse($str); +is($frame->command, 'MESSAGE', 'command'); +is($frame->headers->{destination}, '/queue/whatever', 'destination'); +is($frame->body, $body, 'body'); +ok($frame->headers->{bytes_message}, 'bytes_message'); +is($leftovers, 'gibberish', 'leftovers'); === added file 't/101-can_read-buffer-bug.t' --- t/101-can_read-buffer-bug.t 1970-01-01 00:00:00 +0000 +++ t/101-can_read-buffer-bug.t 2008-06-10 16:06:06 +0000 @@ -0,0 +1,67 @@ +use warnings; +use strict; + +use IO::Socket::INET; +use Net::Stomp; +use Net::Stomp::Frame; + +my $server = IO::Socket::INET->new(Listen => 1); + +sub server_main { + my $client = $server->accept(); + my $body = "o hai there"; + foreach my $i (1..3) { + use bytes; + $client->print(Net::Stomp::Frame->new({ + command => 'MESSAGE', + headers => { + destination => '/queue/wordbin', + 'message-id' => "$i", + 'content-length' => bytes::length($body), + }, + body => $body, + })->as_string); + } + sleep 1; # leave the socket open long enough for can_read to fail + # because if you close it, the buffering problem doesn't show up. + $client->close(); + $server->close(); + exit 0; +} + +sub client_main { + use Test::More tests => 3; + my $stomp = Net::Stomp->new({ + hostname => 'localhost', + port => $server->sockport + }); + # this makes the first frame come in + $stomp->connect({ login => 'hello', passcode => 'there' }); + + # second frame + $stomp->receive_frame(); + + # we should still be able to read + ok($stomp->can_read, 'can_read'); + + # third frame + $stomp->receive_frame(); + + # there should be no frames left + ok(!$stomp->can_read, 'cannot read'); + + sleep 2; # wait longer than the server + + eval { $stomp->can_read({timeout => 3}) }; + ok($@, 'dies on EOF'); + $stomp->socket->close(); +} + +if (my $pid = fork) { + client_main(); + waitpid $pid, 0; +} +else { + server_main(); +} +
Net-Stomp-0.33.tar.gz just hit CPAN with your patches.