Subject: | When using priority, order by jobid too. |
We had an issue at $dayjob where the job queue was not processing the queue in roughly the order they were created causing issues when a large number of jobs (>30,000) were queued. This patch changes the sort order when prioritze is set to "priority DESC, jobid ASC"
Subject: | theschwartz-order.patch |
--- old/TheSchwartz-1.10/lib/TheSchwartz.pm 2010-03-16 07:14:30.000000000 +1000
+++ TheSchwartz-1.10/lib/TheSchwartz.pm 2014-09-23 11:45:57.044698655 +1000
@@ -146,9 +146,9 @@
sub list_jobs {
my TheSchwartz $client = shift;
my $arg = shift;
- my @options;
- push @options, run_after => { op => '<=', value => $arg->{run_after} } if exists $arg->{run_after};
- push @options, grabbed_until => { op => '<=', value => $arg->{grabbed_until} } if exists $arg->{grabbed_until};
+ my (%terms, %options);
+ $terms{run_after} = { op => '<=', value => $arg->{run_after} } if exists $arg->{run_after};
+ $terms{grabbed_until} = { op => '<=', value => $arg->{grabbed_until} } if exists $arg->{grabbed_until};
die "No funcname" unless exists $arg->{funcname};
$arg->{want_handle} = 1 unless defined $arg->{want_handle};
@@ -156,7 +156,15 @@
if ($arg->{coalesce}) {
$arg->{coalesce_op} ||= '=';
- push @options, coalesce => { op => $arg->{coalesce_op}, value => $arg->{coalesce}};
+ $terms{coalesce} = { op => $arg->{coalesce_op}, value => $arg->{coalesce}};
+ }
+
+ $options{limit} = $limit;
+ if ($client->prioritize) {
+ $options{sort} = [
+ { column => 'priority', direction => 'descend' },
+ { column => 'jobid' },
+ ];
}
my @jobs;
@@ -164,11 +172,10 @@
## If the database is dead, skip it
next if $client->is_database_dead($hashdsn);
my $driver = $client->driver_for($hashdsn);
- my $funcid;
if (ref($arg->{funcname})) {
- $funcid = [map { $client->funcname_to_id($driver, $hashdsn, $_) } @{$arg->{funcname}}];
+ $terms{funcid} = [map { $client->funcname_to_id($driver, $hashdsn, $_) } @{$arg->{funcname}}];
} else {
- $funcid = $client->funcname_to_id($driver, $hashdsn, $arg->{funcname});
+ $terms{funcid} = $client->funcname_to_id($driver, $hashdsn, $arg->{funcname});
}
if ($arg->{want_handle}) {
@@ -180,22 +187,9 @@
});
$_->handle($handle);
$_;
- } $driver->search('TheSchwartz::Job' => {
- funcid => $funcid,
- @options
- }, { limit => $limit,
- ( $client->prioritize ? ( sort => 'priority',
- direction => 'descend' ) : () )
- });
+ } $driver->search('TheSchwartz::Job' => \%terms, \%options);
} else {
- push @jobs, $driver->search('TheSchwartz::Job' => {
- funcid => $funcid,
- @options
- }, { limit => $limit,
- ( $client->prioritize ? ( sort => 'priority',
- direction => 'descend' ) : () )
- }
- );
+ push @jobs, $driver->search('TheSchwartz::Job' => \%terms, \%options);
}
}
return @jobs;
@@ -217,6 +211,14 @@
my TheSchwartz $client = shift;
my ($op, $funcname, $coval) = @_;
+ my %options = ( limit => $FIND_JOB_BATCH_SIZE );
+ if ($client->prioritize) {
+ $options{sort} = [
+ { column => 'priority', direction => 'descend' },
+ { column => 'jobid' },
+ ];
+ }
+
for my $hashdsn ($client->shuffled_databases) {
## If the database is dead, skip it
next if $client->is_database_dead($hashdsn);
@@ -238,10 +240,7 @@
run_after => \ "<= $unixtime",
grabbed_until => \ "<= $unixtime",
coalesce => { op => $op, value => $coval },
- }, { limit => $FIND_JOB_BATCH_SIZE,
- ( $client->prioritize ? ( sort => 'priority',
- direction => 'descend' ) : () )
- }
+ }, \%options,
);
};
if ($@) {
@@ -260,6 +259,14 @@
my($worker_classes) = @_;
$worker_classes ||= $client->{current_abilities};
+ my %options = ( limit => $FIND_JOB_BATCH_SIZE );
+ if ($client->prioritize) {
+ $options{sort} = [
+ { column => 'priority', direction => 'descend' },
+ { column => 'jobid' },
+ ];
+ }
+
for my $hashdsn ($client->shuffled_databases) {
## If the database is dead, skip it.
next if $client->is_database_dead($hashdsn);
@@ -281,10 +288,7 @@
funcid => \@ids,
run_after => \ "<= $unixtime",
grabbed_until => \ "<= $unixtime",
- }, { limit => $FIND_JOB_BATCH_SIZE,
- ( $client->prioritize ? ( sort => 'priority',
- direction => 'descend' ) : () )
- }
+ }, \%options,
);
};
if ($@) {