Subject: | can_read and Frame->parse |
-- this is perl 5.10.0 on darwin2-level, Net::Stomp 0.32
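can_read doesn't behave correctly. The docs say it returns true if and only
if there's a frame waiting to be read, but it's just a select() call to see
if there's data on the socket. That can give both false positives (bytes are
available but don't yet form a complete frame) and false negatives (a frame
has already been read off the socket into a buffer, so select() sees nothing).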
While investigating this, I noticed that Net::Stomp::Frame->parse was
modified at some point to take a socket as its argument. This is
inconsistent with the docs, which say it takes a string containing the
serialised frame, and probably not what was wanted anyway: parse and
as_string should be symmetrical.
Attached is a patch that fixes both issues and adds tests for them.
Subject: | Net-Stomp-can_read.patch |
=== modified file 'MANIFEST'
--- MANIFEST 2008-06-06 14:41:14 +0000
+++ MANIFEST 2008-06-10 16:17:15 +0000
@@ -7,4 +7,7 @@
README
t/pod.t
t/pod_coverage.t
+t/000-frame-serialization.t
+t/001-bytes_message.t
+t/101-can_read-buffer-bug.t
META.yml
=== modified file 'lib/Net/Stomp.pm'
--- lib/Net/Stomp.pm 2008-06-06 14:41:14 +0000
+++ lib/Net/Stomp.pm 2008-06-10 15:46:13 +0000
@@ -5,7 +5,11 @@
use IO::Select;
use Net::Stomp::Frame;
use base 'Class::Accessor::Fast';
-__PACKAGE__->mk_accessors(qw(hostname port select socket));
+__PACKAGE__->mk_accessors(qw(
+ hostname port
+ fdset socket
+ frames buffer
+));
our $VERSION = '0.32';
sub new {
@@ -19,10 +23,11 @@
die "Error connecting to " . $self->hostname . ':' . $self->port . ": $!"
unless $socket;
binmode($socket);
+
$self->socket($socket);
- my $select = IO::Select->new();
- $select->add($socket);
- $self->select($select);
+ $self->fdset(IO::Select->new($socket));
+ $self->frames([]);
+ $self->buffer('');
return $self;
}
@@ -43,10 +48,46 @@
$self->socket->close;
}
+sub _parse_frames {    # Drain readable socket data into the buffer, then queue every complete frame.
+ my $self = shift;
+ my $buf;
+ while ($self->fdset->can_read(0)) {    # zero timeout: poll only, never wait
+ unless ($self->socket->sysread($buf, 1024)) {    # false on EOF (0) and on read error (undef)
+ $self->socket->close();
+ die "End of file";    # NOTE(review): also raised for read errors; $! is not reported
+ }
+ $self->buffer($self->buffer . $buf);
+ }
+ while (1) {
+ my ($leftovers, $frame) = Net::Stomp::Frame->parse($self->buffer);
+ last unless $frame;    # no complete frame left; partial data stays in the buffer
+ push(@{ $self->frames }, $frame);    # appended at the end, so the queue is in arrival order
+ $self->buffer($leftovers);
+ }
+}
+
sub can_read {
my ( $self, $conf ) = @_;
- my $timeout = $conf->{timeout} || 0;
- return $self->select->can_read($timeout) || 0;
+ my $timeout = $conf->{timeout};    # optional: seconds to wait for a complete frame
+ my $block = $conf->{block};    # optional: when true, wait with no time limit
+
+ $self->_parse_frames();    # pick up anything that has already arrived
+ return @{ $self->frames } > 0 unless ($timeout || $block);    # plain poll
+
+ while ($block || $timeout > 0) {
+ return 1 if @{ $self->frames } > 0;
+ if ($block) {
+ $self->fdset->can_read;    # no timeout: sleep until the socket is readable
+ }
+ else {
+ my $start = time;
+ $self->fdset->can_read($timeout);
+ $timeout -= time - $start;    # NOTE(review): time() counts whole seconds, so fractional timeouts are approximate
+ }
+ $self->_parse_frames();
+ }
+
+ return @{ $self->frames } > 0 ? 1 : 0;    # the last _parse_frames may have queued a frame just as the timeout expired
}
sub send {
@@ -83,16 +124,16 @@
sub send_frame {
my ( $self, $frame ) = @_;
- # warn "send [" . $frame->as_string . "]\n";
+# warn "send [" . $frame->as_string . "]\n";
$self->socket->print( $frame->as_string );
}
sub receive_frame {
my $self = shift;
- my $frame = Net::Stomp::Frame->parse( $self->socket );
-
- # warn "receive [" . $frame->as_string . "]\n";
+ $self->can_read({block => 1});    # blocks until at least one complete frame is queued
+ my $frame = shift(@{ $self->frames });    # frames are push()ed on arrival, so shift hands back the oldest first
+# warn "recv [" . $frame->as_string . "]\n";
return $frame;
}
@@ -268,11 +309,11 @@
=head2 can_read
This returns whether a frame is waiting to be read. Optionally takes a
-timeout in seconds:
+timeout in seconds OR a directive to block until a frame is ready to read:
my $can_read = $stomp->can_read;
my $can_read = $stomp->can_read({ timeout => '0.1' });
-
+ $stomp->can_read({ block => 1 });
=head2 ack
=== modified file 'lib/Net/Stomp/Frame.pm'
--- lib/Net/Stomp/Frame.pm 2008-06-06 14:41:14 +0000
+++ lib/Net/Stomp/Frame.pm 2008-06-10 15:44:36 +0000
@@ -22,52 +22,45 @@
while ( my ( $key, $value ) = each %{ $headers || {} } ) {
$frame .= $key . ':' . $value . "\n";
}
- $frame .= "\n";
- $frame .= $body || '';
- $frame .= "\000";
+
+ $body ||= '';
+ return "$frame\n$body\000";
}
sub parse {
- my ( $package, $socket ) = @_;
- local $/ = "\n";
-
- # read the command
- my $command;
- while (1) {
- $command = $socket->getline || die "Error reading command: $!";
- chop $command;
- last if $command;
- }
-
- # read headers
- my $headers;
- while (1) {
- my $line = $socket->getline || die "Error reading header: $!";
- chop $line;
- last if $line eq "";
- my ( $key, $value ) = split /: ?/, $line, 2;
- $headers->{$key} = $value;
- }
-
- # read the body
- my $body;
- if ( $headers->{"content-length"} ) {
- $socket->read( $body, $headers->{"content-length"} )
- || die "Error reading body: $!";
- $socket->getc; # eat the trailing null
+ my ($class, $string) = @_;    # $string: serialised frame data, possibly incomplete or followed by more data
+ my ($command, $headers, $body);
+
+ my $original = $string;    # returned untouched on failure so the caller can retry once more data arrives
+
+ ($command, $string) = split("\n", $string, 2);
+ return ($original, undef) unless $command;    # list literal: (input, undef) in list context, undef in scalar context
+
+ while (1) {
+ my ($header, $key, $val);
+ ($header, $string) = split("\n", $string, 2);
+ last unless $header;    # an empty line ends the header section
+ ($key, $val) = split(/: */, $header, 2);
+ $headers->{$key} = $val;
+ }
+
+ if (my $length = $headers->{'content-length'}) {
+ return ($original, undef) unless length($string) > $length;    # need the whole body AND its trailing null
$headers->{bytes_message} = 1;
- } else {
- while (1) {
- my $byte = $socket->getc;
- die "Error reading body: $!" unless defined $byte;
- last if $byte eq "\000";
- $body .= $byte;
- }
+ $body = substr($string, 0, $length);
+ $string = substr($string, $length + 1); # +1 to eat trailing null
+ }
+ else {
+ return ($original, undef) unless $string =~ "\000";    # body is not terminated yet
+ ($body, $string) = split("\000", $string, 2);
}
- my $frame = Net::Stomp::Frame->new(
- { command => $command, headers => $headers, body => $body } );
- return $frame;
+ my $frame = Net::Stomp::Frame->new({
+ command => $command,
+ headers => $headers,
+ body => $body,
+ });
+ return ($string, $frame);    # (leftover data, frame) in list context; just the frame in scalar context
}
1;
@@ -117,13 +110,22 @@
=head2 parse
-Create a new L<Net::Somp::Frame> given a string containing the serialised frame:
+Create a new L<Net::Stomp::Frame> given a string containing the serialised
+frame:
my $frame = Net::Stomp::Frame->parse($string);
+If called in scalar context as above, the frame is returned (or undef if the
+frame couldn't be parsed).
+
+In list context, the remainder of the string left over from parsing and the
+frame are returned (or in failure mode, the original string and undef).
+
+ my ($remaining, $frame) = Net::Stomp::Frame->parse($string);
+
=head2 as_string
-Create a string containing the serialised frame representing the frame:
+Create a string representing the serialised frame:
my $string = $frame->as_string;
=== added file 't/000-frame-serialization.t'
--- t/000-frame-serialization.t 1970-01-01 00:00:00 +0000
+++ t/000-frame-serialization.t 2008-06-10 16:02:34 +0000
@@ -0,0 +1,16 @@
+use warnings;
+use strict;
+
+use Net::Stomp::Frame;
+use Test::More tests => 1;
+
+my $body = "Row, row, row your boat.";
+
+my $frame = Net::Stomp::Frame->new({
+ command => 'MESSAGE',
+ body => $body,
+ headers => {'message-id' => '12345'},    # a single header, so hash iteration order cannot change the serialised string
+});
+
+my $str = $frame->as_string;
+is($str, $frame->parse($str)->as_string, 'parse/as_string symmetry');    # parse is used in scalar context here, so only the frame comes back
=== added file 't/001-bytes_message.t'
--- t/001-bytes_message.t 1970-01-01 00:00:00 +0000
+++ t/001-bytes_message.t 2008-06-10 16:02:55 +0000
@@ -0,0 +1,21 @@
+use warnings;
+use strict;
+
+use Net::Stomp::Frame;
+use Test::More tests => 5;
+
+my $body = '0' . "\000" . '123456789';    # 11 bytes with an embedded null: only content-length can delimit this body
+my $str = Net::Stomp::Frame->new({
+ command => 'MESSAGE',
+ headers => {
+ 'content-length' => 11,    # must equal the byte length of $body
+ 'destination' => '/queue/whatever'
+ },
+ body => $body,
+})->as_string . 'gibberish';    # trailing data after the frame's terminating null
+my ($leftovers, $frame) = Net::Stomp::Frame->parse($str);    # list context: (remaining string, frame)
+is($frame->command, 'MESSAGE', 'command');
+is($frame->headers->{destination}, '/queue/whatever', 'destination');
+is($frame->body, $body, 'body');
+ok($frame->headers->{bytes_message}, 'bytes_message');    # set by parse whenever a content-length header is present
+is($leftovers, 'gibberish', 'leftovers');
=== added file 't/101-can_read-buffer-bug.t'
--- t/101-can_read-buffer-bug.t 1970-01-01 00:00:00 +0000
+++ t/101-can_read-buffer-bug.t 2008-06-10 16:06:06 +0000
@@ -0,0 +1,67 @@
+use warnings;
+use strict;
+
+use IO::Socket::INET;
+use Net::Stomp;
+use Net::Stomp::Frame;
+
+my $server = IO::Socket::INET->new(Listen => 1);
+
+sub server_main {    # Child side: accept one client, send three MESSAGE frames, linger, then close.
+ my $client = $server->accept();
+ my $body = "o hai there";
+ foreach my $i (1..3) {    # frames are written back-to-back without reading anything from the client
+ use bytes;
+ $client->print(Net::Stomp::Frame->new({
+ command => 'MESSAGE',
+ headers => {
+ destination => '/queue/wordbin',
+ 'message-id' => "$i",
+ 'content-length' => bytes::length($body),
+ },
+ body => $body,
+ })->as_string);
+ }
+ sleep 1; # leave the socket open long enough for can_read to fail
+ # because if you close it, the buffering problem doesn't show up.
+ $client->close();
+ $server->close();
+ exit 0;    # end the child here so it never falls through into the client-side tests
+}
+
+sub client_main {    # Parent side: connect to the forked server and exercise can_read / receive_frame.
+ use Test::More tests => 3;
+ my $stomp = Net::Stomp->new({
+ hostname => 'localhost',
+ port => $server->sockport
+ });
+ # connect() is expected to consume the first frame as its reply (presumably via receive_frame; confirm in Net::Stomp)
+ $stomp->connect({ login => 'hello', passcode => 'there' });
+
+ # consume a second frame
+ $stomp->receive_frame();
+
+ # one frame should still be queued or readable
+ ok($stomp->can_read, 'can_read');
+
+ # consume the third frame
+ $stomp->receive_frame();
+
+ # all three frames have been consumed and the server has not closed yet
+ ok(!$stomp->can_read, 'cannot read');
+
+ sleep 2; # wait longer than the server
+
+ eval { $stomp->can_read({timeout => 3}) };    # the server has closed by now, so the read hits EOF and dies
+ ok($@, 'dies on EOF');
+ $stomp->socket->close();
+}
+
+if (my $pid = fork) {    # parent: fork returned the child's pid
+ client_main();
+ waitpid $pid, 0;    # reap the server child before exiting
+}
+else {    # child (fork returned 0) — NOTE(review): a failed fork returns undef and would also land here
+ server_main();
+}
+