Subject: | Improvements in the face of broker disconnects |
Hi, thanks for writing the useful Net::Stomp library. We at Cloudmark
have made some improvements to the library to gracefully handle
sending/receiving messages in the face of brokers going down and coming
back up.
I've attached a summary of the changes and a patch file based on the
0.41 tarball that incorporates the changes.
If you have further improvements or refinements you'd like to see, we'd
be happy to make them. Otherwise, enjoy the changes.
Subject: | 01-cloudmark-disconnect-improvements-README.txt |
We made the following improvements to the Net::Stomp library to be
more robust and return more concrete errors in the face of broker
disconnects:
- Many functions didn't have a clearly documented return
value. Updated all functions to have a return value. Generally 1
implies success and 0 implies failure. See the perl docs for more
details on the return values from each function.
- Add a new option to the constructor 'ignore_sigpipe'. If provided
and set to 1, it will ignore SIGPIPE so writes to closed sockets
don't result in killing the process.
- Add a new option to the constructor 'reconnect_attempts'. If
provided and set to non-zero, when sending/receiving messages, the
library will only attempt to reconnect this many times before
giving up and returning an error. By default, this is set to 0,
which retains the old behavior of attempting to reconnect forever
on any failure.
- In connect(), verify that a response frame was actually read and
actually had a CONNECTED response.
- Whenever calling functions like send/send_frame internally, check
the return value and stop if the action does not succeed.
- Add new functions send_with_receipt/send_frame_with_receipt that
send messages with a receipt header, which forces the server to
reply with a RECEIPT message.
- Modify higher-level functions like subscribe/send_transactional to
always use send_with_receipt/send_frame_with_receipt, to avoid
problems where an ERROR response from a message that would
otherwise not return anything ends up being accidentally read as
the response to a later message.
- Monitor the value of syswrite when writing messages to the socket
to ensure that all of the message was written.
Subject: | 01-cloudmark-disconnect-improvements.patch |
Index: lib/Net/Stomp.pm
===================================================================
13c13
< ssl_options subscriptions _connect_headers bufsize
---
> ssl_options subscriptions _connect_headers bufsize reconnect_attempts ignore_sigpipe
29a30,40
> # Make the number of reconnection attempts per send/receive
> # attempt configurable. The default is 0 (retry forever).
> if (! defined $self->reconnect_attempts) {
> $self->reconnect_attempts(0);
> }
>
> # If configured to do so, ignore SIGPIPE.
> if (defined $self->ignore_sigpipe && $self->ignore_sigpipe == 1) {
> $SIG{PIPE} = 'IGNORE';
> }
>
111c122
< my ( $self, $conf ) = @_;
---
> my ( $self, $conf, $response_frame_ref ) = @_;
115c126,129
< $self->send_frame($frame);
---
> if ($self->send_frame($frame) == 0) {
> return undef;
> }
>
117a132,148
> # If response_frame_ref is defined, set it to the response.
> if (defined $response_frame_ref) {
> $$response_frame_ref = $frame;
> }
>
> # If receive_frame didn't actually return anything, we can return
> # failure immediately.
> if (! defined $frame) {
> return 0;
> }
>
> # If receive_frame didn't return a CONNECTED response, return
> # failure.
> if ($frame->command ne "CONNECTED") {
> return 0;
> }
>
123c154
< return $frame;
---
> return 1;
128a160,161
>
> # Note: ignoring result from send_frame.
135a169
>
140,142c174,175
< while ($@) {
< sleep(5);
< eval { $self->_get_connection };
---
> if ($@) {
> return;
144c177,182
< $self->connect( $self->_connect_headers );
---
>
> if ($self->connect( $self->_connect_headers ) == 0) {
> $self->socket->close;
> return;
> }
>
146c184,187
< $self->subscribe($self->subscriptions->{$sub});
---
> if ($self->subscribe($self->subscriptions->{$sub}) == 0) {
> $self->socket->close;
> return;
> }
173c214,225
< $self->send_frame($frame);
---
> return $self->send_frame($frame);
> }
>
> sub send_with_receipt {
> my ($self, $conf, $send_response_ref) = @_;
>
> my $body = $conf->{body};
> delete $conf->{body};
> my $frame = Net::Stomp::Frame->new(
> { command => 'SEND', headers => $conf, body => $body } );
>
> return $self->send_frame_with_receipt($frame, $send_response_ref);
180a233,239
> # Note: as this function depends on a sequence of messages, any of
> # which *may* return an error from the broker on failure, we send
> # all messages using send_with_receipt/send_frame_with_receipt. If
> # we didn't include receipt ids, one of these messages could
> # return 0 or 1 responses back, which throws off the pipeline for
> # subsequent messages or subsequent calls to send_transactional.
>
182a242
>
187c247,249
< $self->send_frame($begin_frame);
---
> if ($self->send_frame_with_receipt($begin_frame) == 0) {
> return 0;
> }
190,200d251
< my $receipt_id = $self->_get_next_transaction;
< $conf->{receipt} = $receipt_id;
< my $message_frame = Net::Stomp::Frame->new(
< { command => 'SEND', headers => $conf, body => $body } );
< $self->send_frame($message_frame);
<
< # check the receipt
< my $receipt_frame = $self->receive_frame;
< if ( $receipt_frame->command eq 'RECEIPT'
< && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
< {
202,209c253,254
< # success, commit the transaction
< my $frame_commit = Net::Stomp::Frame->new(
< { command => 'COMMIT',
< headers => { transaction => $transaction_id }
< }
< );
< return $self->send_frame($frame_commit);
< } else {
---
> if ($self->send_with_receipt($conf, undef) == 0) {
> # The send wasn't acknowledged. ABORT the transaction.
211d255
< # some failure, abort transaction
217c261,264
< $self->send_frame($frame_abort);
---
>
> # Note: ignoring result from send_frame_with_receipt, as we're
> # going to return an error anyway.
> $self->send_frame_with_receipt($frame_abort);
219a267,274
>
> # success, commit the transaction
> my $frame_commit = Net::Stomp::Frame->new(
> { command => 'COMMIT',
> headers => { transaction => $transaction_id }
> }
> );
> return $self->send_frame_with_receipt($frame_commit);
226c281,283
< $self->send_frame($frame);
---
> if ($self->send_frame_with_receipt($frame) == 0) {
> return 0;
> }
228a286,287
>
> return 1;
235c294,296
< $self->send_frame($frame);
---
> if ($self->send_frame_with_receipt($frame) == 0) {
> return 0;
> }
237a299,300
>
> return 1;
245c308
< $self->send_frame($frame);
---
> return $self->send_frame_with_receipt($frame);
251,254c314,331
< # warn "send [" . $frame->as_string . "]\n";
< $self->socket->syswrite( $frame->as_string );
< my $connected = $self->socket->connected;
< unless (defined $connected) {
---
> # Try up to reconnect_attempts times to send the frame (or
> # forever, if reconnect_attempts is 0).
>
> for(my $attempts = $self->reconnect_attempts;
> $self->reconnect_attempts == 0 || $attempts > 0;
> $attempts--) {
>
> # warn "send [" . $frame->as_string . "]\n";
> my $written = $self->socket->syswrite( $frame->as_string );
>
> if (defined $written && $written == length($frame->as_string)) {
> return 1;
> }
>
> # If here, we couldn't send the frame, which was either due to a
> # short write or due to an inability to write at all. In either
> # case, try to reconnect.
> sleep(5);
256d332
< $self->send_frame($frame);
257a334,371
>
> # If here, we couldn't send. Give up.
> return 0;
> }
>
> sub send_frame_with_receipt {
> my ($self, $frame, $send_response_ref) = @_;
>
> if (defined $send_response_ref) {
> $$send_response_ref = undef;
> }
>
> my $receipt_id = $self->_get_next_transaction;
> $frame->headers->{receipt} = $receipt_id;
>
> if ($self->send_frame($frame) == 0) {
> return 0;
> }
>
> # Read the response, which will either be a RECEIPT or some
> # ERROR. Always wait up to 30 seconds for a response.
>
> my $response = $self->receive_frame({timeout => 30});
>
> if (! defined $response) {
> return 0;
> }
>
> if (defined $send_response_ref) {
> $$send_response_ref = $response;
> }
>
> if ($response->command ne 'RECEIPT' ||
> $response->headers->{'receipt-id'} ne $receipt_id) {
> return 0;
> }
>
> return 1;
339c453,460
< unless (defined $connected) {
---
>
> # Try up to reconnect_attempts times to connect (or
> # forever, if reconnect_attempts is 0).
> for(my $attempts = $self->reconnect_attempts;
> (! defined $connected && ($self->reconnect_attempts == 0 || $attempts > 0));
> $attempts--) {
>
> sleep(5);
340a462
> $connected = $self->socket->connected;
342a465,467
> # If we still can't connect, give up.
> return undef if (! defined $connected);
>
467a593,615
> By default, if the broker goes away, socket writes could result in a
> SIGPIPE signal, terminating the process. If you pass the option
> ignore_sigpipe to new with value 1, the library will set up a signal
> handler for SIGPIPE that ignores the signal:
>
> my $stomp = Net::Stomp->new( {
> hostname => 'localhost',
> port => '61612',
> ignore_sigpipe => 1
> } );
>
> By default, when sending/receiving a message, if the library is not
> currently connected to the broker, it will attempt to reconnect
> (forever) before sending/receiving the message. If you want to limit
> the number of reconnection attempts that occur before giving up and
> returning an error, pass the option reconnect_attempts in new():
>
> my $stomp = Net::Stomp->new( {
> hostname => 'localhost',
> port => '61612',
> reconnect_attempts => 5
> } );
>
481,482c629,631
< This connects to the Stomp server. You may pass in a C<login> and
< C<passcode> options.
---
> This connects to the Stomp server. The first argument is a hash of
> headers to include along with the CONNECT message. In that hash, you
> may pass in a C<login> and C<passcode> options.
488c637,644
< $stomp->connect( { login => 'hello', passcode => 'there' } );
---
> The second (optional) argument is a scalar reference which passes back
> the response from the CONNECT message. It will either be set to undef
> if the connect failed, or a Net::Stomp::Frame object containing the
> response.
>
> $stomp->connect( { login => 'hello', passcode => 'there' }, \$connect_response );
>
> connect() returns 1 on success, 0 on failure.
498c654,678
< To send a BytesMessage, you should set the field 'bytes_message' to 1.
---
> To send a BytesMessage, you should set the field 'bytes_message' to
> 1.
>
> This function returns 1 on success, 0 on failure.
>
> =head2 send_with_receipt
>
> This sends a message to a queue or topic, and requires a positive
> acknowledgement via a RECEIPT response. You must pass in a destination
> and a body.
>
> The second (optional) argument is a scalar reference which passes back
> the response from the SEND message. It will either be set to undef if
> the message could not be sent, or a Net::Stomp::Frame object
> containing the response. This response will either be a RECEIPT
> message or some ERROR message.
>
> $stomp->send_with_receipt(
> { destination => '/queue/foo', body => 'test message' }, \$send_response );
>
> To send a BytesMessage, you should set the field 'bytes_message' to
> 1.
>
> This function returns 1 on success (i.e. the message was successfully
> sent and a RECEIPT message received), or 0 on failure.
514a695,697
> This function returns 1 on success (i.e. the message was sent and
> committed), 0 on failure.
>
575a759,760
> This function returns 1 on success, 0 on failure.
>
581a767,768
> This function returns 1 on success, 0 on failure.
>
596a784,786
> This function returns a Net::Stomp::Frame object on success, undef on
> failure (or timeout).
>
624a815,835
> This function returns 1 if the frame could be sent, 0 otherwise.
>
> =head2 send_frame_with_receipt
>
> This sends a single frame and requires a positive acknowledgement via
> a RECEIPT response.
>
> The second (optional) argument is a scalar reference which passes back
> the response from sending the frame. It will either be set to undef if
> the frame could not be sent, or a Net::Stomp::Frame object
> containing the response. This response will either be a RECEIPT
> message or some ERROR message.
>
> my $frame = Net::Stomp::Frame->new(
> { command => $command, headers => $conf, body => $body } );
>
> $stomp->send_frame_with_receipt($frame, \$send_response);
>
> This function returns 1 on success (i.e. the message was successfully
> sent and a RECEIPT message received), or 0 on failure.
>
635a847
> Mark Stemm <mstemm@cloudmark.com>