Subject: | Breaks with multiple body frames |
Hi,
when a message content is split up in multiple body frames, AnyEvent::RabbitMQ::Channel
doesn't handle it correctly.
I attached a test and what I think might be a fix for that. Would be cool if you could have a look
at it.
Btw. I didn't get the 'recover' test to run (with or without the patch) so I'm not sure if everything
still works as expected.
Cheers,
Othello
Subject: | anyevent-rabbitmq.patch |
diff -u -r AnyEvent-RabbitMQ-1.03/lib/AnyEvent/RabbitMQ/Channel.pm AnyEvent-RabbitMQ-1.03-patched/lib/AnyEvent/RabbitMQ/Channel.pm
--- AnyEvent-RabbitMQ-1.03/lib/AnyEvent/RabbitMQ/Channel.pm 2011-04-06 04:56:45.000000000 +0200
+++ AnyEvent-RabbitMQ-1.03-patched/lib/AnyEvent/RabbitMQ/Channel.pm 2011-07-18 14:49:08.836538341 +0200
@@ -625,6 +625,8 @@
my ($type, $frame, $cb, $failure_cb,) = @_;
my $response = {$type => $frame};
+ my $body_size = 0;
+
$self->{_content_queue}->get(sub{
my $frame = shift;
@@ -638,17 +640,32 @@
) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
$response->{header} = $header_frame;
+ $body_size = $frame->body_size;
});
- $self->{_content_queue}->get(sub{
+ my $build_body;
+ $build_body = sub {
my $frame = shift;
return $failure_cb->('Received data is not body frame')
if !$frame->isa('Net::AMQP::Frame::Body');
- $response->{body} = $frame;
- $cb->($response);
- });
+ if (!$response->{body}) {
+ $response->{body} = $frame;
+ }
+ else {
+ $response->{body}->payload($response->{body}->payload . $frame->payload);
+ }
+
+ if (length($response->{body}->payload) < $body_size) {
+ $self->{_content_queue}->get($build_body);
+ }
+ else {
+ $cb->($response);
+ }
+ };
+
+ $self->{_content_queue}->get($build_body);
return $self;
}
diff -u -r AnyEvent-RabbitMQ-1.03/xt/04_anyevent.t AnyEvent-RabbitMQ-1.03-patched/xt/04_anyevent.t
--- AnyEvent-RabbitMQ-1.03/xt/04_anyevent.t 2011-04-07 04:29:12.000000000 +0200
+++ AnyEvent-RabbitMQ-1.03-patched/xt/04_anyevent.t 2011-07-18 16:37:41.277338837 +0200
@@ -32,7 +32,7 @@
plan skip_all => 'Connection failure: '
. $conf{host} . ':' . $conf{port} if $@;
-plan tests => 25;
+plan tests => 26;
use AnyEvent::RabbitMQ;
@@ -142,6 +142,33 @@
);
$done->recv;
+# publish a large message
+$done = AnyEvent->condvar;
+$ch->consume(
+ queue => 'test_q',
+ on_success => sub {
+ my $frame = shift;
+ $consumer_tag = $frame->method_frame->consumer_tag;
+ },
+ on_consume => sub {
+ my $response = shift;
+ ok($response->{body}->payload eq 'a'x140_000, 'consume large msg');
+ $done->send;
+ },
+);
+publish($ch, 'a'x140_000, $done,);
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->cancel(
+ consumer_tag => $consumer_tag,
+ on_success => sub {
+ $done->send;
+ },
+ on_failure => failure_cb($done),
+);
+$done->recv;
+
$done = AnyEvent->condvar;
publish($ch, 'I love RabbitMQ.', $done,);
$ch->get(