Subject: | replace_job can make a database dead for ever |
When a replace_job (Job->replace_with) fails with a duplicate-key error, the
database will not come back alive, because no commit/rollback is executed and
every subsequent query only gets an error.
The patch also fixes some other things:
Set ->did_something as late as possible (otherwise it is already set
when a later txn fails)
Change the schema for pg:
- job.jobid from SERIAL to BIGSERIAL (the alternative would be changing the
other jobid foreign-key columns from BIGINT to INT)
- error.message to TEXT: mysql silently truncates a message longer than
255 bytes on insert, whereas pg dies if the message is longer.
Call $driver->rollback if $errstr is set in ->work_safely
Added query_sql to t/lib/db-common.pl
Changed t/priority.t & t/unique.t for pg-specific behavior
Added t/replace-abort.t for the special duplicate-entry case in pg
Subject: | theschwartz-1.11.patch |
diff -Nru TheSchwartz-1.10/doc/schema-postgres.sql TheSchwartz-1.10.patched/doc/schema-postgres.sql
--- TheSchwartz-1.10/doc/schema-postgres.sql 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/doc/schema-postgres.sql 2011-02-14 12:40:08.000000000 +0100
@@ -23,7 +23,7 @@
);
CREATE TABLE job (
- jobid SERIAL,
+ jobid BIGSERIAL,
funcid INT NOT NULL,
arg BYTEA,
uniqkey VARCHAR(255) NULL,
@@ -50,7 +50,7 @@
CREATE TABLE error (
error_time INTEGER NOT NULL,
jobid BIGINT NOT NULL,
- message VARCHAR(255) NOT NULL,
+ message TEXT NOT NULL,
funcid INT NOT NULL DEFAULT 0
);
diff -Nru TheSchwartz-1.10/lib/TheSchwartz/Job.pm TheSchwartz-1.10.patched/lib/TheSchwartz/Job.pm
--- TheSchwartz-1.10/lib/TheSchwartz/Job.pm 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/lib/TheSchwartz/Job.pm 2011-02-14 11:12:53.000000000 +0100
@@ -190,9 +190,9 @@
$job->debug("can't call 'completed' on already finished job");
return 0;
}
- $job->did_something(1);
$job->set_exit_status(0);
$job->driver->remove($job);
+ $job->did_something(1);
}
sub permanent_failure {
@@ -237,7 +237,6 @@
sub _failed {
my ($job, $msg, $exit_status, $_retry, $failures) = @_;
- $job->did_something(1);
$job->debug("job failed: " . ($msg || "<no message>"));
## Mark the failure in the error table.
@@ -254,6 +253,7 @@
$job->set_exit_status($exit_status || 1);
$job->driver->remove($job);
}
+ $job->did_something(1);
}
sub replace_with {
diff -Nru TheSchwartz-1.10/lib/TheSchwartz/Worker.pm TheSchwartz-1.10.patched/lib/TheSchwartz/Worker.pm
--- TheSchwartz-1.10/lib/TheSchwartz/Worker.pm 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/lib/TheSchwartz/Worker.pm 2011-02-14 11:12:50.000000000 +0100
@@ -33,6 +33,10 @@
my $cjob = $client->current_job;
if ($errstr) {
+ # something went wrong, better make a rollback!
+ my $driver = $cjob->driver;
+ $driver->rollback;
+
$job->debug("Eval failure: $errstr");
$cjob->failed($@);
}
diff -Nru TheSchwartz-1.10/t/lib/db-common.pl TheSchwartz-1.10.patched/t/lib/db-common.pl
--- TheSchwartz-1.10/t/lib/db-common.pl 2010-03-15 22:11:59.000000000 +0100
+++ TheSchwartz-1.10.patched/t/lib/db-common.pl 2011-02-14 13:44:55.000000000 +0100
@@ -221,4 +221,16 @@
split /;\s*/, $sql;
}
+# Run a single-value query and return the first column of its first row
+# (undef when the query yields no row).
+#   $dbh - database handle (DBI-style) used to run the query
+#   $sql - SQL string, or an array ref [$query, \@bind_values]
+sub query_sql {
+    my ($dbh, $sql) = @_;
+    my ($query, $bind) = ref($sql) ? @$sql : ($sql, []);
+    # Use $query, not $sql: $sql may be the array ref unpacked above.
+    my ($result) = $dbh->selectrow_array($query, undef, @$bind);
+    return $result;
+}
+
1;
diff -Nru TheSchwartz-1.10/t/priority.t TheSchwartz-1.10.patched/t/priority.t
--- TheSchwartz-1.10/t/priority.t 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/t/priority.t 2011-02-14 12:56:37.000000000 +0100
@@ -20,10 +20,11 @@
$TheSchwartz::FIND_JOB_BATCH_SIZE = 1;
for (1..10) {
+ # Postgres uses ORDER BY priority NULLS FIRST when DESC is used
my $job = TheSchwartz::Job->new(
funcname => 'Worker::PriorityTest',
arg => { num => $_ },
- ( $_ == 1 ? () : ( priority => $_ ) ),
+ ( !$ENV{USE_PGSQL} && $_ == 1 ? () : ( priority => $_ ) ),
);
my $h = $client->insert($job);
ok($h, "inserted job (priority $_)");
@@ -35,7 +36,8 @@
Worker::PriorityTest->set_client($client);
for (1..10) {
- $record_expected = 11 - $_ == 1 ? undef : 11 - $_;
+ # Postgres uses ORDER BY priority NULLS FIRST when DESC is used
+ $record_expected = !$ENV{USE_PGSQL} && 11 - $_ == 1 ? undef : 11 - $_;
my $rv = eval { $client->work_once; };
ok($rv, "did stuff");
}
diff -Nru TheSchwartz-1.10/t/replace-abort.t TheSchwartz-1.10.patched/t/replace-abort.t
--- TheSchwartz-1.10/t/replace-abort.t 1970-01-01 01:00:00.000000000 +0100
+++ TheSchwartz-1.10.patched/t/replace-abort.t 2011-02-14 12:36:58.000000000 +0100
@@ -0,0 +1,168 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 13;
+
+run_tests_pgsql(13, sub {
+ my $client1 = test_client(dbs => ['ts1']);
+ my $client2 = test_client(dbs => ['ts1']);
+
+ my $driver = $client1->driver_for( ($client1->shuffled_databases)[0] );
+ my $dbh = $driver->rw_handle;
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey IN ('1','2','3','4','5');"),
+ 0,
+ 'namespace empty',
+ );
+
+
+ $client1->can_do('Test::Job::Completed');
+ $client2->can_do('Test::Job::Replace');
+
+# job 1
+ $client1->insert(TheSchwartz::Job->new(
+ funcname => 'Test::Job::Completed',
+ uniqkey => 1,
+ ));
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '1';"),
+ 1,
+ 'Job 1 gepostet',
+ );
+
+
+# Job 1
+ $client1->work_once;
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '1';"),
+ 0,
+ 'Job 1 abgearbeitet',
+ );
+
+# Job 2
+ $client2->insert(TheSchwartz::Job->new(
+ funcname => 'Test::Job::Replace',
+ uniqkey => 2,
+ arg => 3,
+ ));
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '2';"),
+ 1,
+ 'Job 2 gepostet',
+ );
+
+# Job 2
+ $client2->work_once;
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '2';"),
+ 0,
+ 'Job 2 abgearbeitet',
+ );
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+ 1,
+ 'Job 2 ersetzt durch Job 3',
+ );
+
+# Job 4
+ $client2->insert(TheSchwartz::Job->new(
+ funcname => 'Test::Job::Replace',
+ uniqkey => 4,
+ arg => 3,
+ ));
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"),
+ 1,
+ 'Job 4 gepostet',
+ );
+
+# Job 4
+ $client2->work_once;
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"),
+ 1,
+ 'Job 4 abgebrochen',
+ );
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+ 1,
+ 'Job 4 nicht durch Job 3 ersetzt',
+ );
+
+# Job 3
+ $client1->work_once;
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+ 0,
+ 'Job 3 abgearbeitet',
+ );
+
+# reset job.run_after & retry_at, so we don't have to wait
+ $dbh->do("UPDATE job SET run_after = 0 WHERE uniqkey = '4';");
+ $client2->{retry_at} = {};
+
+# Job 4
+ $client2->work_once;
+
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '4';"),
+ 0,
+ 'Job 4 abgearbeitet',
+ );
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+ 1,
+ 'Job 4 ersetzt durch Job 3',
+ );
+
+# Job 3 again (re-inserted as the replacement for Job 4)
+ $client1->work_once;
+
+ is(
+ query_sql($dbh, "SELECT COUNT(*) FROM job WHERE uniqkey = '3';"),
+ 0,
+ 'Job 3 erneut abgearbeitet',
+ );
+});
+
+
+
+
+# TheSchwartz Worker/Jobs
+package Test::Job::Completed;
+
+use base qw(TheSchwartz::Worker);
+
+sub work {
+ my ($client, $job) = @_;
+ $job->completed;
+}
+sub max_retries { 10; }
+
+package Test::Job::Replace;
+
+use base qw(TheSchwartz::Worker);
+
+sub work {
+ my ($client, $job) = @_;
+ $job->replace_with(TheSchwartz::Job->new(
+ funcname => 'Test::Job::Completed',
+ uniqkey => $job->arg,
+ ));
+}
+sub max_retries { 10; }
+
diff -Nru TheSchwartz-1.10/t/unique.t TheSchwartz-1.10.patched/t/unique.t
--- TheSchwartz-1.10/t/unique.t 2010-03-15 22:00:55.000000000 +0100
+++ TheSchwartz-1.10.patched/t/unique.t 2011-02-14 13:01:13.000000000 +0100
@@ -34,6 +34,9 @@
$handle = $client->insert($job);
ok(! $handle, 'no handle');
+ # pg fails and marks the database as dead
+ $client->{retry_at} = {};
+
# insert same uniqkey, but different func
$job = TheSchwartz::Job->new(
funcname => 'scratch',