
This queue is for tickets about the AnyEvent-RabbitMQ CPAN distribution.

Report information
The Basics
Id: 83833
Status: resolved
Priority: 0
Queue: AnyEvent-RabbitMQ

People
Owner: bobtfish [...] bobtfish.net
Requestors: DLAMBLEY [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Normal
Broken in: 1.11
Fixed in: (no value)



Subject: Generate heartbeat packets according to spec.
The AMQP 0.9.1 specification appears to say that the client should generate heartbeat packets regardless of what the server has sent. This patch changes AnyEvent::RabbitMQ to send a heartbeat packet at the interval requested by the server, rather than only in response to a heartbeat from the server.

I have pushed this change to https://github.com/bobtfish/AnyEvent-RabbitMQ

Dave
Subject: heartbeat.patch
commit af7c3138d9a5a8e9f7a110614ef7e83f5509d260
Author: Dave Lambley <davel@state51.co.uk>
Date:   Fri Mar 8 14:12:13 2013 +0000

    Spontaneously emit hearts as per amqp 0.9.1 spec.

    The AMQP spec says, "The client should start sending heartbeats after
    receiving a Connection.Tune method, and start monitoring heartbeats after
    receiving Connection.Open."

    There is no mention of merely responding to heartbeat packets emitted by
    the server.

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 62ef7c5..6bc83a0 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -186,10 +186,13 @@ sub _read_loop {
                 '-----------', "\n";
         }
 
+        # TODO - check that a packet has been received within two times the
+        # heartbeat period.
+
         my $id = $frame->channel;
         if (0 == $id) {
             if ($frame->type_id == 8) {
-                $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
+                # Heartbeat, no action needs taking.
                 return;
             }
             return if !$self->_check_close_and_clean($frame, $close_cb,);
@@ -220,6 +223,7 @@ sub _check_close_and_clean {
     my $method_frame = $frame->method_frame;
     return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
 
+    delete $self->{_heartbeat};
     $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
     $self->{_channels} = {};
     $self->{_is_open} = 0;
@@ -296,6 +300,17 @@ sub _tune {
                 );
 
                 $self->_open(%args,);
+
+                if ($frame->method_frame->heartbeat > 0) {
+                    $self->{_heartbeat} = AnyEvent->timer(
+                        after => $frame->method_frame->heartbeat,
+                        interval => $frame->method_frame->heartbeat,
+                        cb => sub {
+                            $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
+                        },
+                    );
+                }
+
             },
             $args{on_failure},
         );
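
The patch leaves a TODO for the monitoring half: checking that something has been received within two heartbeat periods. As a rough sketch only, that check could look something like the following. The function name heartbeat_expired and its arguments are hypothetical and are not part of AnyEvent::RabbitMQ or of the patch. Treating exactly two periods of silence as expired is an assumption here.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::RabbitMQ.
# $last_received and $now are epoch seconds; $interval is the heartbeat
# period in seconds as negotiated in Connection.Tune.
sub heartbeat_expired {
    my ($last_received, $now, $interval) = @_;

    # A heartbeat of 0 means heartbeats are disabled, so nothing can expire.
    return 0 if !$interval;

    # Assumption: two full periods of silence count as expired.
    return ($now - $last_received) >= 2 * $interval ? 1 : 0;
}
```

A caller would presumably record the time whenever _read_loop receives a frame, and run this check from a second timer alongside the one that sends heartbeats.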
Shipped!