Subject: | Generate heartbeat packets according to spec. |
The AMQP 0.9.1 specification appears to say the client should generate
heartbeat packets without regard to what the server has sent. This patch
changes AnyEvent::RabbitMQ to send a heartbeat packet at the frequency
requested by the server, regardless of when the server sent a heartbeat.
I have pushed this change to https://github.com/bobtfish/AnyEvent-RabbitMQ
Dave
Subject: | heartbeat.patch |
commit af7c3138d9a5a8e9f7a110614ef7e83f5509d260
Author: Dave Lambley <davel@state51.co.uk>
Date: Fri Mar 8 14:12:13 2013 +0000
Spontaneously emit hearts as per amqp 0.9.1 spec.
The AMQP spec says, "The client should start sending heartbeats after
receiving a Connection.Tune method, and start monitoring heartbeats after
receiving Connection.Open." There is no mention of merely responding to
heartbeat packets emitted by the server.
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 62ef7c5..6bc83a0 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -186,10 +186,13 @@ sub _read_loop {
'-----------', "\n";
}
+ # TODO - check that a packet has been received within two times the
+ # heartbeat period.
+
my $id = $frame->channel;
if (0 == $id) {
if ($frame->type_id == 8) {
- $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
+ # Heartbeat, no action needs taking.
return;
}
return if !$self->_check_close_and_clean($frame, $close_cb,);
@@ -220,6 +223,7 @@ sub _check_close_and_clean {
my $method_frame = $frame->method_frame;
return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
+ delete $self->{_heartbeat};
$self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
$self->{_channels} = {};
$self->{_is_open} = 0;
@@ -296,6 +300,17 @@ sub _tune {
);
$self->_open(%args,);
+
+ if ($frame->method_frame->heartbeat > 0) {
+ $self->{_heartbeat} = AnyEvent->timer(
+ after => $frame->method_frame->heartbeat,
+ interval => $frame->method_frame->heartbeat,
+ cb => sub {
+ $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
+ },
+ );
+ }
+
},
$args{on_failure},
);
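
The TODO in the first hunk (checking that a packet has arrived within two
times the heartbeat period) is not implemented by this patch. A minimal sketch
of one way it could be done follows. HeartbeatMonitor and its methods are
hypothetical names invented for illustration; they are not part of
AnyEvent::RabbitMQ or Net::AMQP, and the injectable clock is only an
assumption made so the logic can be exercised without waiting.

```perl
use strict;
use warnings;

# Hypothetical helper, NOT part of AnyEvent::RabbitMQ: remembers when the
# last frame arrived and reports whether the peer has been silent too long.
package HeartbeatMonitor;

sub new {
    my ($class, %args) = @_;
    my $self = bless {
        interval => $args{interval},              # negotiated heartbeat, seconds
        clock    => $args{clock} || sub { time }, # injectable for testing
    }, $class;
    $self->{last_seen} = $self->{clock}->();
    return $self;
}

# Call for every frame read from the socket, heartbeat or otherwise.
sub frame_received {
    my ($self) = @_;
    $self->{last_seen} = $self->{clock}->();
    return;
}

# True when heartbeats are enabled and nothing has arrived for more than
# two heartbeat periods.
sub is_stale {
    my ($self) = @_;
    return 0 if !$self->{interval};
    my $silence = $self->{clock}->() - $self->{last_seen};
    return $silence > 2 * $self->{interval} ? 1 : 0;
}

package main;

# Demonstration with a fake clock and a 30 second heartbeat.
my $now     = 1000;
my $monitor = HeartbeatMonitor->new(interval => 30, clock => sub { $now });

$now += 45;    # 45s of silence, still within 2 x 30s
print $monitor->is_stale ? "stale\n" : "alive\n";

$now += 30;    # 75s of silence, beyond 2 x 30s
print $monitor->is_stale ? "stale\n" : "alive\n";
```

In the module itself, something like frame_received would presumably be called
from _read_loop for each incoming frame, with a second timer polling is_stale
and closing the connection when it returns true. That wiring is left out here
because it depends on how connection failure should be reported to the caller.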