I just tried to apply this patch and it's causing fails on Jifty::DBI.
Did it pass with all other backends on your test system?
-Jesse
PERL_DL_NONLAZY=1 /usr/bin/perl "-MExtUtils::Command::MM" "-e"
"test_harness(0, 'inc', 'blib/lib', 'blib/arch')" t/*.t
t/basic.t ...... # Testing backend JiftyDBI
t/basic.t ...... 13/45 # Testing backend DBM_Deep
# Testing backend PlainHash
t/basic.t ...... ok
t/multi.t ...... 1/2
# Failed test 'concurrent access with JiftyDBI'
# at t/multi.t line 69.
# got: ''
# expected: 'glat'
Can't call method "SUPER::CLONE_SKIP" without a package or object
reference at (eval 36) line 8.
# Looks like you planned 2 tests but ran 1.
# Looks like you failed 1 test of 1 run.
# Looks like your test exited with 2 just after 1.
On Sun 31.Jan'10 at 15:30:57 -0500, Father Chrysostomos via RT wrote:
Show quoted text> Sun Jan 31 15:30:55 2010: Request 54168 was acted upon.
> Transaction: Ticket created by sprout@cpan.org
> Queue: IPC-PubSub
> Subject: DBM_Deep concurrency problems
> Broken in: (no value)
> Severity: (no value)
> Owner: Nobody
> Requestors: sprout@cpan.org
> Status: new
> Ticket <URL:
https://rt.cpan.org/Ticket/Display.html?id=54168 >
>
>
> Although DBM::Deep does locking on all access, it cannot tell when
> multiple accesses are supposed to be atomic. This happens with %
> { ... }. (See
http://github.com/robkinyon/dbm-deep/commit/065e75f for
> a longer explanation.) The DBM_Deep back end needs to lock the
> database manually, to avoid concurrency problems. See the report at <
https://rt.cpan.org/Ticket/Display.html?id=42129
> >. The attached patch fixes this.
>
>
Show quoted text> diff -Nurp IPC-PubSub-0.29-zGZ1X8/MANIFEST IPC-PubSub-0.29-zGZ1X8 copy/MANIFEST
> --- IPC-PubSub-0.29-zGZ1X8/MANIFEST 2008-12-12 22:17:56.000000000 -0800
> +++ IPC-PubSub-0.29-zGZ1X8 copy/MANIFEST 2010-01-30 12:05:42.000000000 -0800
> @@ -25,4 +25,5 @@ META.yml
> README
> SIGNATURE Public-key signature (added by MakeMaker)
> t/basic.t
> +t/multi.t
> t/publisher.t
> diff -Nurp IPC-PubSub-0.29-zGZ1X8/lib/IPC/PubSub/Cache/DBM_Deep.pm IPC-PubSub-0.29-zGZ1X8 copy/lib/IPC/PubSub/Cache/DBM_Deep.pm
> --- IPC-PubSub-0.29-zGZ1X8/lib/IPC/PubSub/Cache/DBM_Deep.pm 2008-08-25 17:05:05.000000000 -0700
> +++ IPC-PubSub-0.29-zGZ1X8 copy/lib/IPC/PubSub/Cache/DBM_Deep.pm 2010-01-30 12:00:55.000000000 -0800
> @@ -34,21 +34,28 @@ sub store {
>
> sub publisher_indices {
> my ($self, $chan) = @_;
> - return { %{ $$self->get("pubs:$chan") || {} } };
> + $$self->lock_exclusive;
> + my $ret = { %{ $$self->get("pubs:$chan") || {} } };
> + $$self->unlock;
> + $ret;
> }
>
> sub add_publisher {
> my ($self, $chan, $pub) = @_;
> + $$self->lock_exclusive;
> my $pubs = { %{ $$self->get("pubs:$chan") || {} } };
> $pubs->{$pub} = 0;
> $$self->put("pubs:$chan", $pubs);
> + $$self->unlock;
> }
>
> sub remove_publisher {
> my ($self, $chan, $pub) = @_;
> + $$self->lock_exclusive;
> my $pubs = { %{ $$self->get("pubs:$chan") || {} } };
> delete $pubs->{$pub};
> $$self->put("pubs:$chan", $pubs);
> + $$self->unlock;
> }
>
> sub get_index {
> @@ -58,9 +65,11 @@ sub get_index {
>
> sub set_index {
> my ($self, $chan, $pub, $idx) = @_;
> + $$self->lock_exclusive;
> my $pubs = { %{ $$self->get("pubs:$chan") || {} } };
> $pubs->{$pub} = $idx;
> $$self->put("pubs:$chan", $pubs);
> + $$self->unlock;
> }
>
> 1;
> diff -Nurp IPC-PubSub-0.29-zGZ1X8/t/multi.t IPC-PubSub-0.29-zGZ1X8 copy/t/multi.t
> --- IPC-PubSub-0.29-zGZ1X8/t/multi.t 1969-12-31 16:00:00.000000000 -0800
> +++ IPC-PubSub-0.29-zGZ1X8 copy/t/multi.t 2010-01-30 11:58:23.000000000 -0800
> @@ -0,0 +1,76 @@
> +use strict;
> +use Config;
> +use warnings;
> +use Test::More;
> +use Time::HiRes <sleep time>;
> +use IPC::PubSub;
> +use IO::Socket::INET;
> +use File::Temp ':POSIX';
> +
> +BEGIN{
> + if($Config{useithreads}) {
> + require threads;
> + import threads;
> + }
> + elsif($Config{d_fork}) {
> + eval '
> + $SIG{CHLD} = "IGNORE";
> + sub async(&){
> + my $pid = fork;
> + no warnings "exiting";
> + defined $pid or goto SKIP;
> + $pid or &{+shift}, exit;
> + _:
> + }
> + ';
> + }
> + else { plan skip_all => "No threads or fork" }
> +}
> +
> +my @backends;
> +
> +unshift @backends, 'DBM_Deep' if eval { require DBM::Deep };
> +unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI };
> +unshift @backends, 'Memcached' if eval { require Cache::Memcached } and IO::Socket::INET->new('127.0.0.1:11211');
> +
> +@backends or plan skip_all => "No back ends available";
> +
> +plan tests => scalar @backends;
> +
> +my $tmp = tmpnam();
> +END {
> + unlink $tmp
> + if defined $tmp
> +}
> +
> +my %init_args = (
> + DBM_Deep => [ $tmp ],
> + JiftyDBI => [ db_init => 1 ],
> + Memcached => [ rand() . $$ ],
> +);
> +
> +for my $backend (@backends) {
> + my $thr = async {
> + my $bus = IPC::PubSub->new( $backend, @{ $init_args{$backend} } );
> + undef $tmp; # stop the END block from being naughty
> + sleep .5;
> + my $pub = $bus->new_publisher( "user-client" );
> + $pub->msg("glat");
> + };
> +
> + my $bus = IPC::PubSub->new( $backend, @{ $init_args{$backend} } );
> + my $sub = $bus->new_subscriber('user-client');
> + my $start_time = time;
> + my @msg;
> + while (time-$start_time < 1) {
> + push @msg, $sub->get and last;
> + }
> +
> + is "@msg", "glat", "concurrent access with $backend";
> +
> + $thr and $thr->join;
> + next;
> +
> + # Fork failures bring us here:
> + SKIP: { skip "Fork failed", 1 }
> +}