
This queue is for tickets about the POE-Component-Client-Stomp CPAN distribution.

Report information
The Basics
Id: 30072
Status: resolved
Worked: 2 hours (120 min)
Priority: 0/
Queue: POE-Component-Client-Stomp

People
Owner: kesteb [...] wsipc.org
Requestors: lloy0076 [...] adam.com.au
Cc:
AdminCc:

Bug Information
Severity: Important
Broken in: 0.04
Fixed in: (no value)



Subject: Message Disappearance
SYNOPSIS

POE::Component::Client::Stomp appears to drop messages into the ether; specifically, even though the message queue server (ActiveMQ) notes the message and the sender's log indicates the message is sent, the message appears not to have been received.

PLATFORM

This is perl, v5.8.8 built for i86pc-solaris-thread-multi
Solaris Community Express, build 70
(Note: Perl is built by http://www.blastwave.org/)
Apache ActiveMQ 4.1

THE ISSUE

I send a number of simple text messages of the format "1", "2" through to "#n" to a queue "activemq/perl". What happens, though, is that some messages seem never to be acknowledged or received by the Stomp listener.

The algorithm of my tests goes:

# Record the messages sent
# Then check to make sure only one is sent (i.e. no duplicates)
# Then check if any are missing

It's this last check that fails; I get output like:

Max: 250

173 does not exist!
177 does not exist!
236 does not exist!
Errors: 3

Doing some diagnostics, I decided to write a client using Net::Stomp and bypassing POE. With this Net::Stomp client I could send 25 000 messages without a message being missed. I did this because I wanted to remove POE from the equation (as it's obviously a lot more complex a system than plain Net::Stomp).

My question is:

(assumption: it is POE that is somehow missing messages)
(basis: Net::Stomp copes with Java sending messages as fast as it can, hence ActiveMQ or Java don't appear to be the cause)

* Why would POE miss messages?
* In what circumstances might POE "silently" just lose this data?

I adjusted the Java program to "Thread.sleep(100)" (sleep 100 milliseconds) between sending messages and no messages were lost. Then I changed it to "Thread.sleep(5)" (5 milliseconds) and it seems that this module only lost 3 messages (as opposed to more than 50).

Whilst it would be unusual for it to be Java's problem, I tried again with a Perl stomp client and got similar results:

* Messages would go missing
* Or the final messages wouldn't be seen by the POE stomp client

I'm happy to do more tests but I'm not quite sure what tests to perform from here.

DSL
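The three-step check described in the report (record, look for duplicates, look for gaps) can be sketched on its own, independent of any broker. This is a minimal illustration only: `check_messages` is a hypothetical helper, not something taken from the attached scripts, and it assumes the message bodies are the plain integers 1 through max.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the attached scripts): given the
# highest expected number and the bodies actually received, report
# which numbers arrived more than once and which never arrived.
sub check_messages {
    my ( $max, @bodies ) = @_;

    # Step 1: record every purely numeric body that was seen.
    my %seen;
    for my $body (@bodies) {
        next unless $body =~ /^\d+$/;
        $seen{$body}++;
    }

    # Step 2: anything counted more than once is a duplicate.
    my @duplicates = sort { $a <=> $b } grep { $seen{$_} > 1 } keys %seen;

    # Step 3: anything in 1 .. $max that was never counted is missing.
    my @missing = grep { !exists $seen{$_} } 1 .. $max;

    return { duplicates => \@duplicates, missing => \@missing };
}

my $report = check_messages( 5, qw(1 2 2 4 5) );
print "Duplicates: @{ $report->{duplicates} }\n";    # Duplicates: 2
print "Missing: @{ $report->{missing} }\n";          # Missing: 3
```

Keeping the check separate from the receive loop makes it possible to run the same verification against a log file from either the Net::Stomp client or the POE client.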
Subject: StompClient.pl
#!/usr/bin/perl

use strict;
use warnings;

use Carp;
$Carp::Verbose = 1;

use Data::Dumper;
use Net::Stomp;

my $client = Net::Stomp->new(
    {
        hostname => 'localhost',
        port     => '61613',
    }
);

print STDERR "About to connect...\n\n";
$client->connect( { login => '', passcode => '' } );

my $message_counter = 0;
my $messages_seen   = {};

$client->subscribe(
    {
        destination => '/queue/activemq/perl',
        ack         => 'client',
    }
);

while (1) {
    my $frame = $client->receive_frame();
    $client->ack( { frame => $frame } );

    my $message = $frame->body();
    chomp $message;

    $message_counter++;
    $messages_seen->{$message} += 1 if ( $message =~ /^\d+$/ );

    print STDERR "Message: $message ($message_counter)\n";

    if ( $message eq "dump" ) {
        print STDERR "Dump:\n";
        foreach my $key ( sort { $a <=> $b } keys %{ $messages_seen } ) {
            my $value = $messages_seen->{$key};
            print STDERR "$key => $value\n";
        }
        print STDERR "Finished dumping...\n";
    }

    if ( $message eq "check" ) {
        print STDERR "Check:\n";
        foreach my $key ( sort { $a <=> $b } keys %{ $messages_seen } ) {
            my $value = $messages_seen->{$key};
            if ( $value != 1 ) {
                print STDERR "E: $key => $value\n";
            }
        }
        print STDERR "Finished checking...\n";
    }

    if ( $message eq "clear" ) {
        print STDERR "Clearing...\n";
        $messages_seen   = {};
        $message_counter = 0;
    }

    if ( $message =~ m/^max(\d+)/ ) {
        my $max = $1;
        print STDERR "Max: $max\n\n";

        my $err_count = 0;
        foreach ( 1 .. $max ) {
            if ( !exists $messages_seen->{$_} ) {
                print STDERR "$_ does not exist!\n";
                $err_count += 1;
            }
        }
        print STDERR "Errors: $err_count\n\n";
    }
}

$client->disconnect();
print STDERR "Disconnected...\n\n";

1;
Subject: stomp_test_queue.pl
#!/usr/bin/perl

package Client;

use strict;
use warnings;

use Data::Dumper;
use POE;
use base qw(POE::Component::Client::Stomp);

use strict;
use warnings;

sub handle_connection {
    my ( $kernel, $heap, $self ) = @_[ KERNEL, HEAP, OBJECT ];

    print STDERR "We are handling a connection...\n";

    $heap->{message_counter} = 0;
    $heap->{messages_seen}   = {};

    my $nframe = $self->stomp->connect();
    $kernel->yield( 'send_data' => $nframe );
}

sub handle_connected {
    my ( $kernel, $self, $frame ) = @_[ KERNEL, OBJECT, ARG0 ];

    print STDERR "Connected...\n";

    my $nframe = $self->stomp->subscribe(
        {
            destination => $self->config('Queue'),
            ack         => 'client'
        }
    );
    $kernel->yield( 'send_data' => $nframe );
}

sub handle_message {
    my ( $kernel, $heap, $self, $session, $frame ) =
      @_[ KERNEL, HEAP, OBJECT, SESSION, ARG0 ];

    my $message_id = $frame->headers->{'message-id'};
    my $message    = $frame->body();
    chomp $message;

    $heap->{message_counter} += 1;
    $heap->{messages_seen}->{$message} += 1 if ( $message =~ /^\d+$/ );

    print STDERR $message_id . " => $message (" . $heap->{message_counter} . ")\n";

    if ( $message eq "dump" ) {
        print STDERR "Dump:\n";
        foreach my $key ( sort { $a <=> $b } keys %{ $heap->{messages_seen} } ) {
            my $value = $heap->{messages_seen}->{$key};
            print STDERR "$key => $value\n";
        }
        print STDERR "Finished dumpring...\n";
    }

    if ( $message eq "check" ) {
        print STDERR "Check:\n";
        foreach my $key ( sort { $a <=> $b } keys %{ $heap->{messages_seen} } ) {
            my $value = $heap->{messages_seen}->{$key};
            if ( $value != 1 ) {
                print STDERR "E: $key => $value\n";
            }
        }
        print STDERR "Finished checking...\n";
    }

    if ( $message eq "clear" ) {
        print STDERR "Clearing...\n";
        $heap->{messages_seen}   = {};
        $heap->{message_counter} = 0;
    }

    if ( $message =~ m/^max(\d+)/ ) {
        my $max = $1;
        print STDERR "Max: $max\n\n";

        my $err_count = 0;
        foreach ( 1 .. $max ) {
            if ( !exists $heap->{messages_seen}->{$_} ) {
                print STDERR "$_ does not exist!\n";
                $err_count += 1;
            }
        }
        print STDERR "Errors: $err_count\n\n";
    }

    my $nframe = $self->stomp->ack( { 'message-id' => $message_id } );
    $kernel->yield( 'send_data', $nframe );
}

package main;

use POE;
use strict;

## This turns up without /queue because the /queue gets stripped
my $queue_name = "/queue/activemq/perl";

Client->spawn(
    Alias         => 'STOMPMDB',
    Queue         => $queue_name,
    RemoteAddress => 'localhost',
    RemotePort    => '61613',
);

$poe_kernel->run();

print STDERR "Finishing!";

exit 0;
Subject: send_stomp.pl
#!/usr/bin/perl

$| = 1;

use strict;
use warnings;

use Carp;
use Net::Stomp;

## This turns up as just "mdb" because the /queue gets stripped
my $queue_name = "/queue/activemq/perl";

my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect();

my $count = 5000;

while ( $count-- > 0 ) {
    my $body = $count;
    $stomp->send( { destination => $queue_name, body => $body } );
}

$stomp->send( { destination => $queue_name, body => 'check' } );
$stomp->send( { destination => $queue_name, body => 'max$count' } );

$stomp->disconnect();
Subject: ActiveMQNative.java
Download ActiveMQNative.java
application/octet-stream 2.5k

Message body not shown because it is not plain text.

Subject: perldashv.output.txt
Summary of my perl5 (revision 5 version 8 subversion 8) configuration:
  Platform:
    osname=solaris, osvers=2.8, archname=i86pc-solaris-thread-multi
    uname='sunos thor 5.8 generic_117351-46 i86pc i386 i86pc'
    config_args=''
    hint=recommended, useposix=true, d_sigaction=define
    usethreads=define use5005threads=undef useithreads=define usemultiplicity=define
    useperlio=define d_sfio=undef uselargefiles=define usesocks=undef
    use64bitint=undef use64bitall=undef uselongdouble=undef
    usemymalloc=n, bincompat5005=undef
  Compiler:
    cc='cc', ccflags ='-D_REENTRANT -xO3 -xarch=386 -xspace -xildoff -I/opt/csw/bdb44/include -I/opt/csw/include -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64',
    optimize='-xO3 -xarch=386 -xspace -xildoff',
    cppflags='-D_REENTRANT -xO3 -xarch=386 -xspace -xildoff -I/opt/csw/bdb44/include -I/opt/csw/include'
    ccversion='Sun C 5.8 Patch 121016-05 2007/01/10', gccversion='', gccosandvers=''
    intsize=4, longsize=4, ptrsize=4, doublesize=8, byteorder=1234
    d_longlong=define, longlongsize=8, d_longdbl=define, longdblsize=12
    ivtype='long', ivsize=4, nvtype='double', nvsize=8, Off_t='off_t', lseeksize=8
    alignbytes=4, prototype=define
  Linker and Libraries:
    ld='cc', ldflags ='-xarch=386 -L/opt/csw/bdb44/lib -L/opt/csw/lib -L/usr/lib -L/usr/ccs/lib -L/opt/SUNWspro/prod/lib -L/lib'
    libpth=/usr/lib /usr/ccs/lib /opt/SUNWspro/prod/lib /lib /opt/csw/lib
    libs=-lsocket -lnsl -lgdbm -ldb-4.4 -ldl -lm -lpthread -lc
    perllibs=-lsocket -lnsl -ldb-4.4 -ldl -lm -lpthread -lc
    libc=/lib/libc.so, so=so, useshrplib=true, libperl=libperl.so.5.8.8
    gnulibc_version=''
  Dynamic Linking:
    dlsrc=dl_dlopen.xs, dlext=so, d_dlsymun=undef, ccdlflags='-R /opt/csw/lib'
    cccdlflags='-KPIC', lddlflags='-G -L/opt/csw/bdb44/lib -L/opt/csw/lib -L/usr/lib -L/usr/ccs/lib -L/opt/SUNWspro/prod/lib -L/lib'

Characteristics of this binary (from libperl):
  Compile-time options: MULTIPLICITY PERL_IMPLICIT_CONTEXT PERL_MALLOC_WRAP
                        USE_ITHREADS USE_LARGE_FILES USE_PERLIO
                        USE_REENTRANT_API USE_SITECUSTOMIZE
  Built under solaris
  Compiled at Mar 16 2007 11:31:22
  @INC:
    /opt/csw/lib/perl/5.8.8
    /opt/csw/share/perl/5.8.8
    /opt/csw/lib/perl/site_perl
    /opt/csw/share/perl/site_perl
    /opt/csw/share/perl/site_perl
    /opt/csw/lib/perl/csw
    /opt/csw/share/perl/csw
    /opt/csw/share/perl/csw
    .
On Wed Oct 17 23:03:23 2007, lloy0076@adam.com.au wrote:
> SYNOPSIS
>
> The POE::Component::Client::Stomp appears to drop messages into the
> ether; specifically even though the message queue server (ActiveMQ)
> notes the message and the sender's log indicates the message is sent,
> the message appears not to have been received.
>
> PLATFORM
>
> This is perl, v5.8.8 built for i86pc-solaris-thread-multi
> Solaris Community Express, build 70
> (Note: Perl is built by http://www.blastwave.org/)
> Apache ActiveMQ 4.1
>
> THE ISSUE
>
> I send a number of simple text messages of the format, "1", "2" through
> to "#n" to a queue "activemq/perl". What will happen, though, is that
> some messages seem never to be acknowledged or received by the Stomp
> listener.
>
> The algorithm of my tests goes:
>
> # Record the messages sent
> # Then check to make sure only one is sent (i.e. no duplicates)
> # Then check if any are missing
>
> It's this latter that fails, I get output like:
>
> Max: 250
>
> 173 does not exist!
> 177 does not exist!
> 236 does not exist!
> Errors: 3
>
> Doing some diagnostics, I decided to write a client using Net::Stomp and
> bypassing POE. With this Net::Stomp client I could send 25 000 messages
> without a message being missed. I did this because I wanted to remove
> POE from the equation (as it's obviously a lot more complex a system
> than plain Net::Stomp).
>
> My question is:
>
> (assumption: it is POE that is somehow missing messages)
> (basis: Net::Stomp copes with Java sending messages as fast as it can,
> hence ActiveMQ or Java don't appear to be the cause)
>
> * Why would POE miss messages?
> * In what circumstances might POE "silently" just lose this data?
>
> I adjusted the Java program to "Thread.sleep(100)" (sleep 100
> milliseconds) between sending messages and no messages were lost. Then
> to "Thread.sleep(5)" (5 milliseconds) and it seems that this module only
> lost 3 messages (as opposed to more than 50).
>
> Whilst it would be unusual for it to be Java's problem, I tried again
> with a Perl stomp client and got similar results:
>
> * Messages would go missing
> * Or the final messages wouldn't be seen by the POE stomp client
>
> I'm happy to do more tests but I'm not quite sure what tests to perform
> from here.
>
> DSL
I am unable to duplicate this problem or the previous one.

My test environments are SUSE 10 SP1 on an HP 4-core blade, a dual 500 MHz Celeron system running Debian unstable, and several multi-CPU AIX 5.2 systems. All are running POE::Component::MessageQueue, Perl 5.8.x, POE .09999 and 0.04 of this library. I do not have access to Solaris, nor do I have ActiveMQ installed.

Testing with the supplied code does not produce any errors. I did notice, though, that if the sending code didn't send a "clear" message, the results from consecutive runs would be erroneous, i.e. the internal hash was not cleared between runs.

Testing with dump_queue.pl and feed_queue.pl from the examples directory and the supplied mq.pl from POE::Component::MessageQueue also didn't produce any errors. This testing consisted of running various numbers of messages through mq.pl and counting the number of messages passed. I started with 100, proceeded to 1000 and finished with 10,000. Since by default dump_queue.pl writes one line per received message, the log files were scanned with wc to count the number of lines. Another helpful feature is that mq.pl starts its message IDs from 0, the same as feed_queue.pl.

No duplicates or dropped messages were found, and the tests completed as expected, even though it took several minutes to finish the 10,000-message run on the dual Celeron system.

I cannot comment on how POE handles task switching or how it stores internal data. Those questions are best asked on the POE support forums.
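The point about the missing "clear" message suggests bracketing each run explicitly. The sketch below is one possible way to do that, not the sender that was actually used in these tests: `build_test_bodies` and `send_test_run` are hypothetical names, and the hostname, port, and queue name are simply copied from the scripts attached to this ticket. Only the Net::Stomp calls already used in those scripts (new, connect, send, disconnect) appear here. Note that the "max" body is built with a double-quoted string so that the count is interpolated and matches the consumer's /^max(\d+)/ pattern.

```perl
use strict;
use warnings;

# Hypothetical helper: the bodies for one self-contained run.
# "clear" resets the consumer's hash, the numbers 1 .. $count are the
# payload, then "check" and "max<count>" trigger the consumer's reports.
sub build_test_bodies {
    my ($count) = @_;
    return ( 'clear', 1 .. $count, 'check', "max$count" );
}

# Hypothetical sender: pushes one run to the broker. Connection details
# are assumptions taken from the attached scripts.
sub send_test_run {
    my ($count) = @_;

    require Net::Stomp;    # loaded only when a run is actually sent
    my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
    $stomp->connect();

    for my $body ( build_test_bodies($count) ) {
        $stomp->send( { destination => '/queue/activemq/perl', body => $body } );
    }

    $stomp->disconnect();
    return;
}

# Show what a three-message run would send, without touching a broker.
print join( ',', build_test_bodies(3) ), "\n";    # clear,1,2,3,check,max3
```

Calling send_test_run(250) against a running broker would then produce one run whose results do not depend on whatever the consumer saw in earlier runs.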