Subject: | Message Disappareance |
SYNOPOSIS
The POE::Component::Client::Stomp appears to drop messages into the
ether; specifically even though the message queue server (ActiveMQ)
notes the message and the sender's log indicates the message is sent,
the message appears not to have been received.
PLATFORM
This is perl, v5.8.8 built for i86pc-solaris-thread-multi
Solaris Community Express, buil 70
(Note: Perl is built by http://www.blastwave.org/)
Apache ActiveMQ 4.1
THE ISSUE
I send a number of simple text messages of the format, "1", "2" through
to "#n" to a queue "activmeq/perl". What will happen, though, is that
some messages seem never to be acknowledged or received by the Stomp
lisener.
The algorithm of my tests goes:
# Record the messages sent
# Then check to make sure only one is sent (i.e. no duplicates)
# Then check if any are missing
It's this latter that fails, I get output like:
Max: 250
173 does not exist!
177 does not exist!
236 does not exist!
Errors: 3
Doing some diagnostics, I decided to write a client using Net::Stomp and
bypassing POE. With this Net::Stomp client I could send 25 000 messages
without a message being missed. I did this because I wanted to remove
POE from the equation (as it's obviously a lot more complex a system
than plain Net::Stomp).
My question is:
(assumption: it is POE that is somehow missing messages)
(basis: Net::Stomp copes with Java sending messages as fast as it can,
hence ActiveMQ or Java don't appear to be the cause)
* Why would POE miss messages?
* In what circumstances might POE "silently" just lose this data?
I adjusted the Java program to "Thread.sleep(100)" (sleep 100
milliseconds) between sending messages and no messages were lost. Then
to "Thread.sleep(5)" seconds and it seems that this module only lost 3
messages (as opposed to more than 50).
Whilst it would be unusual for it to be Java's problem, I tried again
with a Perl stomp client and got similar results:
* Messages would go missing
* Or the final messages wouldn't be seen by the POE stomp client
I'm happy to do more tests but I'm not quite sure what tests to perform
from here.
DSL
Subject: | StompClient.pl |
#!/usr/bin/perl
use strict;
use warnings;
use Carp;
$Carp::Verbose = 1;
use Data::Dumper;
use Net::Stomp;
my $client = Net::Stomp->new( {
hostname => 'localhost',
port => '61613',
});
print STDERR "About to connect...\n\n";
$client->connect({ login => '', passcode => ''});
my $message_counter = 0;
my $messages_seen = {};
$client->subscribe( {
destination => '/queue/activemq/perl',
ack => 'client',
});
while (1) {
my $frame = $client->receive_frame();
$client->ack( { frame => $frame});
my $message = $frame->body();
chomp $message;
$message_counter++;
$messages_seen->{$message} += 1 if ( $message =~ /^\d+$/ );
print STDERR "Message: $message ($message_counter)\n";
if ( $message eq "dump" ) {
print STDERR "Dump:\n";
foreach my $key ( sort { $a <=> $b } keys %{ $messages_seen } ) {
my $value = $messages_seen->{$key};
print STDERR "$key => $value\n";
}
print STDERR "Finished dumping...\n";
}
if ( $message eq "check" ) {
print STDERR "Check:\n";
foreach my $key ( sort { $a <=> $b } keys %{ $messages_seen } ) {
my $value = $messages_seen->{$key};
if ( $value != 1 ) {
print STDERR "E: $key => $value\n";
}
}
print STDERR "Finished checking...\n";
}
if ( $message eq "clear" ) {
print STDERR "Clearing...\n";
$messages_seen = {};
$message_counter = 0;
}
if ( $message =~ m/^max(\d+)/ ) {
my $max = $1;
print STDERR "Max: $max\n\n";
my $err_count = 0;
foreach ( 1 .. $max ) {
if ( !exists $messages_seen->{$_} ) {
print STDERR "$_ does not exist!\n";
$err_count += 1;
}
}
print STDERR "Errors: $err_count\n\n";
}
}
$client->disconnect();
print STDERR "Disconnected...\n\n";
1;
Subject: | stomp_test_queue.pl |
#!/usr/bin/perl
package Client;
use strict;
use warnings;
use Data::Dumper;
use POE;
use base qw(POE::Component::Client::Stomp);
use strict;
use warnings;
sub handle_connection {
my ( $kernel, $heap, $self ) = @_[ KERNEL, HEAP, OBJECT ];
print STDERR "We are handling a connection...\n";
$heap->{message_counter} = 0;
$heap->{messages_seen} = {};
my $nframe = $self->stomp->connect();
$kernel->yield( 'send_data' => $nframe );
}
sub handle_connected {
my ( $kernel, $self, $frame ) = @_[ KERNEL, OBJECT, ARG0 ];
print STDERR "Connected...\n";
my $nframe = $self->stomp->subscribe(
{
destination => $self->config('Queue'),
ack => 'client'
}
);
$kernel->yield( 'send_data' => $nframe );
}
sub handle_message {
my ( $kernel, $heap, $self, $session, $frame ) = @_[ KERNEL, HEAP, OBJECT, SESSION, ARG0 ];
my $message_id = $frame->headers->{'message-id'};
my $message = $frame->body();
chomp $message;
$heap->{message_counter} += 1;
$heap->{messages_seen}->{$message} += 1 if ( $message =~ /^\d+$/ );
print STDERR $message_id
. " => $message ("
. $heap->{message_counter} . ")\n";
if ( $message eq "dump" ) {
print STDERR "Dump:\n";
foreach my $key ( sort { $a <=> $b } keys %{ $heap->{messages_seen} } ) {
my $value = $heap->{messages_seen}->{$key};
print STDERR "$key => $value\n";
}
print STDERR "Finished dumpring...\n";
}
if ( $message eq "check" ) {
print STDERR "Check:\n";
foreach my $key ( sort { $a <=> $b } keys %{ $heap->{messages_seen} } ) {
my $value = $heap->{messages_seen}->{$key};
if ( $value != 1 ) {
print STDERR "E: $key => $value\n";
}
}
print STDERR "Finished checking...\n";
}
if ( $message eq "clear" ) {
print STDERR "Clearing...\n";
$heap->{messages_seen} = {};
$heap->{message_counter} = 0;
}
if ( $message =~ m/^max(\d+)/ ) {
my $max = $1;
print STDERR "Max: $max\n\n";
my $err_count = 0;
foreach ( 1 .. $max ) {
if ( !exists $heap->{messages_seen}->{$_} ) {
print STDERR "$_ does not exist!\n";
$err_count += 1;
}
}
print STDERR "Errors: $err_count\n\n";
}
my $nframe = $self->stomp->ack( { 'message-id' => $message_id } );
$kernel->yield( 'send_data', $nframe );
}
package main;
use POE;
use strict;
## This turns up without /queue because the /queue gets stripped
my $queue_name = "/queue/activemq/perl";
Client->spawn(
Alias => 'STOMPMDB',
Queue => $queue_name,
RemoteAddress => 'localhost',
RemotePort => '61613',
);
$poe_kernel->run();
print STDERR "Finishing!";
exit 0;
Subject: | send_stomp.pl |
#!/usr/bin/perl
$| = 1;
use strict;
use warnings;
use Carp;
use Net::Stomp;
## This turns up as just "mdb" because the /queue gets stripped
my $queue_name = "/queue/activemq/perl";
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect();
my $count = 5000;
while ($count-- > 0) {
my $body = $count;
$stomp->send( { destination => $queue_name, body => $body } );
}
$stomp->send( {destination => $queue_name, body => 'check'});
$stomp->send( {destination => $queue_name, body => 'max$count'});
$stomp->disconnect();
Subject: | ActiveMQNative.java |
Message body not shown because it is not plain text.
Subject: | perldashv.output.txt |
Summary of my perl5 (revision 5 version 8 subversion 8) configuration:
Platform:
osname=solaris, osvers=2.8, archname=i86pc-solaris-thread-multi
uname='sunos thor 5.8 generic_117351-46 i86pc i386 i86pc'
config_args=''
hint=recommended, useposix=true, d_sigaction=define
usethreads=define use5005threads=undef useithreads=define usemultiplicity=define
useperlio=define d_sfio=undef uselargefiles=define usesocks=undef
use64bitint=undef use64bitall=undef uselongdouble=undef
usemymalloc=n, bincompat5005=undef
Compiler:
cc='cc', ccflags ='-D_REENTRANT -xO3 -xarch=386 -xspace -xildoff -I/opt/csw/bdb44/include -I/opt/csw/include -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64',
optimize='-xO3 -xarch=386 -xspace -xildoff',
cppflags='-D_REENTRANT -xO3 -xarch=386 -xspace -xildoff -I/opt/csw/bdb44/include -I/opt/csw/include'
ccversion='Sun C 5.8 Patch 121016-05 2007/01/10', gccversion='', gccosandvers=''
intsize=4, longsize=4, ptrsize=4, doublesize=8, byteorder=1234
d_longlong=define, longlongsize=8, d_longdbl=define, longdblsize=12
ivtype='long', ivsize=4, nvtype='double', nvsize=8, Off_t='off_t', lseeksize=8
alignbytes=4, prototype=define
Linker and Libraries:
ld='cc', ldflags ='-xarch=386 -L/opt/csw/bdb44/lib -L/opt/csw/lib -L/usr/lib -L/usr/ccs/lib -L/opt/SUNWspro/prod/lib -L/lib'
libpth=/usr/lib /usr/ccs/lib /opt/SUNWspro/prod/lib /lib /opt/csw/lib
libs=-lsocket -lnsl -lgdbm -ldb-4.4 -ldl -lm -lpthread -lc
perllibs=-lsocket -lnsl -ldb-4.4 -ldl -lm -lpthread -lc
libc=/lib/libc.so, so=so, useshrplib=true, libperl=libperl.so.5.8.8
gnulibc_version=''
Dynamic Linking:
dlsrc=dl_dlopen.xs, dlext=so, d_dlsymun=undef, ccdlflags='-R /opt/csw/lib'
cccdlflags='-KPIC', lddlflags='-G -L/opt/csw/bdb44/lib -L/opt/csw/lib -L/usr/lib -L/usr/ccs/lib -L/opt/SUNWspro/prod/lib -L/lib'
Characteristics of this binary (from libperl):
Compile-time options: MULTIPLICITY PERL_IMPLICIT_CONTEXT
PERL_MALLOC_WRAP USE_ITHREADS USE_LARGE_FILES
USE_PERLIO USE_REENTRANT_API USE_SITECUSTOMIZE
Built under solaris
Compiled at Mar 16 2007 11:31:22
@INC:
/opt/csw/lib/perl/5.8.8
/opt/csw/share/perl/5.8.8
/opt/csw/lib/perl/site_perl
/opt/csw/share/perl/site_perl
/opt/csw/share/perl/site_perl
/opt/csw/lib/perl/csw
/opt/csw/share/perl/csw
/opt/csw/share/perl/csw
.