
This queue is for tickets about the ZeroMQ CPAN distribution.

Report information
The Basics
Id: 64836
Status: resolved
Priority: 0/
Queue: ZeroMQ

People
Owner: Nobody in particular
Requestors: jason [...] ball.net
Cc:
AdminCc:

Bug Information
Severity: (no value)
Broken in: 0.05
Fixed in: (no value)



Subject: Message Corruption/Loss
I'm getting a lot of corrupted messages, and it looks like a race condition in which a message partially overwrites a prior message that was sent a few hundred microseconds earlier. I have attached a new test case (a torture test) demonstrating this behaviour. It only occurs when a number of messages are sent in rapid succession, and introducing a delay in the sender reduces the occurrence of the problem, which suggests a race condition.

The test script sets up a publisher and a subscriber. The publisher sends 1000 messages (with contents 0 to 999), and the final message should be 'end'. The subscriber should receive these in order, or at the very least receive them all. I don't think I've stuffed up the test... always possible ;)

An example failed test:

[root@bjbld02 t]# perl 999_messagecorruptions.t |less
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '859'
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '860'
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '861'
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '862'
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '863'
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '864'
#   Failed test 'Error -- messages not received in sequence or corrupt'
#   at /usr/lib/perl5/5.8.8/Test/More.pm line 372.
#          got: 'end'
#     expected: '865'
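The subscriber-side check can be sketched in C as follows. This is a minimal illustration only: `first_out_of_sequence` is a hypothetical helper (not part of ZeroMQ or of the attached test), and it assumes the payloads have already been received as NUL-terminated strings. It does what the Perl loop does with `is($msg->data, $cnt++)`: compare each payload against an incrementing counter.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of ZeroMQ or the Perl test): returns the
 * index of the first payload that differs from the expected counter value
 * "0", "1", "2", ..., or -1 if all `count` payloads arrived in order. */
static long first_out_of_sequence(const char *payloads[], size_t count)
{
    char expected[32];

    assert(payloads != NULL || count == 0);
    for (size_t i = 0; i < count; i++) {
        /* Render the counter the same way the sender formats it. */
        snprintf(expected, sizeof expected, "%zu", i);
        if (strcmp(payloads[i], expected) != 0) {
            return (long)i; /* lost, duplicated or corrupted message */
        }
    }
    return -1;
}
```

Applied to the excerpt above, the first mismatch it shows is at index 859, where 'end' arrived instead of '859'.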
Subject: 999_messagecorruptions.t
use strict;
use Test::More;
use Test::Requires qw( Test::TCP AnyEvent );
use ZeroMQ qw(ZMQ_PUB ZMQ_SUB ZMQ_SNDMORE);
use Time::HiRes qw(usleep);

BEGIN {
    use_ok "ZeroMQ";
    use_ok "ZeroMQ::Constants", ":all";
}


my $port = empty_port();

test_tcp(
    client => sub {
        my $port = shift;
        my $ctxt = ZeroMQ::Context->new();
        my $sock = $ctxt->socket(ZMQ_SUB);

        sleep(1);
        $sock->connect("tcp://127.0.0.1:$port" );
        $sock->setsockopt(ZMQ_SUBSCRIBE, '');
        my $data = join '.', time(), $$, rand, {};

        my $cnt=0;

        while($cnt<1000) { # expect to receive numbers 1..1000...
            my $msg = undef;
            $msg = $sock->recv();
            is($msg->data, $cnt++, "Error -- messages not received in sequence or corrupt");
        }

        note "OK";
    },
    server => sub {
        my $port = shift;
        my $ctxt = ZeroMQ::Context->new();
        my $sock = $ctxt->socket(ZMQ_PUB);

        note "Server Binding to port $port\n";
        $sock->bind("tcp://127.0.0.1:$port");

        note "Waiting on client to bind...";
        sleep 5;

        note "Server sending ordered data... (numbers 1..1000)";
        for (my $c=0; $c<1000; $c++) {
            $sock->send($c, ZMQ_SNDMORE);
            #usleep(1000);
        }
        $sock->send("end"); # end of data stream...
        note "OK";
    }
);

done_testing;
Hi, thanks for the report. I haven't looked at your report closely yet, but do you think this is a Perl binding issue? Have you tried with, say, just the C binding? Just curious -- if it's zeromq itself, I'd save myself a ton of debug time ;) --d
Subject: Re: [rt.cpan.org #64836] Message Corruption/Loss
Date: Mon, 17 Jan 2011 18:50:46 +1100
To: bug-ZeroMQ [...] rt.cpan.org
From: Jason Ball <jason [...] ball.net>
I'll code a version using the raw sockets shortly. I was going to do so tonight to see if that solves the problem ;)

Cheers
J.

On Mon, Jan 17, 2011 at 6:01 PM, Daisuke Maki via RT <bug-ZeroMQ@rt.cpan.org> wrote:
> <URL: https://rt.cpan.org/Ticket/Display.html?id=64836 >
>
> Hi,
>
> thanks for the report.
> I haven't looked at your report closely yet, but do you think this is a
> Perl binding issue? Have you
> tried with, say, just the C binding?
>
> Just curious -- if it's zeromq itself, I'd save myself a ton of debug time
> ;)
>
> --d

--
Illegitimi non carborundum
From: jason [...] ball.net
I've coded a test using the ZeroMQ::Raw API and, sadly, the same behaviour is present. I'll look at a C version shortly to determine whether the problem is in ZeroMQ itself or in the Perl binding.
Subject: 999_messagecorruptions_lowlevel.t
use strict;
use Test::More;
use Test::Requires qw( Test::TCP );
use Data::Dumper;

BEGIN {
    use_ok "ZeroMQ::Raw";
    use_ok "ZeroMQ::Constants", ":all";
}


my $port = empty_port();

test_tcp(
    client => sub {
        my $port = shift;
        my $ctxt = zmq_init();
        my $sock = zmq_socket($ctxt, ZMQ_SUB);

        sleep(1);
        zmq_connect($sock,"tcp://127.0.0.1:$port" );
        zmq_setsockopt($sock, ZMQ_SUBSCRIBE, '');

        my $cnt=0;

        while($cnt<1000) { # expect to receive numbers 1..1000...
            my $rawmsg = undef;
            $rawmsg = zmq_recv($sock);
            my $msg = zmq_msg_data($rawmsg);
            is($msg, $cnt++, "Error -- messages not received in sequence or corrupt");
        }

        note "OK";
    },
    server => sub {
        my $port = shift;
        my $ctxt = zmq_init();
        my $sock = zmq_socket($ctxt, ZMQ_PUB);

        note "Server Binding to port $port\n";
        zmq_bind($sock, "tcp://127.0.0.1:$port");

        note "Waiting on client to bind...";
        sleep 5;

        note "Server sending ordered data... (numbers 1..1000)";
        for (my $c=0; $c<1000; $c++) {
            my $msg = zmq_msg_init_data($c);
            zmq_send($sock, $msg);
            zmq_msg_close($msg);
        }
        note "OK";
    }
);

done_testing;
From: jason [...] ball.net
The problem does not occur at the C level, so it must be in the binding. I suspect a sender-side issue (zmq_send).
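Why the sender side is a plausible suspect: in libzmq, a successful zmq_send() only means the message has been queued and ZeroMQ has taken responsibility for it; the actual transmission happens later on ZeroMQ's I/O thread. With zmq_msg_init_data(), the buffer handed over therefore has to stay untouched until ZeroMQ calls the deallocation function passed as `ffn`. The sketch below models that contract in plain C without linking against libzmq. `sim_msg`, `sim_send`, `sim_flush` and `free_buffer` are hypothetical names, and the single-threaded "flush" is an assumption standing in for the I/O thread.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for libzmq's zero-copy contract; nothing here links
 * against libzmq. The callback shape follows zmq_free_fn(data, hint). */
typedef void (sim_free_fn)(void *data, void *hint);

typedef struct {
    void *data;
    size_t size;
    sim_free_fn *ffn; /* called once the "library" no longer needs data */
    void *hint;
} sim_msg;

#define SIM_QUEUE_CAP 16

typedef struct {
    sim_msg pending[SIM_QUEUE_CAP];
    size_t len;
} sim_socket;

static int frees_called = 0;

static void free_buffer(void *data, void *hint)
{
    (void)hint;
    free(data);
    frees_called++;
}

/* Like zmq_send(), this only queues the message; nothing is read yet. */
static void sim_send(sim_socket *s, sim_msg msg)
{
    assert(s->len < SIM_QUEUE_CAP);
    s->pending[s->len++] = msg;
}

/* Plays the I/O thread: reads each queued buffer, then releases it. */
static void sim_flush(sim_socket *s, char out[][16])
{
    for (size_t i = 0; i < s->len; i++) {
        sim_msg *m = &s->pending[i];
        snprintf(out[i], 16, "%.*s", (int)m->size, (const char *)m->data);
        if (m->ffn != NULL) {
            m->ffn(m->data, m->hint);
        }
    }
    s->len = 0;
}

/* Zero-copy done safely: each message gets its own heap buffer, and the
 * sender gives up ownership until the free callback runs. */
static void send_numbers_zero_copy(sim_socket *s, int n)
{
    for (int c = 0; c < n; c++) {
        char *buf = malloc(16);
        assert(buf != NULL);
        int len = snprintf(buf, 16, "%d", c);
        sim_msg m = { buf, (size_t)len, free_buffer, NULL };
        sim_send(s, m);
        /* The sender must not modify or free buf after this point. */
    }
}
```

The design point is ownership: a buffer passed by reference belongs to ZeroMQ from the moment it is queued until the callback fires, so the sender needs a fresh buffer per message.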
Subject: torture_server.c
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>

int main()
{
    void *context = zmq_init(1);
    void *pub = zmq_socket(context, ZMQ_PUB);
    zmq_bind(pub, "tcp://*:14567");
    int c;

    /* Give the subscriber time to connect before publishing. */
    printf("Server: Waiting for client to connect\n");
    sleep(10);

    printf("Server: Sending Messages\n");
    for(c=0; c<10000; c++) {
        char buffer[10];
        zmq_msg_t msg;
        snprintf((char *)&buffer, 10, "%d", c);
        /* Allocate a message-owned buffer and copy the payload into it. */
        zmq_msg_init_size(&msg, sizeof(buffer));
        memcpy((void *) zmq_msg_data(&msg), buffer, 10);
        zmq_send(pub, &msg, 0);
        zmq_msg_close(&msg);
    }

    printf("Server: Waiting to exit\n");
    sleep(20);
    printf("Server: Exiting\n");
    zmq_term(context);
}
Subject: torture_client.c
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>

int main()
{
    void *context = zmq_init(1);
    void *sub = zmq_socket(context, ZMQ_SUB);

    printf("Connecting to Server\n");
    zmq_connect(sub, "tcp://*:14567");
    zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);
    int c;

    printf("Waiting for messages\n");
    for(c=0; c<10000; c++) {
        char buffer[10];
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        zmq_recv(sub, &msg, 0);
        printf("Received Message\n");
        int value = atoi((char *)zmq_msg_data(&msg));
        if(c != value) {
            printf("ERROR: expected %d, got %d\n", c, value);
        } else {
            printf("OK: Got %d\n", value);
        }
        zmq_msg_close(&msg);
    }
    zmq_term(context);
}
Thank you, you rock! The bad news is I'm going to be a bit tied for the next couple of days :/ I'll get back to this as soon as I can.
From: jason [...] ball.net
On Mon Jan 17 04:25:00 2011, DMAKI wrote:
> Thank you, you rock!
>
> The bad news is I'm going to be a bit tied for the next couple of days :/
> I'll get back to this as soon as I can.
I'll see if I can patch it in the meantime...
From: jason [...] ball.net
Definitely a race condition. Adding

threads->yield();

to the test immediately after the 'zmq_send ... zmq_msg_close' calls significantly reduces the occurrences of the problem. I'm not sure (at this time) how to debug this one :/

J.
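Even without Perl threads, ZeroMQ runs its own background I/O thread (the `1` passed to zmq_init() in the C tests is the number of I/O threads), so the sending code and the actual network write do run concurrently. Below is a deterministic model of why yielding could hide the corruption. It assumes, and this is not confirmed anywhere in the ticket, that each queued message merely borrows a pointer into a buffer the sender reuses. If the I/O side runs after every send, each message is read before the buffer changes; if it only runs later, every queued message shows the last value written. `borrowed_queue`, `io_thread_runs` and `send_reusing_buffer` are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_PENDING 16

/* Hypothetical model: queued messages borrow a pointer into the sender's
 * single reusable buffer instead of owning a copy of their bytes. */
typedef struct {
    const char *pending[MAX_PENDING];
    size_t len;
    char delivered[MAX_PENDING][16];
    size_t ndelivered;
} borrowed_queue;

/* Stands in for the I/O thread getting CPU time: it transmits whatever the
 * borrowed pointers point at now, not what they held when queued. */
static void io_thread_runs(borrowed_queue *q)
{
    for (size_t i = 0; i < q->len; i++) {
        assert(q->ndelivered < MAX_PENDING);
        snprintf(q->delivered[q->ndelivered++], 16, "%s", q->pending[i]);
    }
    q->len = 0;
}

/* Sends "0".."n-1" through one reused buffer. With yield_after_send set,
 * the I/O side runs after every send (roughly what threads->yield() made
 * more likely); otherwise it only gets to run after the loop. */
static void send_reusing_buffer(borrowed_queue *q, int n, int yield_after_send)
{
    char buffer[16];

    for (int c = 0; c < n; c++) {
        snprintf(buffer, sizeof buffer, "%d", c);
        assert(q->len < MAX_PENDING);
        q->pending[q->len++] = buffer; /* borrowed, not copied */
        if (yield_after_send) {
            io_thread_runs(q);
        }
    }
    io_thread_runs(q); /* drain before buffer goes out of scope */
}
```

In reality the interleaving depends on the OS scheduler, which fits the observation that the yield reduced the failures rather than eliminating them.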
Thanks for the work! While I agree this smells fishy, I'm not so sure about the threading bit. As far as I can see, there's no threading involved in my part of the code :/

> Definitely a race condition. Adding:
>
> threads->yield();
>

Where are you adding this threads->yield()?
From: jason [...] ball.net
Calling it a day. I have attached a patch with the tests developed so far; once I work out the basics of GitHub I'll submit updates there to make life simpler. I get consistent results with these tests:

t/999_messagecorruptions.t ........... Failed 100/102 subtests
t/999_messagecorruptions_lowlevel.t .. ok

The tests _should_ be identical. Note that increasing the number of messages in these tests will induce errors in the low-level code too, though nowhere near as badly as in the higher-level binding. Removing the 'threads->yield' guarantees a 100% failure rate in both tests -- I'm afraid I need to spend more time reviewing the code to understand why this would make any difference at all.
Subject: ZeroMQ-Perl-message-curruption-tests.patch
commit ded0374098e3014b2fe73e9c7cfbec4e618e6243
Author: Jason Ball <jason.ball@tibra.com>
Date:   Mon Jan 17 22:55:14 2011 +1100

    Test cases for low level corruptions

diff --git a/t/999_messagecorruptions.t b/t/999_messagecorruptions.t
new file mode 100644
index 0000000..5a00340
--- /dev/null
+++ b/t/999_messagecorruptions.t
@@ -0,0 +1,57 @@
+use strict;
+use Test::More;
+use Test::Requires qw( Test::TCP AnyEvent );
+use ZeroMQ qw(ZMQ_PUB ZMQ_SUB ZMQ_SNDMORE);
+use Time::HiRes qw(usleep);
+
+BEGIN {
+    use_ok "ZeroMQ";
+    use_ok "ZeroMQ::Constants", ":all";
+}
+
+
+my $port = empty_port();
+
+test_tcp(
+    client => sub {
+        my $port = shift;
+        my $ctxt = ZeroMQ::Context->new();
+        my $sock = $ctxt->socket(ZMQ_SUB);
+
+        sleep(1);
+        $sock->connect("tcp://127.0.0.1:$port" );
+        $sock->setsockopt(ZMQ_SUBSCRIBE, '');
+        my $data = join '.', time(), $$, rand, {};
+
+        my $cnt=0;
+
+        while($cnt<1000) { # expect to receive numbers 1..1000...
+            my $msg = undef;
+            $msg = $sock->recv();
+            is($msg->data, $cnt++, "Error -- messages not received in sequence or corrupt");
+        }
+
+        note "OK";
+    },
+    server => sub {
+        my $port = shift;
+        my $ctxt = ZeroMQ::Context->new();
+        my $sock = $ctxt->socket(ZMQ_PUB);
+
+        note "Server Binding to port $port\n";
+        $sock->bind("tcp://127.0.0.1:$port");
+
+        note "Waiting on client to bind...";
+        sleep 5;
+
+        note "Server sending ordered data... (numbers 1..1000)";
+        for (my $c=0; $c<1000; $c++) {
+            $sock->send($c, ZMQ_SNDMORE);
+            #usleep(1000);
+        }
+        $sock->send("end"); # end of data stream...
+        note "OK";
+    }
+);
+
+done_testing;
diff --git a/t/999_messagecorruptions_lowlevel.t b/t/999_messagecorruptions_lowlevel.t
new file mode 100644
index 0000000..caf2bd9
--- /dev/null
+++ b/t/999_messagecorruptions_lowlevel.t
@@ -0,0 +1,56 @@
+use strict;
+use Test::More;
+use Test::Requires qw( Test::TCP );
+use Data::Dumper;
+
+BEGIN {
+    use_ok "ZeroMQ::Raw";
+    use_ok "ZeroMQ::Constants", ":all";
+}
+
+
+my $port = empty_port();
+
+test_tcp(
+    client => sub {
+        my $port = shift;
+        my $ctxt = zmq_init();
+        my $sock = zmq_socket($ctxt, ZMQ_SUB);
+
+        sleep(1);
+        zmq_connect($sock,"tcp://127.0.0.1:$port" );
+        zmq_setsockopt($sock, ZMQ_SUBSCRIBE, '');
+
+        my $cnt=0;
+
+        while($cnt<1000) { # expect to receive numbers 1..1000...
+            my $rawmsg = undef;
+            $rawmsg = zmq_recv($sock);
+            my $msg = zmq_msg_data($rawmsg);
+            is($msg, $cnt++, "Error -- messages not received in sequence or corrupt");
+        }
+
+        note "OK";
+    },
+    server => sub {
+        my $port = shift;
+        my $ctxt = zmq_init();
+        my $sock = zmq_socket($ctxt, ZMQ_PUB);
+
+        note "Server Binding to port $port\n";
+        zmq_bind($sock, "tcp://127.0.0.1:$port");
+
+        note "Waiting on client to bind...";
+        sleep 5;
+
+        note "Server sending ordered data... (numbers 1..1000)";
+        for (my $c=0; $c<1000; $c++) {
+            my $msg = zmq_msg_init_data($c);
+            zmq_send($sock, $msg);
+            zmq_msg_close($msg);
+        }
+        note "OK";
+    }
+);
+
+done_testing;
Thanks! I'll check your work. Meanwhile, I think I'm onto something: using zmq_msg_init_size + zmq_msg_data, as you do in your torture_server.c, fixes the problem, so I'm starting to think that I'm not using the zmq_msg_* API correctly. Will work on it for a couple more hours. Thank you so much for your work!
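The difference described here can be modelled in plain C. In this sketch, `fake_msg_init_copy` plays the role of zmq_msg_init_size() followed by a memcpy() into zmq_msg_data(), as torture_server.c does, while `fake_msg_init_borrowed` stands for a message that only points at the caller's bytes. All names are hypothetical, and the idea that the 0.05 binding effectively borrowed the Perl scalar's buffer is an assumption; the ticket only confirms that switching to the init_size + copy pattern fixed the problem.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical message type: it either owns a private copy of the payload
 * (the zmq_msg_init_size + memcpy-into-zmq_msg_data pattern) or merely
 * points at the caller's bytes. */
typedef struct {
    char *data;
    size_t size;
    int owns_data;
} fake_msg;

static int fake_msg_init_copy(fake_msg *m, const char *src, size_t len)
{
    m->data = malloc(len > 0 ? len : 1);
    if (m->data == NULL) {
        return -1;
    }
    memcpy(m->data, src, len);
    m->size = len;
    m->owns_data = 1;
    return 0;
}

static void fake_msg_init_borrowed(fake_msg *m, char *src, size_t len)
{
    m->data = src;
    m->size = len;
    m->owns_data = 0;
}

static void fake_msg_close(fake_msg *m)
{
    if (m->owns_data) {
        free(m->data);
    }
    m->data = NULL;
    m->size = 0;
}

/* Builds three messages from one reused scratch buffer and only "transmits"
 * them afterwards, as an asynchronous I/O thread might. */
static void build_then_transmit(int copy, char results[3][8])
{
    char scratch[8];
    fake_msg msgs[3];

    for (int c = 0; c < 3; c++) {
        int len = snprintf(scratch, sizeof scratch, "%d", c);
        if (copy) {
            int rc = fake_msg_init_copy(&msgs[c], scratch, (size_t)len);
            assert(rc == 0);
            (void)rc;
        } else {
            fake_msg_init_borrowed(&msgs[c], scratch, (size_t)len);
        }
    }
    for (int c = 0; c < 3; c++) {
        snprintf(results[c], 8, "%.*s", (int)msgs[c].size, msgs[c].data);
        fake_msg_close(&msgs[c]);
    }
}
```

Copying costs one allocation and memcpy per message, but it decouples the message's lifetime from the caller's buffer, which is what an asynchronous send needs.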
Fixed! I was able to pinpoint the problem because you provided me with a working C example. Thank you so much. Please check out the current HEAD, or the tarball from master: http://github.com/lestrrat/ZeroMQ-Perl/tarball/master
From: jason [...] ball.net
Dude! That is awesome news, thanks! Cheers J.
From: jason [...] ball.net
Confirmed as working. Cheers.
Cool, just shipped 0.06. It should be on CPAN soon-ish. Closing ticket :)