Skip Menu |

This queue is for tickets about the TheSchwartz CPAN distribution.

Report information
The Basics
Id: 99075
Status: resolved
Priority: 0/
Queue: TheSchwartz

People
Owner: Jeff.Fearn [...] gmail.com
Requestors: SGREEN [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Important
Broken in: (no value)
Fixed in: 1.11



Subject: When using priority, order by jobid too.
We had an issue at $dayjob where the job queue was not processing jobs in roughly the order they were created, causing issues when a large number of jobs (>30,000) were queued. This patch changes the sort order when prioritize is set to "priority DESC, jobid ASC".
Subject: theschwartz-order.patch
--- old/TheSchwartz-1.10/lib/TheSchwartz.pm 2010-03-16 07:14:30.000000000 +1000 +++ TheSchwartz-1.10/lib/TheSchwartz.pm 2014-09-23 11:45:57.044698655 +1000 @@ -146,9 +146,9 @@ sub list_jobs { my TheSchwartz $client = shift; my $arg = shift; - my @options; - push @options, run_after => { op => '<=', value => $arg->{run_after} } if exists $arg->{run_after}; - push @options, grabbed_until => { op => '<=', value => $arg->{grabbed_until} } if exists $arg->{grabbed_until}; + my (%terms, %options); + $terms{run_after} = { op => '<=', value => $arg->{run_after} } if exists $arg->{run_after}; + $terms{grabbed_until} = { op => '<=', value => $arg->{grabbed_until} } if exists $arg->{grabbed_until}; die "No funcname" unless exists $arg->{funcname}; $arg->{want_handle} = 1 unless defined $arg->{want_handle}; @@ -156,7 +156,15 @@ if ($arg->{coalesce}) { $arg->{coalesce_op} ||= '='; - push @options, coalesce => { op => $arg->{coalesce_op}, value => $arg->{coalesce}}; + $terms{coalesce} = { op => $arg->{coalesce_op}, value => $arg->{coalesce}}; + } + + $options{limit} = $limit; + if ($client->prioritize) { + $options{sort} = [ + { column => 'priority', direction => 'descend' }, + { column => 'jobid' }, + ]; } my @jobs; @@ -164,11 +172,10 @@ ## If the database is dead, skip it next if $client->is_database_dead($hashdsn); my $driver = $client->driver_for($hashdsn); - my $funcid; if (ref($arg->{funcname})) { - $funcid = [map { $client->funcname_to_id($driver, $hashdsn, $_) } @{$arg->{funcname}}]; + $terms{funcid} = [map { $client->funcname_to_id($driver, $hashdsn, $_) } @{$arg->{funcname}}]; } else { - $funcid = $client->funcname_to_id($driver, $hashdsn, $arg->{funcname}); + $terms{funcid} = $client->funcname_to_id($driver, $hashdsn, $arg->{funcname}); } if ($arg->{want_handle}) { @@ -180,22 +187,9 @@ }); $_->handle($handle); $_; - } $driver->search('TheSchwartz::Job' => { - funcid => $funcid, - @options - }, { limit => $limit, - ( $client->prioritize ? 
( sort => 'priority', - direction => 'descend' ) : () ) - }); + } $driver->search('TheSchwartz::Job' => \%terms, \%options); } else { - push @jobs, $driver->search('TheSchwartz::Job' => { - funcid => $funcid, - @options - }, { limit => $limit, - ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) - } - ); + push @jobs, $driver->search('TheSchwartz::Job' => \%terms, \%options); } } return @jobs; @@ -217,6 +211,14 @@ my TheSchwartz $client = shift; my ($op, $funcname, $coval) = @_; + my %options = ( limit => $FIND_JOB_BATCH_SIZE ); + if ($client->prioritize) { + $options{sort} = [ + { column => 'priority', direction => 'descend' }, + { column => 'jobid' }, + ]; + } + for my $hashdsn ($client->shuffled_databases) { ## If the database is dead, skip it next if $client->is_database_dead($hashdsn); @@ -238,10 +240,7 @@ run_after => \ "<= $unixtime", grabbed_until => \ "<= $unixtime", coalesce => { op => $op, value => $coval }, - }, { limit => $FIND_JOB_BATCH_SIZE, - ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) - } + }, \%options, ); }; if ($@) { @@ -260,6 +259,14 @@ my($worker_classes) = @_; $worker_classes ||= $client->{current_abilities}; + my %options = ( limit => $FIND_JOB_BATCH_SIZE ); + if ($client->prioritize) { + $options{sort} = [ + { column => 'priority', direction => 'descend' }, + { column => 'jobid' }, + ]; + } + for my $hashdsn ($client->shuffled_databases) { ## If the database is dead, skip it. next if $client->is_database_dead($hashdsn); @@ -281,10 +288,7 @@ funcid => \@ids, run_after => \ "<= $unixtime", grabbed_until => \ "<= $unixtime", - }, { limit => $FIND_JOB_BATCH_SIZE, - ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) - } + }, \%options, ); }; if ($@) {
Added the patch and also added a test. NOTE: the order for prioritize is not guaranteed unless FIND_JOB_BATCH_SIZE is set to 1. This is due to _grab_a_job running shuffle. See RT #72815. To git@github.com:jfearn/TheSchwartz.git 92bb661..7a137a7 master -> master
A fix for this issue shipped in Version 1.11.