Subject: Message Corruption/Loss
I am seeing a lot of corrupted messages. It looks like a race condition in
which a message partially overwrites a previous message that was sent a
few hundred microseconds earlier.
I have attached a new test case (a torture test) that demonstrates this
behaviour. The problem only occurs when many messages are sent in rapid
succession, and adding a delay in the sender makes it occur less often,
which points to a race condition.
The test script sets up a publisher and a subscriber. The publisher sends
1000 messages (with contents 0 to 999, each sent with ZMQ_SNDMORE),
followed by a final message, 'end'. The subscriber should receive them in
order, or at the very least receive all of them.
I don't think I've stuffed up the test... but it's always possible ;)
An example failed test:
[root@bjbld02 t]# perl 999_messagecorruptions.t |less
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '859'
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '860'
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '861'
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '862'
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '863'
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '864'
# Failed test 'Error -- messages not received in sequence or corrupt'
# at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
# got: 'end'
# expected: '865'
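Because the client calls is() once per value, a single early 'end' turns into a cascade of failures like the ones above, and the output does not say whether values were lost, reordered or corrupted. A minimal sketch of a summary check follows. `check_sequence` is a hypothetical helper of my own, not part of ZeroMQ, Test::More or Test::TCP. It assumes the payloads have been collected into an array in arrival order and that the publisher sent the values 0 .. $n-1 followed by 'end'.

```perl
use strict;
use warnings;

# check_sequence is a hypothetical helper, not part of ZeroMQ or Test::More.
# Given the payloads the subscriber received (in arrival order) and the number
# of values the publisher sent (0 .. $n-1), it separates three kinds of error:
#   missing         - values in 0 .. $n-1 that never arrived
#   out_of_sequence - values that did not follow their predecessor by exactly 1
#                     (gaps, duplicates, reordering)
#   unexpected      - payloads that are not a value in 0 .. $n-1
#                     (e.g. an early 'end' or a corrupted payload)
# It also reports whether the stream finished with the 'end' marker.
sub check_sequence {
    my ($received, $n) = @_;
    my @body  = @$received;
    my $ended = (@body && $body[-1] eq 'end') ? 1 : 0;
    pop @body if $ended;    # a trailing 'end' is expected, not an error

    my (%seen, @out_of_sequence, @unexpected);
    my $expected = 0;
    for my $data (@body) {
        if ($data =~ /\A\d+\z/ && $data < $n) {
            push @out_of_sequence, $data if $data != $expected;
            $seen{$data} = 1;
            $expected = $data + 1;
        }
        else {
            push @unexpected, $data;
        }
    }
    my @missing = grep { !$seen{$_} } 0 .. $n - 1;
    return {
        missing         => \@missing,
        out_of_sequence => \@out_of_sequence,
        unexpected      => \@unexpected,
        ended           => $ended,
    };
}

# Example: a hypothetical run that stops after 858 and then sees 'end'.
my $r = check_sequence([0 .. 858, 'end'], 1000);
printf "missing %d, out of sequence %d, unexpected %d, ended %d\n",
    scalar @{ $r->{missing} }, scalar @{ $r->{out_of_sequence} },
    scalar @{ $r->{unexpected} }, $r->{ended};
# prints: missing 141, out of sequence 0, unexpected 0, ended 1
```

To use it in the attached script, the client loop could push `$msg->data` onto an array and call the helper once afterwards, e.g. `is_deeply($r->{missing}, [], 'all 1000 values received')`. Note that the client as written reads exactly 1000 payloads, so `ended` is only 1 there if 'end' arrived early.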
Subject: 999_messagecorruptions.t
use strict;
use warnings;
use Test::More;
use Test::Requires qw( Test::TCP AnyEvent );
use ZeroMQ qw(ZMQ_PUB ZMQ_SUB ZMQ_SNDMORE);
use Time::HiRes qw(usleep);

BEGIN {
    use_ok "ZeroMQ";
    use_ok "ZeroMQ::Constants", ":all";
}

my $port = empty_port();

test_tcp(
    port   => $port,
    client => sub {
        my $port = shift;
        my $ctxt = ZeroMQ::Context->new();
        my $sock = $ctxt->socket(ZMQ_SUB);
        sleep(1);
        $sock->connect("tcp://127.0.0.1:$port");
        $sock->setsockopt(ZMQ_SUBSCRIBE, '');    # subscribe to everything
        my $cnt = 0;
        while ($cnt < 1000) {    # expect to receive the numbers 0..999 in order
            my $msg = $sock->recv();
            is($msg->data, $cnt++, "Error -- messages not received in sequence or corrupt");
        }
        note "OK";
    },
    server => sub {
        my $port = shift;
        my $ctxt = ZeroMQ::Context->new();
        my $sock = $ctxt->socket(ZMQ_PUB);
        note "Server binding to port $port";
        $sock->bind("tcp://127.0.0.1:$port");
        note "Waiting for client to connect and subscribe...";
        sleep 5;
        note "Server sending ordered data... (numbers 0..999)";
        for my $c (0 .. 999) {
            # ZMQ_SNDMORE marks each number as one part of a single multipart
            # message; the "end" sent below is its final part.
            $sock->send($c, ZMQ_SNDMORE);
            #usleep(1000);    # uncommenting this delay makes the failure less frequent
        }
        $sock->send("end");    # end of data stream...
        note "OK";
    }
);

done_testing;