Skip Menu |

This queue is for tickets about the IPC-PubSub CPAN distribution.

Report information
The Basics
Id: 54168
Status: open
Priority: 0/
Queue: IPC-PubSub

People
Owner: Nobody in particular
Requestors: sprout [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: (no value)
Broken in: (no value)
Fixed in: (no value)



Subject: DBM_Deep concurrency problems
Date: Sun, 31 Jan 2010 12:29:44 -0800
To: cpan [...] audreyt.org, bug-IPC-PubSub [...] rt.cpan.org
From: Father Chrysostomos <sprout [...] cpan.org>
Although DBM::Deep does locking on all access, it cannot tell when multiple accesses are supposed to be atomic. This happens with % { ... }. (See http://github.com/robkinyon/dbm-deep/commit/065e75f for a longer explanation.) The DBM_Deep back end needs to lock the database manually, to avoid concurrency problems. See the report at <https://rt.cpan.org/Ticket/Display.html?id=42129 Show quoted text
>. The attached patch fixes this.

Message body is not shown because sender requested not to inline it.

Subject: Re: [rt.cpan.org #54168] DBM_Deep concurrency problems
Date: Tue, 2 Feb 2010 09:52:54 -0800
To: Father Chrysostomos via RT <bug-IPC-PubSub [...] rt.cpan.org>
From: Jesse Vincent <jesse [...] fsck.com>
I just tried to apply this patch and it's causing fails on Jifty::DBI. Did it pass with all other backends on your test system? -Jesse PERL_DL_NONLAZY=1 /usr/bin/perl "-MExtUtils::Command::MM" "-e" "test_harness(0, 'inc', 'blib/lib', 'blib/arch')" t/*.t t/basic.t ...... # Testing backend JiftyDBI t/basic.t ...... 13/45 # Testing backend DBM_Deep # Testing backend PlainHash t/basic.t ...... ok t/multi.t ...... 1/2 # Failed test 'concurrent access with JiftyDBI' # at t/multi.t line 69. # got: '' # expected: 'glat' Can't call method "SUPER::CLONE_SKIP" without a package or object reference at (eval 36) line 8. # Looks like you planned 2 tests but ran 1. # Looks like you failed 1 test of 1 run. # Looks like your test exited with 2 just after 1. On Sun 31.Jan'10 at 15:30:57 -0500, Father Chrysostomos via RT wrote: Show quoted text
> Sun Jan 31 15:30:55 2010: Request 54168 was acted upon. > Transaction: Ticket created by sprout@cpan.org > Queue: IPC-PubSub > Subject: DBM_Deep concurrency problems > Broken in: (no value) > Severity: (no value) > Owner: Nobody > Requestors: sprout@cpan.org > Status: new > Ticket <URL: https://rt.cpan.org/Ticket/Display.html?id=54168 > > > > Although DBM::Deep does locking on all access, it cannot tell when > multiple accesses are supposed to be atomic. This happens with % > { ... }. (See http://github.com/robkinyon/dbm-deep/commit/065e75f for > a longer explanation.) The DBM_Deep back end needs to lock the > database manually, to avoid concurrency problems. See the report at <https://rt.cpan.org/Ticket/Display.html?id=42129
> >. The attached patch fixes this.
> >
Show quoted text
> diff -Nurp IPC-PubSub-0.29-zGZ1X8/MANIFEST IPC-PubSub-0.29-zGZ1X8 copy/MANIFEST > --- IPC-PubSub-0.29-zGZ1X8/MANIFEST 2008-12-12 22:17:56.000000000 -0800 > +++ IPC-PubSub-0.29-zGZ1X8 copy/MANIFEST 2010-01-30 12:05:42.000000000 -0800 > @@ -25,4 +25,5 @@ META.yml > README > SIGNATURE Public-key signature (added by MakeMaker) > t/basic.t > +t/multi.t > t/publisher.t > diff -Nurp IPC-PubSub-0.29-zGZ1X8/lib/IPC/PubSub/Cache/DBM_Deep.pm IPC-PubSub-0.29-zGZ1X8 copy/lib/IPC/PubSub/Cache/DBM_Deep.pm > --- IPC-PubSub-0.29-zGZ1X8/lib/IPC/PubSub/Cache/DBM_Deep.pm 2008-08-25 17:05:05.000000000 -0700 > +++ IPC-PubSub-0.29-zGZ1X8 copy/lib/IPC/PubSub/Cache/DBM_Deep.pm 2010-01-30 12:00:55.000000000 -0800 > @@ -34,21 +34,28 @@ sub store { > > sub publisher_indices { > my ($self, $chan) = @_; > - return { %{ $$self->get("pubs:$chan") || {} } }; > + $$self->lock_exclusive; > + my $ret = { %{ $$self->get("pubs:$chan") || {} } }; > + $$self->unlock; > + $ret; > } > > sub add_publisher { > my ($self, $chan, $pub) = @_; > + $$self->lock_exclusive; > my $pubs = { %{ $$self->get("pubs:$chan") || {} } }; > $pubs->{$pub} = 0; > $$self->put("pubs:$chan", $pubs); > + $$self->unlock; > } > > sub remove_publisher { > my ($self, $chan, $pub) = @_; > + $$self->lock_exclusive; > my $pubs = { %{ $$self->get("pubs:$chan") || {} } }; > delete $pubs->{$pub}; > $$self->put("pubs:$chan", $pubs); > + $$self->unlock; > } > > sub get_index { > @@ -58,9 +65,11 @@ sub get_index { > > sub set_index { > my ($self, $chan, $pub, $idx) = @_; > + $$self->lock_exclusive; > my $pubs = { %{ $$self->get("pubs:$chan") || {} } }; > $pubs->{$pub} = $idx; > $$self->put("pubs:$chan", $pubs); > + $$self->unlock; > } > > 1; > diff -Nurp IPC-PubSub-0.29-zGZ1X8/t/multi.t IPC-PubSub-0.29-zGZ1X8 copy/t/multi.t > --- IPC-PubSub-0.29-zGZ1X8/t/multi.t 1969-12-31 16:00:00.000000000 -0800 > +++ IPC-PubSub-0.29-zGZ1X8 copy/t/multi.t 2010-01-30 11:58:23.000000000 -0800 > @@ -0,0 +1,76 @@ > +use strict; > +use Config; > +use warnings; > +use Test::More; > +use Time::HiRes <sleep time>; > +use IPC::PubSub; > +use IO::Socket::INET; > +use File::Temp ':POSIX'; > + > +BEGIN{ > + if($Config{useithreads}) { > + require threads; > + import threads; > + } > + elsif($Config{d_fork}) { > + eval ' > + $SIG{CHLD} = "IGNORE"; > + sub async(&){ > + my $pid = fork; > + no warnings "exiting"; > + defined $pid or goto SKIP; > + $pid or &{+shift}, exit; > + _: > + } > + '; > + } > + else { plan skip_all => "No threads or fork" } > +} > + > +my @backends; > + > +unshift @backends, 'DBM_Deep' if eval { require DBM::Deep }; > +unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI }; > +unshift @backends, 'Memcached' if eval { require Cache::Memcached } and IO::Socket::INET->new('127.0.0.1:11211'); > + > +@backends or plan skip_all => "No back ends available"; > + > +plan tests => scalar @backends; > + > +my $tmp = tmpnam(); > +END { > + unlink $tmp > + if defined $tmp > +} > + > +my %init_args = ( > + DBM_Deep => [ $tmp ], > + JiftyDBI => [ db_init => 1 ], > + Memcached => [ rand() . $$ ], > +); > + > +for my $backend (@backends) { > + my $thr = async { > + my $bus = IPC::PubSub->new( $backend, @{ $init_args{$backend} } ); > + undef $tmp; # stop the END block from being naughty > + sleep .5; > + my $pub = $bus->new_publisher( "user-client" ); > + $pub->msg("glat"); > + }; > + > + my $bus = IPC::PubSub->new( $backend, @{ $init_args{$backend} } ); > + my $sub = $bus->new_subscriber('user-client'); > + my $start_time = time; > + my @msg; > + while (time-$start_time < 1) { > + push @msg, $sub->get and last; > + } > + > + is "@msg", "glat", "concurrent access with $backend"; > + > + $thr and $thr->join; > + next; > + > + # Fork failures bring us here: > + SKIP: { skip "Fork failed", 1 } > +}
Subject: Re: [rt.cpan.org #54168] DBM_Deep concurrency problems
Date: Sun, 14 Feb 2010 14:47:07 -0800
To: bug-IPC-PubSub [...] rt.cpan.org
From: Father Chrysostomos <sprout [...] cpan.org>
On Feb 2, 2010, at 9:53 AM, Jesse via RT wrote: Show quoted text
> <URL: https://rt.cpan.org/Ticket/Display.html?id=54168 > > > I just tried to apply this patch and it's causing fails on Jifty::DBI. > Did it pass with all other backends on your test system?
No, I didn’t test with Jifty::DBI at first, because it didn’t install cleanly. When I tried fixing the script to work with it, I found that neither Scalar::Defer nor DBD::SQLite is thread-safe. And then I found that it consistently fails with DBM_Deep and Memcached if all CPUs are busy (try running a copy of ‘perl -e0while-1’ for each core). So I give up. I have no idea how to write a test case for this, although my DBM_Deep.pm patch is correct and does fix a bona fide bug. Show quoted text
> > -Jesse > > PERL_DL_NONLAZY=1 /usr/bin/perl "-MExtUtils::Command::MM" "-e" > "test_harness(0, 'inc', 'blib/lib', 'blib/arch')" t/*.t > t/basic.t ...... # Testing backend JiftyDBI > t/basic.t ...... 13/45 # Testing backend DBM_Deep > # Testing backend PlainHash > t/basic.t ...... ok > t/multi.t ...... 1/2 > # Failed test 'concurrent access with JiftyDBI' > # at t/multi.t line 69. > # got: '' > # expected: 'glat' > Can't call method "SUPER::CLONE_SKIP" without a package or object > reference at (eval 36) line 8. > # Looks like you planned 2 tests but ran 1. > # Looks like you failed 1 test of 1 run. > # Looks like your test exited with 2 just after 1. > > > > > On Sun 31.Jan'10 at 15:30:57 -0500, Father Chrysostomos via RT wrote:
>> Sun Jan 31 15:30:55 2010: Request 54168 was acted upon. >> Transaction: Ticket created by sprout@cpan.org >> Queue: IPC-PubSub >> Subject: DBM_Deep concurrency problems >> Broken in: (no value) >> Severity: (no value) >> Owner: Nobody >> Requestors: sprout@cpan.org >> Status: new >> Ticket <URL: https://rt.cpan.org/Ticket/Display.html?id=54168 > >> >> >> Although DBM::Deep does locking on all access, it cannot tell when >> multiple accesses are supposed to be atomic. This happens with % >> { ... }. (See http://github.com/robkinyon/dbm-deep/commit/065e75f for >> a longer explanation.) The DBM_Deep back end needs to lock the >> database manually, to avoid concurrency problems. See the report at >> <https://rt.cpan.org/Ticket/Display.html?id=42129
>>> . The attached patch fixes this.
>> >>
>
>> diff -Nurp IPC-PubSub-0.29-zGZ1X8/MANIFEST IPC-PubSub-0.29-zGZ1X8 >> copy/MANIFEST >> --- IPC-PubSub-0.29-zGZ1X8/MANIFEST 2008-12-12 22:17:56.000000000 >> -0800 >> +++ IPC-PubSub-0.29-zGZ1X8 copy/MANIFEST 2010-01-30 >> 12:05:42.000000000 -0800 >> @@ -25,4 +25,5 @@ META.yml >> README >> SIGNATURE Public-key signature (added by MakeMaker) >> t/basic.t >> +t/multi.t >> t/publisher.t >> diff -Nurp IPC-PubSub-0.29-zGZ1X8/lib/IPC/PubSub/Cache/DBM_Deep.pm >> IPC-PubSub-0.29-zGZ1X8 copy/lib/IPC/PubSub/Cache/DBM_Deep.pm >> --- IPC-PubSub-0.29-zGZ1X8/lib/IPC/PubSub/Cache/DBM_Deep.pm >> 2008-08-25 17:05:05.000000000 -0700 >> +++ IPC-PubSub-0.29-zGZ1X8 copy/lib/IPC/PubSub/Cache/DBM_Deep.pm >> 2010-01-30 12:00:55.000000000 -0800 >> @@ -34,21 +34,28 @@ sub store { >> >> sub publisher_indices { >> my ($self, $chan) = @_; >> - return { %{ $$self->get("pubs:$chan") || {} } }; >> + $$self->lock_exclusive; >> + my $ret = { %{ $$self->get("pubs:$chan") || {} } }; >> + $$self->unlock; >> + $ret; >> } >> >> sub add_publisher { >> my ($self, $chan, $pub) = @_; >> + $$self->lock_exclusive; >> my $pubs = { %{ $$self->get("pubs:$chan") || {} } }; >> $pubs->{$pub} = 0; >> $$self->put("pubs:$chan", $pubs); >> + $$self->unlock; >> } >> >> sub remove_publisher { >> my ($self, $chan, $pub) = @_; >> + $$self->lock_exclusive; >> my $pubs = { %{ $$self->get("pubs:$chan") || {} } }; >> delete $pubs->{$pub}; >> $$self->put("pubs:$chan", $pubs); >> + $$self->unlock; >> } >> >> sub get_index { >> @@ -58,9 +65,11 @@ sub get_index { >> >> sub set_index { >> my ($self, $chan, $pub, $idx) = @_; >> + $$self->lock_exclusive; >> my $pubs = { %{ $$self->get("pubs:$chan") || {} } }; >> $pubs->{$pub} = $idx; >> $$self->put("pubs:$chan", $pubs); >> + $$self->unlock; >> } >> >> 1; >> diff -Nurp IPC-PubSub-0.29-zGZ1X8/t/multi.t IPC-PubSub-0.29-zGZ1X8 >> copy/t/multi.t >> --- IPC-PubSub-0.29-zGZ1X8/t/multi.t 1969-12-31 16:00:00.000000000 >> -0800 >> +++ IPC-PubSub-0.29-zGZ1X8 copy/t/multi.t 2010-01-30 >> 11:58:23.000000000 -0800 >> @@ -0,0 +1,76 @@ >> +use strict; >> +use Config; >> +use warnings; >> +use Test::More; >> +use Time::HiRes <sleep time>; >> +use IPC::PubSub; >> +use IO::Socket::INET; >> +use File::Temp ':POSIX'; >> + >> +BEGIN{ >> + if($Config{useithreads}) { >> + require threads; >> + import threads; >> + } >> + elsif($Config{d_fork}) { >> + eval ' >> + $SIG{CHLD} = "IGNORE"; >> + sub async(&){ >> + my $pid = fork; >> + no warnings "exiting"; >> + defined $pid or goto SKIP; >> + $pid or &{+shift}, exit; >> + _: >> + } >> + '; >> + } >> + else { plan skip_all => "No threads or fork" } >> +} >> + >> +my @backends; >> + >> +unshift @backends, 'DBM_Deep' if eval { require DBM::Deep }; >> +unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI }; >> +unshift @backends, 'Memcached' if eval { require >> Cache::Memcached } and IO::Socket::INET->new('127.0.0.1:11211'); >> + >> +@backends or plan skip_all => "No back ends available"; >> + >> +plan tests => scalar @backends; >> + >> +my $tmp = tmpnam(); >> +END { >> + unlink $tmp >> + if defined $tmp >> +} >> + >> +my %init_args = ( >> + DBM_Deep => [ $tmp ], >> + JiftyDBI => [ db_init => 1 ], >> + Memcached => [ rand() . $$ ], >> +); >> + >> +for my $backend (@backends) { >> + my $thr = async { >> + my $bus = IPC::PubSub->new( $backend, @{ $init_args >> {$backend} } ); >> + undef $tmp; # stop the END block from being naughty >> + sleep .5; >> + my $pub = $bus->new_publisher( "user-client" ); >> + $pub->msg("glat"); >> + }; >> + >> + my $bus = IPC::PubSub->new( $backend, @{ $init_args >> {$backend} } ); >> + my $sub = $bus->new_subscriber('user-client'); >> + my $start_time = time; >> + my @msg; >> + while (time-$start_time < 1) { >> + push @msg, $sub->get and last; >> + } >> + >> + is "@msg", "glat", "concurrent access with $backend"; >> + >> + $thr and $thr->join; >> + next; >> + >> + # Fork failures bring us here: >> + SKIP: { skip "Fork failed", 1 } >> +}
> >