Skip Menu |

This queue is for tickets about the TheSchwartz CPAN distribution.

Report information
The Basics
Id: 65712
Status: resolved
Priority: 0/
Queue: TheSchwartz

People
Owner: Jeff.Fearn [...] gmail.com
Requestors: felix.ostmann [...] thewar.de
Cc:
AdminCc:

Bug Information
Severity: Critical
Broken in: 1.10
Fixed in: 1.12



Subject: replace_job can make a database dead for ever
When a replace_job fails with a duplicate error, the database will not come back alive, because no commit/rollback is executed and every subsequent query only gets an error. The patch fixes some other things too: Set ->did_something as late as possible (otherwise it is already set when a txn then fails). Change the schema for pg: - job.jobid from SERIAL to BIGSERIAL (or, alternatively, the other jobid foreign columns from BIGINT to INT) - error.message to TEXT; mysql silently truncates if more than 255 bytes are inserted, pg dies if the message is longer. Call $driver->rollback if $errstr is set in ->work_safely. Added query_sql to t/lib/db-common.pl. Changed t/priority.t & t/unique.t for special pg behavior. Added t/replace-abort.t for the special duplicate entry case in pg.
Subject: theschwartz-1.11.patch
diff -Nru TheSchwartz-1.10/doc/schema-postgres.sql TheSchwartz-1.10.patched/doc/schema-postgres.sql --- TheSchwartz-1.10/doc/schema-postgres.sql 2010-03-15 22:00:55.000000000 +0100 +++ TheSchwartz-1.10.patched/doc/schema-postgres.sql 2011-02-14 12:40:08.000000000 +0100 @@ -23,7 +23,7 @@ ); CREATE TABLE job ( - jobid SERIAL, + jobid BIGSERIAL, funcid INT NOT NULL, arg BYTEA, uniqkey VARCHAR(255) NULL, @@ -50,7 +50,7 @@ CREATE TABLE error ( error_time INTEGER NOT NULL, jobid BIGINT NOT NULL, - message VARCHAR(255) NOT NULL, + message TEXT NOT NULL, funcid INT NOT NULL DEFAULT 0 ); diff -Nru TheSchwartz-1.10/lib/TheSchwartz/Job.pm TheSchwartz-1.10.patched/lib/TheSchwartz/Job.pm --- TheSchwartz-1.10/lib/TheSchwartz/Job.pm 2010-03-15 22:00:55.000000000 +0100 +++ TheSchwartz-1.10.patched/lib/TheSchwartz/Job.pm 2011-02-14 11:12:53.000000000 +0100 @@ -190,9 +190,9 @@ $job->debug("can't call 'completed' on already finished job"); return 0; } - $job->did_something(1); $job->set_exit_status(0); $job->driver->remove($job); + $job->did_something(1); } sub permanent_failure { @@ -237,7 +237,6 @@ sub _failed { my ($job, $msg, $exit_status, $_retry, $failures) = @_; - $job->did_something(1); $job->debug("job failed: " . ($msg || "<no message>")); ## Mark the failure in the error table. @@ -254,6 +253,7 @@ $job->set_exit_status($exit_status || 1); $job->driver->remove($job); } + $job->did_something(1); } sub replace_with { diff -Nru TheSchwartz-1.10/lib/TheSchwartz/Worker.pm TheSchwartz-1.10.patched/lib/TheSchwartz/Worker.pm --- TheSchwartz-1.10/lib/TheSchwartz/Worker.pm 2010-03-15 22:00:55.000000000 +0100 +++ TheSchwartz-1.10.patched/lib/TheSchwartz/Worker.pm 2011-02-14 11:12:50.000000000 +0100 @@ -33,6 +33,10 @@ my $cjob = $client->current_job; if ($errstr) { + # something went wrong, better make a rollback! 
+ my $driver = $cjob->driver; + $driver->rollback; + $job->debug("Eval failure: $errstr"); $cjob->failed($@); } diff -Nru TheSchwartz-1.10/t/lib/db-common.pl TheSchwartz-1.10.patched/t/lib/db-common.pl --- TheSchwartz-1.10/t/lib/db-common.pl 2010-03-15 22:11:59.000000000 +0100 +++ TheSchwartz-1.10.patched/t/lib/db-common.pl 2011-02-14 13:44:55.000000000 +0100 @@ -221,4 +221,16 @@ split /;\s*/, $sql; } +sub query_sql { + my ($dbh, $sql) = @_; + my ($query, $bind) = ref($sql) ? @$sql : ($sql, []); + my $sth = $dbh->prepare($sql); + my $i = 0; + $sth->bind_param(++$i, $_) for @$bind; + $sth->execute; + $sth->bind_columns(\my $result); + $sth->fetch; + return $result; +} + 1; diff -Nru TheSchwartz-1.10/t/priority.t TheSchwartz-1.10.patched/t/priority.t --- TheSchwartz-1.10/t/priority.t 2010-03-15 22:00:55.000000000 +0100 +++ TheSchwartz-1.10.patched/t/priority.t 2011-02-14 12:56:37.000000000 +0100 @@ -20,10 +20,11 @@ $TheSchwartz::FIND_JOB_BATCH_SIZE = 1; for (1..10) { + # Postgres uses ORDER BY priority NULLS FIRST when DESC is used my $job = TheSchwartz::Job->new( funcname => 'Worker::PriorityTest', arg => { num => $_ }, - ( $_ == 1 ? () : ( priority => $_ ) ), + ( !$ENV{USE_PGSQL} && $_ == 1 ? () : ( priority => $_ ) ), ); my $h = $client->insert($job); ok($h, "inserted job (priority $_)"); @@ -35,7 +36,8 @@ Worker::PriorityTest->set_client($client); for (1..10) { - $record_expected = 11 - $_ == 1 ? undef : 11 - $_; + # Postgres uses ORDER BY priority NULLS FIRST when DESC is used + $record_expected = !$ENV{USE_PGSQL} && 11 - $_ == 1 ? 
undef : 11 - $_; my $rv = eval { $client->work_once; }; ok($rv, "did stuff"); } diff -Nru TheSchwartz-1.10/t/replace-abort.t TheSchwartz-1.10.patched/t/replace-abort.t --- TheSchwartz-1.10/t/replace-abort.t 1970-01-01 01:00:00.000000000 +0100 +++ TheSchwartz-1.10.patched/t/replace-abort.t 2011-02-14 12:36:58.000000000 +0100 @@ -0,0 +1,168 @@ +# -*-perl-*- + +use strict; +use warnings; + +require 't/lib/db-common.pl'; + +use TheSchwartz; +use Test::More tests => 13; + +run_tests_pgsql(13, sub { + my $client1 = test_client(dbs => ['ts1']); + my $client2 = test_client(dbs => ['ts1']); + + my $driver = $client1->driver_for( ($client1->shuffled_databases)[0] ); + my $dbh = $driver->rw_handle; + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey IN ('1','2','3','4','5');"), + 0, + 'namespace empty', + ); + + + $client1->can_do('Test::Job::Completed'); + $client2->can_do('Test::Job::Replace'); + +# job 1 + $client1->insert(TheSchwartz::Job->new( + funcname => 'Test::Job::Completed', + uniqkey => 1, + )); + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '1';"), + 1, + 'Job 1 gepostet', + ); + + +# Job 1 + $client1->work_once; + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '1';"), + 0, + 'Job 1 abgearbeitet', + ); + +# Job 2 + $client2->insert(TheSchwartz::Job->new( + funcname => 'Test::Job::Replace', + uniqkey => 2, + arg => 3, + )); + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '2';"), + 1, + 'Job 2 gepostet', + ); + +# Job 2 + $client2->work_once; + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '2';"), + 0, + 'Job 2 abgearbeitet', + ); + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"), + 1, + 'Job 2 ersetzt durch Job 3', + ); + +# Job 4 + $client2->insert(TheSchwartz::Job->new( + funcname => 'Test::Job::Replace', + uniqkey => 4, + arg => 3, + )); + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"), + 1, + 'Job 4 gepostet', + ); + +# 
Job 4 + $client2->work_once; + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"), + 1, + 'Job 4 abgebrochen', + ); + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"), + 1, + 'Job 4 nicht durch Job 3 ersetzt', + ); + +# Job 3 + $client1->work_once; + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"), + 0, + 'Job 3 abgearbeitet', + ); + +# cleanup job.run_after & retry_at, so we dont have to wait + $dbh->do("UPDATE job SET run_after = 0 WHERE uniqkey = '4';"); + $client2->{retry_at} = {}; + +# Job 4 + $client2->work_once; + + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"), + 0, + 'Job 4 abgearbeitet', + ); + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"), + 1, + 'Job 4 ersetzt durch Job 3', + ); + +# Job 5 + $client1->work_once; + + is( + query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"), + 0, + 'Job 3 erneut abgearbeitet', + ); +}); + + + + +# TheSchwartz Worker/Jobs +package Test::Job::Completed; + +use base qw(TheSchwartz::Worker); + +sub work { + my ($client, $job) = @_; + $job->completed; +} +sub max_retries { 10; } + +package Test::Job::Replace; + +use base qw(TheSchwartz::Worker); + +sub work { + my ($client, $job) = @_; + $job->replace_with(TheSchwartz::Job->new( + funcname => 'Test::Job::Completed', + uniqkey => $job->arg, + )); +} +sub max_retries { 10; } + diff -Nru TheSchwartz-1.10/t/unique.t TheSchwartz-1.10.patched/t/unique.t --- TheSchwartz-1.10/t/unique.t 2010-03-15 22:00:55.000000000 +0100 +++ TheSchwartz-1.10.patched/t/unique.t 2011-02-14 13:01:13.000000000 +0100 @@ -34,6 +34,9 @@ $handle = $client->insert($job); ok(! $handle, 'no handle'); + # pg failes and marks the database as dead + $client->{retry_at} = {}; + # insert same uniqkey, but different func $job = TheSchwartz::Job->new( funcname => 'scratch',
From: felix.ostmann [...] thewar.de
One small question: in work_safely there is also the following code: if ($errstr) { $job->debug("Eval failure: $errstr"); $cjob->failed($@); } Why not $cjob->failed($errstr)? I got the following output: Eval failure: Cmd 'sudo /usr/bin/rsync -z --bwlimit=1024 ... failed: 255' job failed. considering retry. is max_retries of 1000000 >= failures of 1? job failed: <no message>
Hi, Would you mind making one or more pull requests on github? https://github.com/sixapart/TheSchwartz/ What would be helpful is to get one commit per individual change, so that we can investigate each logical change separately. Thanks, Yann
From: felix.ostmann [...] thewar.de
ofc i can :) Am Di 15. Feb 2011, 15:41:34, YANNK schrieb: Show quoted text
> Hi, > > Would you mind, making one or more pull requests on github? > https://github.com/sixapart/TheSchwartz/ > > What would be helpful is to get one commit per individual change, so > that we can investigate > each logical change separately. > > Thanks, > > Yann
Hi Felix, can you do separate pull requests for these changes on my github? git@github.com:jfearn/TheSchwartz.git Sorry for the repeat request. Cheers, Jeff.
This fix was shipped in 1.12