
This queue is for tickets about the Thread-Queue CPAN distribution.

Report information
The Basics
Id: 79733
Status: resolved
Priority: 0/
Queue: Thread-Queue

People
Owner: Nobody in particular
Requestors: mschwern [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: 2.12
Fixed in: 3.01



Subject: Idea: queue non-blocking option
I have an idea for Thread::Queue: a method which switches the whole queue to non-blocking. This will make all calls to dequeue() non-blocking. The purpose is to indicate that no more data is coming and threads can safely shut down.

I hit upon this idea trying to put the SYNOPSIS into practice and hit a snag...

    my $q = Thread::Queue->new();

    my $thr = threads->create(sub {
        while (my $item = $q->dequeue()) {
            # Do work on $item
        }
    })->detach();

    # Send work to the thread
    $q->enqueue($item1, ...);

The program will exit as soon as it's done queuing and take the detached threads with it. I considered a few ways to fix this...

    while( $q->pending ) { sleep 1; }

Polling sucks. Worse, this could exit while workers are working on the last items.

    $_->join for threads->list(threads::running);

Seems simple enough, wait for all the threads to finish working. Except all the threads block on dequeue waiting for data that will never come.

I can't use dequeue_nb(), that risks the threads ending before I get a chance to populate the queue. If I prepopulate the queue, there's no guarantee it won't be exhausted before I can add more. There are various ways to work around this, all seem overcomplicated.

This is where the new feature comes in.

    $q->blocking(0);
    $_->join for threads->list(threads::running);

When the queue is exhausted, dequeue() returns undef instead of blocking and all the threads finish. Simple. There would be a corresponding is_blocking() method.

I'd originally thought about this as "the queue is finished", because that's my particular purpose, but that narrowed its utility too much. For example, if you're done adding items to a queue you probably wouldn't allow more to be enqueued.

Happy to implement it. Seems simple enough, dequeue() just checks the blocking() flag and punts to dequeue_nb() if it's false. blocking would default to true and be an option to new().

What do you think?
Good catch. I agree with your assessment of the example code in the SYNOPSIS. I'm going to change it to the following:

    use strict;
    use warnings;

    use threads;
    use Thread::Queue;

    my $q = Thread::Queue->new();    # A new empty queue

    # Worker thread
    my $thr = threads->create(
        sub {
            # Thread will loop until no more work
            while (defined(my $item = $q->dequeue())) {
                # Do work on $item
                ...
            }
        }
    );

    # Send work to the thread
    $q->enqueue($item1, ...);

    # Send 'undef' to thread to signal no more work
    $q->enqueue(undef);

    # Join up with the thread when it finishes
    $thr->join();

I believe this addresses your concerns regarding the literal usage of the sample code. Of course, if 'undef' is a valid work item for the thread, then something more elaborate would be needed - such as an explicit "no more work" flag - but that's an exercise to be left up to the user.

I don't deny that your suggestion may have potential merit, but I don't see that a blocking/non-blocking switch on the dequeue() call adds any advantages over the "solution" above (and it feels to me that it starts to diverge from the KISS principle). I'm, of course, open to more discussion if you have other insights on this.
I'd written up this whole thing about all the reasons having to put together your own end-of-queue flag is inconvenient and bug prone and harder to read and so on. And then I noticed something.

Your solution only works if there's one thread. D'oh! The first thread dequeues the magic value and the rest hang.

I couldn't figure out how to do it. You, the author, made a mistake doing it. The original SYNOPSIS didn't think of it. I think that's a pretty solid argument for providing a built in, explicit, safe, no-edge case way to solve this problem.

I'll code that patch up. Here's my now moot magnum opus.

--------------------------------------------------------------------

I still think the block/non-block flag, or something like it, has merit. If for no other reason than that I didn't think of a solution and neither did the original author of the SYNOPSIS. And, as you pointed out, it avoids the issue of if undef is valid data and having to make up your own "end of queue" value.

Providing your own "the queue is done" value is hacky and has edge cases. It means the user of the queue has to be *careful* and that's something you don't want, especially in threads. The queue management code and the queue worker code might be very far away from each other, and they must coordinate their magic value use. There could be multiple points of use of the same queue, each having to be updated if the magic value changes. If an undef accidentally gets into the queue, all your workers will stop. If a queue forgets to check for the magic value, it will hang waiting for more data. And if TQ doesn't provide a way to do it, everybody's going to write their own clever way which makes reading code harder and introduces bugs.

That's off the top of my head. Lots of subtle things can go wrong, which means bugs. Avoidable bugs.

As for which one is simpler, let's see it written out in a way which provides all the safeguards but doesn't use any extra methods.
    use strict;
    use warnings;

    use threads;
    use Thread::Queue;

    my $q = Thread::Queue->new();    # A new empty queue

    # Worker thread
    my $thr = threads->create(
        sub {
            # Thread will loop until no more work is coming
            while (1) {
                my $item = $q->dequeue;
                last if eval { $item->isa("Thread::Queue::Stop") };

                # Do work on $item
                ...;
            }
        });

    # Send work to the thread
    $q->enqueue($item1, ...);

    # Signal no more work is coming
    my $stop_flag = bless {}, "Thread::Queue::Stop";
    $q->enqueue($stop_flag);

    # Join up with the thread when it finishes
    $thr->join();

Yuck.

You can probably do better, but TQ is a convenience module. It's not about if it's better than the *best* solution a user could come up with. It's about being better than the solution a user is *likely* to come up with. Just because there exists a solution doesn't mean one can't provide a more obvious, more convenient and pre-packaged one. For example, this whole module is just a convenience wrapper around a shared array.

done() is a trivial amount of code, and the block/non-block switch comes along for free.
The attached patch implements done() and should_block() as discussed earlier. It also adds an example of its use. I tried to follow the existing style as much as possible.

In implementing I realized that it's necessary for done() to unblock any threads blocking on dequeue() should done() be called after the queue is already empty. That issue alone necessitates done() being part of Thread::Queue.

I had done() issue a cond_broadcast(). I've never gotten into the cond_blah stuff, so please triple check that work.

In order to allow the queue to have flags, it was necessary to turn it into a hash ref object rather than an array ref. Most of the change is that.

I merged the duplicated bits of dequeue() and dequeue_nb().
Subject: done.patch
diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm index 8588ed5..7b43f82 100644 --- a/lib/Thread/Queue.pm +++ b/lib/Thread/Queue.pm @@ -20,72 +20,119 @@ sub new { my $class = shift; my @queue :shared = map { shared_clone($_) } @_; - return bless(\@queue, $class); + my %self :shared = ( queue => \@queue, should_block => 1 ); + return bless(\%self, $class); } # Add items to the tail of a queue sub enqueue { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; push(@$queue, map { shared_clone($_) } @_) - and cond_signal(@$queue); + and cond_signal(%$self); } # Return a count of the number of items on a queue sub pending { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; return scalar(@$queue); } +# Indicate that no more data will enter the queue +sub done +{ + my $self = shift; + lock $self; + + # No more data is coming, don't block on an empty queue + $self->should_block(0); + + # Release all blocked queues + cond_broadcast(%$self); + + return; +} + +# Should dequeue block? +sub should_block +{ + my $self = shift; + + if( @_ ) { + $self->{should_block} = shift; + return; + } + + return $self->{should_block}; +} + + # Return 1 or more items from the head of a queue, blocking if needed sub dequeue { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; my $count = @_ ? 
$validate_count->(shift) : 1; # Wait for requisite number of items - cond_wait(@$queue) until (@$queue >= $count); - cond_signal(@$queue) if (@$queue > $count); + cond_wait(%$self) while ($self->should_block && @$queue < $count); + cond_signal(%$self) if (@$queue > $count); - # Return single item - return shift(@$queue) if ($count == 1); - - # Return multiple items - my @items; - push(@items, shift(@$queue)) for (1..$count); - return @items; + return $self->_dequeue_nb($count); } # Return items from the head of a queue with no blocking sub dequeue_nb { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; my $count = @_ ? $validate_count->(shift) : 1; + # If there's not enough left in the queue, return what's left. + $count = @$queue if @$queue < $count; + + return $self->_dequeue_nb($count); +} + +# Do the dequeuing. Assume proper input and that the queue is locked. +sub _dequeue_nb { + my $self = shift; + my $queue = $self->{queue}; + + my $count = shift; + + return if ($count == 0); + # Return single item return shift(@$queue) if ($count == 1); # Return multiple items my @items; - for (1..$count) { - last if (! @$queue); - push(@items, shift(@$queue)); - } + push(@items, shift(@$queue)) for (1..$count); return @items; } # Return an item without removing it from a queue sub peek { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; + my $index = @_ ? 
$validate_index->(shift) : 0; return $$queue[$index]; } @@ -93,8 +140,10 @@ sub peek # Insert items anywhere into a queue sub insert { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; my $index = $validate_index->(shift); @@ -121,14 +170,16 @@ sub insert push(@$queue, @tmp); # Soup's up - cond_signal(@$queue); + cond_signal(%$self); } # Remove items from anywhere in a queue sub extract { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + my $queue = $self->{queue}; my $index = @_ ? $validate_index->(shift) : 0; my $count = @_ ? $validate_count->(shift) : 1; @@ -139,7 +190,7 @@ sub extract if ($index < 0) { $count += $index; return if ($count <= 0); # Beyond the head of the queue - return $queue->dequeue_nb($count); # Extract from the head + return $self->dequeue_nb($count); # Extract from the head } } @@ -224,14 +275,16 @@ This document describes Thread::Queue version 2.12 # Worker thread my $thr = threads->create(sub { - while (my $item = $q->dequeue()) { - # Do work on $item - } - })->detach(); + while (my $item = $q->dequeue()) { + # Do work on $item + } + })->detach(); # Send work to the thread $q->enqueue($item1, ...); + # Done adding things to the queue, unblock any threads + $q->done; # Count of items in the queue my $left = $q->pending(); @@ -328,9 +381,13 @@ Adds a list of items onto the end of the queue. =item ->dequeue(COUNT) Removes the requested number of items (default is 1) from the head of the -queue, and returns them. If the queue contains fewer than the requested -number of items, then the thread will be blocked until the requisite number -of items are available (i.e., until other threads <enqueue> more items). +queue, and returns them. 
+ +If the queue contains fewer than the requested number of items, then +the thread will be blocked until the requisite number of items are +available (i.e., until other threads <enqueue> more items) unless C<< +$queue->should_block >> is false as after C<< $queue->done >> is +called. =item ->dequeue_nb() @@ -346,6 +403,14 @@ returned. Returns the number of items still in the queue. +=item ->done() + +Declares that no more items will be added to the queue. + +All queues blocked waiting for items will unblock. All further +dequeues and similar methods will no longer block waiting for more +items. + =back =head1 ADVANCED METHODS @@ -442,8 +507,77 @@ greater than zero): # Queue now contains: bar, baz my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 +=item ->should_block + +=item ->should_block(BOOLEAN) + +Get/set whether dequeue() and other blocking methods should block +while waiting for more items. + +Defaults to true. + =back +=head1 EXAMPLE + +=head2 Incrementally feed a large amount of data to a queue + +Let's say you can't build the whole queue at once. Maybe it's too +much data to fit in memory. Or maybe it just takes a long time to get +it all. + +Here's an example with illustrating print statements how to slowly +feed data to a queue while its being worked on using C<enqueue>, +C<dequeue> and C<done>. + + use strict; + use warnings; + + use threads; + use Thread::Queue; + + # This is our stand in for the slow data feed. It will + # return a random number of items after a pause. + my @data = (1..50); + sub get_data { + sleep 1; + return splice @data, 0, int(rand(10)+1); + } + + # A new empty queue + my $q = Thread::Queue->new(); + + # Make five worker threads. + # They will start and wait for work in the queue. 
+ my @threads; + for my $thread (1..5) { + push @threads, threads->create(sub { + my $tid = threads->tid; + print "$tid: starting\n"; + while (my $item = $q->dequeue()) { + print "$tid: got $item\n"; + } + print "$tid: done\n"; + }); + } + + # After the workers have started, slowly add work into the queue. + while( my @data = get_data() ) { + print "Adding more data to the queue.\n"; + $q->enqueue(@data); + } + + # No more data, declare the queue done. + $q->done; + print "Done\n"; + + for my $thread (@threads) { + my $tid = $thread->tid; + print "$tid: joining\n"; + $thread->join; + print "$tid: joined\b"; + } + =head1 NOTES Queues created by L<Thread::Queue> can be used in both threaded and diff --git a/t/09_done_while_empty.t b/t/09_done_while_empty.t new file mode 100644 index 0000000..b0316a6 --- /dev/null +++ b/t/09_done_while_empty.t @@ -0,0 +1,53 @@ +# Test that threads will unblock if done() is called after the queue is +# empty and threads are blocking. + +use strict; +use warnings; + +use Config; + +BEGIN { + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +use Test::More; + +my @items = 1..20; +my $num_threads = 3; +plan tests => @items + ($num_threads * 2); + +my $q = Thread::Queue->new(); + +my @threads; +for my $i (1..$num_threads) { + push @threads, threads->create( sub { + # Thread will loop until no more work is coming + while (defined( my $item = $q->dequeue )) { + pass("'$item' read from queue"); + select(undef, undef, undef, rand(1)); + } + pass("Thread $i exiting"); + }); +} + +$q->enqueue(@items); + +# Make sure there's nothing in the queue and threads are blocking. +sleep 1 while $q->pending; +sleep 1; + +# Signal no more work is coming to the blocked threads, they +# should unblock. +$q->done; +note "Done sent"; + +for my $thread (@threads) { + $thread->join; + pass($thread->tid." 
joined"); +} diff --git a/t/10_done_not_empty.t b/t/10_done_not_empty.t new file mode 100644 index 0000000..e585136 --- /dev/null +++ b/t/10_done_not_empty.t @@ -0,0 +1,48 @@ +# Test that threads will keep going after done() is called if there's more +# in the queue. + +use strict; +use warnings; + +use Config; + +BEGIN { + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +use Test::More; + +my @items = 1..30; +my $num_threads = 3; +plan tests => @items + ($num_threads * 2); + +my $q = Thread::Queue->new(); + +my @threads; +for my $i (1..$num_threads) { + push @threads, threads->create( sub { + # Thread will loop until no more work is coming + while (defined( my $item = $q->dequeue )) { + pass("'$item' read from queue"); + select(undef, undef, undef, rand(1)); + } + pass("Thread $i exiting"); + }); +} + +$q->enqueue(@items); + +# Signal no more work is coming while there's still stuff in the queue. +$q->done; +note "Done sent"; + +for my $thread (@threads) { + $thread->join; + pass($thread->tid." joined"); +}
Subject: Re: [rt.cpan.org #79733] Idea: queue non-blocking option
Date: Tue, 23 Oct 2012 15:28:59 -0400
To: bug-Thread-Queue [...] rt.cpan.org
From: "Jerry D. Hedden" <jdhedden [...] cpan.org>
On 2012-10-22 15:44:26, MSCHWERN wrote:
> Your solution only works if there's one thread. D'oh!
> The first thread dequeues the magic value and the rest
> hang.
>
> I couldn't figure out how to do it. You, the author, made
> a mistake doing it. The original SYNOPSIS didn't think of
> it. I think that's a pretty solid argument for providing
> a built in, explicit, safe, no-edge case way to solve this
> problem.
No, I didn't make a mistake. My example was simplistic with only one thread. If you have more, then you send more 'undef's - one (or more as appropriate) per thread. For example:

    $_->enqueue(undef) foreach (threads->list(threads::running));

Since each thread will only dequeue one 'undef' to stop work, this is the solution. I use a similar approach in examples/pool_reuse.pl in the 'threads' distribution.
> my $item = $q->dequeue;
> last if eval { $item->isa("Thread::Queue::Stop") };
...
> # Signal no more work is coming
> my $stop_flag = bless {}, "Thread::Queue::Stop";
> $q->enqueue($stop_flag);
I like this concept of constructing a specific termination item. Quite clever. For more threads, you'd just use:

    $_->enqueue($stop_flag) foreach (threads->list(threads::running));

As to your patch, I'm concerned about a few issues I noted.
> I had done() issue a cond_broadcast(). I've never gotten
> into the cond_blah stuff, so please triple check that
> work.
This is inappropriate. cond_broadcast() wakes up all threads which can then all attempt to dequeue items at the same time and hence cause data integrity problems (e.g., more than one thread getting the same queue item).

The fix is to use cond_signal() and to also put a test on should_block on the cond_signal call in dequeue():

    cond_signal(%$self) if ((@$queue > $count) || ! $self->should_block);

In 'dequeue' you have:

    cond_wait(%$self) while ($self->should_block && @$queue < $count);

If the queue is set to 'done()' before the $count requirement is met, then a blocked thread will not get the correct number of items it is expecting. This means the programmer needs to check for this, and take corrective action. This is conceptually similar to the objections you brought up in the first place. (I.e., if the programmer is not *careful*, the program will go awry.) This is not a bad thing per se, but it does illustrate that no matter what is conceived of, the programmer will still be responsible for programming things correctly.

Then in your merged _dequeue_nb() you have:

    push(@items, shift(@$queue)) for (1..$count);

If $count is greater than the size of the queue, then 'undef's will be added to the queue which again may (or may not) be problematic. That is why the two dequeue methods were separate.

I think I will eliminate the should_block() call. I cannot conceive of a significant situation when a programmer would need to set a queue to non-blocking and then back again.

All told, thanks for the ideas and the code. I will work it into an update and send it to you under separate cover for your further comment.
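
The one-stop-marker-per-worker approach described in this reply can be sketched as follows. This is a minimal sketch, not code from the ticket: it assumes the number of workers is fixed and known up front (`$num_workers` and the `$results` queue are illustrative names), and it enqueues the markers onto the queue object `$q`, since the thread objects returned by `threads->list()` have no `enqueue` method.

```perl
use strict;
use warnings;

use threads;
use Thread::Queue;

my $q       = Thread::Queue->new();    # work queue
my $results = Thread::Queue->new();    # results handed back to the main thread

my $num_workers = 3;    # assumption: fixed and known before shutdown

my @workers;
for (1 .. $num_workers) {
    push @workers, threads->create(sub {
        # Each worker stops at the first undef it dequeues
        while (defined(my $item = $q->dequeue())) {
            $results->enqueue($item * 2);
        }
    });
}

# Send work to the workers
$q->enqueue(1 .. 10);

# One stop marker per worker, placed on the queue itself
$q->enqueue(undef) for 1 .. $num_workers;

$_->join() for @workers;

my @doubled = sort { $a <=> $b } $results->dequeue_nb($results->pending());
print "@doubled\n";
```

The sketch only holds while every worker consumes exactly one marker and no worker starts after the markers are queued.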
On Tue Oct 23 15:29:40 2012, JDHEDDEN wrote:
> On 2012-10-22 15:44:26, MSCHWERN wrote:
> > Your solution only works if there's one thread. D'oh! > > The first thread dequeues the magic value and the rest > > hang. > > > > I couldn't figure out how to do it. You, the author, made > > a mistake doing it. The original SYNOPSIS didn't think of > > it. I think that's a pretty solid argument for providing > > a built in, explicit, safe, no-edge case way to solve this > > problem.
>
> No, I didn't make a mistake. My example was simplistic with
> only one thread. If you have more, then you send more
> 'undef's - one (or more as appropriate) per thread. For
> example:
>
> $_->enqueue(undef) foreach (threads->list(threads::running));
>
> Since each thread will only dequeue one 'undef' to stop
> work, this is the solution. I use a similar approach in
> examples/pool_reuse.pl in the 'threads' distribution.
...
> I like this concept of constructing a specific termination
> item. Quite clever. For more threads, you'd just use:
>
> $_->enqueue($stop_flag) foreach (threads->list(threads::running));
I hope you'll agree, it's not a very safe solution. You have to know A) how many threads are running and B) what their behavior will be.

A has a race condition where if a thread starts in the middle of that loop, you're hosed. You have to patch it up with something like...

    $_->enqueue($stop_flag) for threads->list(threads::running)
        while(threads->list(threads::running));

But I'm not even sure that's always reliable.

B relies on the queuer having intimate knowledge of how each worker behaves. Each worker must always pull a fixed and known number of items off the queue before exiting. Consider the following...

    sub work {
        my $item1 = $q->dequeue;
        ...do some work on $item1...
        my $item2 = $q->dequeue;
        ...do some work on $item1 and $item2...
    }

Is the thread blocked on the first one or the second one? How many do you put on the queue? Sure, you might say "why would you do that when you can my @items = $q->dequeue(2);" but somebody will do it.

Here's maybe a more practical example. Two different sets of workers, one dequeues 1 at a time, one dequeues 3 at a time. threads::running only tells you how many workers you have, not which type they are. Either you need to add bookkeeping or you need to feed threads::running * 3 elements onto the queue. And hope the race condition doesn't start again.

What if there are two sets of workers, each talking to a different queue? Now you need lists of what workers are on what queue. What if there's a third set not talking to any queue at all? What if there's a fourth talking to two queues, one which is exhausted and one which is not? It might eat your stop flags before other threads get to see them, causing some threads to never unblock.

If the queue doesn't have total control of which threads are working on their queue I don't think there's ANY reliable way to guarantee your workers will unblock. You wind up shovelling stop flags onto the queue as fast as possible and hope you can outrun the potential race conditions.

More likely a user doesn't think about any of this.

"Stop on a magic flag off the queue" requires too much careful coordination between the workers and the queue manager. It's too easy to get into a state where workers exhaust the stop flag (or forget to look for it) and wind up blocking. Doing it right involves a lot of careful consideration and a lot of code, which means a lot of people are going to get it subtly wrong.
> As to your patch, I'm concerned about a few issues I noted. >
> > I had done() issue a cond_broadcast(). I've never gotten > > into the cond_blah stuff, so please triple check that > > work.
> > This is inappropriate. cond_broadcast() wakes up all > threads which can then all attempt to dequeue items at the > same time and hence cause data integrity problems (e.g., > more than one thread getting the same queue item). > > The fix > is to use cond_signal() and to also put a test on > should_block on the cond_signal call in dequeue(): > > cond_signal(%$self) if ((@$queue > $count) || ! $self->should_block);
I'm a bit nervous about this. It seems to rely on done() issuing cond_signal, getting picked up by one thread which then wakes up and issues a cond_signal to the next thread which then wakes up and issues a cond_signal to the next thread... a chain which could be broken. A single "wake everybody up" call seems more reliable.

Shouldn't the lock on %$self in dequeue() protect against multiple threads trying to pull data from it at once? If not, would a second lock inside _dequeue_nb() cover that?

I wrote up some test code to try and make a collision but couldn't get one with either my patch or 3.01.

https://gist.github.com/3942210
> In 'dequeue' you have: > > cond_wait(%$self) while ($self->should_block && @$queue < $count); > > If the queue is set to 'done()' before the $count > requirement is met, then a blocked thread will not get the > correct number of items it is expecting. This means the > programmer needs to check for this, and take corrective > action. This is conceptually similar to the objections you > brought up in the first place. (I.e., if the programmer is > not *careful*, the program will go awry.) This is not a bad > thing per se, but it does illustrate that no matter what is > conceived of, the programmer will still be responsible for > programming things correctly.
You're right, and "bad data" is a more prosaic sort of failure already present, anybody can put bad data onto the queue, and MUCH easier to debug than concurrency bugs.

One way to mitigate it would be to change dequeue() so it throws an exception if not enough items are pulled off the queue. Then the developer can't miss the mistake. Also it's impossible to tell "bad data in the queue" from "the queue is exhausted" otherwise. Checking $q->pending after $q->dequeue invites a race condition. I like that idea.

dequeue() on an exhausted queue is always going to return a single scalar undef or fixed size list possibly containing some undefs (depending on how you asked for them). While dequeue_nb() is going to return an empty list. I think this is consistent with how these methods work normally, dequeue() is always going to give you a fixed list while dequeue_nb() gives you whatever's left.
> Then in your merged _dequeue_nb() you have: > > push(@items, shift(@$queue)) for (1..$count); > > If $count is greater than the size of the queue, then > 'undef's will be added to the queue which again may (or may > not) be problematic. That is why the two dequeue methods > were separate.
Assuming the queue is locked during the operation (ie. your cond fixes are applied) this is accounted for. dequeue() will always return a fixed size list. dequeue_nb() will knock the count down to what's left in the queue...

    # If there's not enough left in the queue, return what's left.
    $count = @$queue if @$queue < $count;

This is the same as their current behaviors which I just left alone. If it makes you more comfortable to keep them separate but a bit redundant that's fine.
> I think I will eliminate the should_block() call. I cannot > conceive of a significant situation when a programmer would > need to sent a queue to non-blocking and then back again.
I'm cool with eliminating it from the public API.
> All told, thanks for the ideas and the code. I will work it > into an update and send it to you under separate cover for > your further comment.
This reply is already too huge. Let me do that in a separate reply.
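
The question raised in this reply, whether holding the lock is enough to make cond_broadcast() safe, can be explored with a small stand-alone sketch. It does not use Thread::Queue at all: `@queue`, `$done`, `@taken` and `take_one()` are hypothetical names for a hand-rolled shared array. It relies on the documented threads::shared behavior that cond_wait() re-acquires the lock before returning, so woken threads pass through the critical section one at a time and each re-checks the wait condition.

```perl
use strict;
use warnings;

use threads;
use threads::shared;

my @queue :shared;        # hypothetical hand-rolled queue, not Thread::Queue
my $done  :shared = 0;    # set once no more items will be added
my @taken :shared;        # every item removed by any worker

# Remove one item, blocking while the queue is empty and not yet done
sub take_one {
    lock(@queue);

    # Re-check the condition after every wakeup. cond_wait() releases the
    # lock while blocked and re-acquires it before it returns.
    cond_wait(@queue) until (@queue || $done);

    return shift(@queue);    # undef once the queue is empty and done
}

my @workers;
for (1 .. 4) {
    push @workers, threads->create(sub {
        while (defined(my $item = take_one())) {
            lock(@taken);
            push(@taken, $item);
        }
    });
}

{
    lock(@queue);
    push(@queue, 1 .. 100);
    cond_broadcast(@queue);    # wake every waiter; each re-checks under the lock
}

{
    lock(@queue);
    $done = 1;
    cond_broadcast(@queue);    # release any workers still blocked
}

$_->join() for @workers;

my %seen;
$seen{$_}++ for @taken;
my @dupes = grep { $seen{$_} > 1 } keys %seen;
printf "%d items taken, %d duplicates\n", scalar(@taken), scalar(@dupes);
```

A clean run is not a proof of correctness, but the structure shows why a broadcast does not by itself let two threads shift the same item: the shift only ever happens while the lock is held.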
I put your 3.01 tarball into my local git repo so I can compare it a bit easier.

Aside from the changes to how threads are unblocked discussed earlier, the major issue I have is adding the restriction that no more values can be added to a queue after it's been declared done. I can see why you'd want it either way, so I'd suggest having two modes. Both unblock the queue, but one prevents adding anything more and the other does not.

I like "close" rather than "end", the queue is closed to new input, but no biggie. "Unblock" is the other mode, same as "done", it just unblocks the queue but it's still open to input. Being closed implies being unblocked.

I also don't see why these states cannot be flipped. There's no internal reason why not, and it provides the user with more flexibility to do things we might not imagine.

I can imagine some things I'd do with it. Consider having a data input that comes in waves. Sometimes you have a ton of data, sometimes you have a trickle. When a big chunk comes you might want to spin up a lot of workers. When it abates you might want to set the queue to "unblocked" so workers can exit. Later when data floods in again you might want to set the queue to blocked again so workers stick around. This might be better done with a separate worker manager monitoring the queue, but that adds a whole additional subsystem and it's nice to have MTOWTDI.

If you don't want to worry about that just yet, end/close could be added alone with the semantics you've provided and done/unblock added later.

Other than that, some minor bits...

* The note about the effect of end/done on dequeue was removed. It really needs to be mentioned there in the dequeue() docs because it changes the behavior of dequeue(). It's where the user is going to be looking for information about dequeue(), and not in a disjoint section at the bottom of the documentation.

* Calling the flag ENDED() is disjoint from everything similar in the API like pending(). It should be ended(). Also the docs state ENDED is a method, but there is no ENDED method. It should *not* be a hash key the user grabs at, there's no reason to expose the object internals.

* The EXAMPLE was removed. I think it should go back. It's a useful and more realistic example than the SYNOPSIS. It shows managing multiple threads as well as using illustrating print statements so the user can trace what's going on while running the code. And because it doesn't have to try and show every feature it is clearer.

* More thoughts on dequeue's behavior on an unblocked queue...

I think there are THREE outcomes of dequeue() when unblocked, particularly dequeue($n > 1).

1) Here is the data you requested.
2) There is no more data.
3) You requested more data than is on the queue.

Currently you cannot safely distinguish these states. I'm thinking 1) a scalar or list like normal, 2) empty list, 3) an exception. That makes all these do the right thing.

    eval {
        while(my @items = $q->dequeue($n)) {
            ...
        }
    };
    if( $@ ) {
        ...handle there not being enough in the queue...
    }

    # Important that it works with both styles
    while(my $item = $q->dequeue) { ... }
    while(my($item) = $q->dequeue) { ... }

Normally they'll loop. When the queue is exhausted they'll exit. If there's not enough in the queue (only possible with the first one) they get an exception. No special checks for undef or the number of items returned necessary.
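
The three proposed outcomes can be modelled without any threads. The sketch below is a hypothetical, single-threaded stand-in (`Sketch::Queue` is an invented name, not part of Thread::Queue) that behaves like a queue which has already been unblocked: it does no locking and never waits, and exists only to show how the proposed return values would look to the caller.

```perl
use strict;
use warnings;

# Hypothetical single-threaded model of the proposed semantics.
# Not Thread::Queue: no sharing, no locking, no blocking.
package Sketch::Queue;

sub new {
    my $class = shift;
    return bless { items => [@_] }, $class;
}

sub dequeue {
    my ($self, $n) = @_;
    $n = 1 unless defined $n;
    my $items = $self->{items};

    # 2) No more data: empty list (undef in scalar context)
    return if !@$items;

    # 3) More requested than is left: exception
    die "wanted $n items, only " . @$items . " left\n" if @$items < $n;

    # 1) The data requested
    my @out = splice(@$items, 0, $n);
    return wantarray ? @out : $out[0];
}

package main;

my $q = Sketch::Queue->new(1 .. 7);

my @batches;
my $ok = eval {
    while (my @items = $q->dequeue(3)) {
        push @batches, "@items";
    }
    1;
};
my $error = $ok ? '' : $@;

print "batches: ", join(' | ', @batches), "\n";
print "error: $error";
```

With seven items and a batch size of three, the loop sees two full batches and then the exception, so the caller never has to inspect a short or undef-padded list.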
Subject: Re: [rt.cpan.org #79733] Idea: queue non-blocking option
Date: Tue, 23 Oct 2012 21:09:10 -0400
To: bug-Thread-Queue [...] rt.cpan.org
From: "Jerry D. Hedden" <jdhedden [...] cpan.org>
> * Calling the flag ENDED() is disjoint from everything > similar in the API like pending(). It should be ended(). > Also the docs state ENDED is a method, but there is no > ENDED method. It should *not* be a hash key the user > grabs at, there's no reason to expose the object > internals.
Thanks for pointing this out. I've removed it, and changed the ->pending() method as follows:

    =item ->pending()

    Returns the number of items still in the queue. Returns
    C<undef> if the queue has been ended (see below), and there
    are no more items in the queue.

(Yes, I'm sure you're cringing at this, but I think it makes sense.)
> * The EXAMPLE was removed. I think it should go back. > Its a useful and more realistic example than the SYNOPSIS. > It shows managing multiple threads as well as using > illustrating print statements so the user can trace what's > going on while running the code. And because it doesn't > have to try and show every feature it is clearer.
There's an 'examples' sub-dir in the CPAN distribution. I've updated examples/queue.pl to include the new ->end() method. I don't feel that the POD is a good place for long stretches of example code, and the SYNOPSIS is just that - a synopsis - code fragments to give you a clue and refresh your memory after you've read the docs.

I appreciate your passion for wanting to improve the module. However, the first step is to get this by all of the p5p people. Thread::Queue is a core module - I don't really own it. The fact that the structure of the queue object is changing is huge (IMO), and may be a deal breaker for getting this through. (All the arguments about bad practices aside.) (Then again, it might be that no one takes notice or cares.)

And I am in no way trying to be flippant here - I do respect your ideas - but you could also release your own module to CPAN that replaces Thread::Queue or extends it. For example, I like your Thread::Queue::Stop idea. It's a shame it wasn't there from the start. I'd like to see dequeue_nb() changed to return it instead of undef, and similarly have dequeue() return it when the queue is ended. That would allow undef to be used as data so-to-speak. But of course such a change could not be made to Thread::Queue otherwise the entirety of the known universe would cease to exist.
Subject: Re: [rt.cpan.org #79733] Idea: queue non-blocking option
Date: Wed, 24 Oct 2012 17:52:17 -0700
To: bug-Thread-Queue [...] rt.cpan.org
From: Michael G Schwern <schwern [...] pobox.com>
On 2012.10.23 6:09 PM, Jerry D. Hedden via RT wrote:
> <URL: https://rt.cpan.org/Ticket/Display.html?id=79733 >
>
>> * Calling the flag ENDED() is disjoint from everything
>> similar in the API like pending(). It should be ended().
>> Also the docs state ENDED is a method, but there is no
>> ENDED method. It should *not* be a hash key the user
>> grabs at, there's no reason to expose the object
>> internals.
>
> Thanks for pointing this out. I've removed it, and changed
> the ->pending() method as follows:
>
> =item ->pending()
>
> Returns the number of items still in the queue. Returns
> C<undef> if the queue has been ended (see below), and there
> are no more items in the queue.
>
> (Yes, I'm sure you're cringing at this,
Yep, actually said "No! No! Oh god no!" to myself in my kitchen when I read it. :)
> but I think it makes sense.)
Tell me why this makes sense?

>> * The EXAMPLE was removed. I think it should go back.
>> It's a useful and more realistic example than the SYNOPSIS.
>> It shows managing multiple threads as well as using
>> illustrating print statements so the user can trace what's
>> going on while running the code. And because it doesn't
>> have to try and show every feature it is clearer.
> > There's an 'examples' sub-dir in the CPAN distribution.
Most users don't even know it exists because they do CPAN shell installs. I never noticed it and I'm patching the distribution! The examples don't survive install, which gives the user only a fleeting moment to consult them. Consider moving them to the POD.

> I've
> updated examples/queue.pl to include the new ->end() method.
> I don't feel that the POD is a good place for long stretches
> of example code
Examples in POD may not be perfect (it's much easier to test files on disk), but at least they will be seen. Perfect documentation which is never read might as well not exist.

Perhaps mention the examples directory in the POD so when a user searches for "example" they'll see that and know it exists.

> I appreciate your passion for wanting to improve the module.
> However, the first step is to get this by all of the p5p
> people. Thread::Queue is a core module - I don't really own
> it. The fact that the structure of the queue object is
> changing is huge (IMO), and may be a deal breaker for
> getting this through. (All the arguments about bad
> practices aside.) (Then again, it might be that no one
> takes notice or cares.)
I checked the logs. p5p hasn't touched TQ since it was split out of threads::shared 10 years ago. They've just been picking up your releases since then. You own it.

Thread::Queue is an object and it has a very neat and well documented API. There is no documented exposure of the underlying data structure, and the API is very good so there's no need to grab at it, so there is no real risk in changing the underlying structure.

If you bring this up on p5p people will whinge about what if there's code out there which assumed the queue was an array and you'll have a big, unnecessary argument. Really this has all already been resolved: if it's not documented it doesn't exist. I have a wonderful quote from Tom Christiansen on this...

    You are wicked and wrong to have broken inside and peeked at the
    implementation and then relied upon it.
        -- tchrist in <31832.969261130@chthon>

p5p has as much power as you give them, mainly in how much you attract their attention. So long as you're not breaking compatibility, just make a CPAN release and they'll update it like they have in the past. Mention it in the change log. That's my advice.

> And I am in no way trying to be flippant here - I do respect
> your ideas - but you could also release your own module to
> CPAN that replaces Thread::Queue or extends it.
I don't think we're anywhere near the need to fork the module. Please don't take my vigorous discussions as real dissatisfaction. It's enthusiasm and a desire to get it right so the user doesn't have to be careful. That philosophy of module design may be the real difference here? Also I know jack all about threads.

> For
> example, I like your Thread::Queue::Stop idea. It's a shame
> it wasn't there from the start. I'd like to see
> dequeue_nb() changed to return it instead of undef, and
> similarly have dequeue() return it when the queue is ended.
> That would allow undef to be used as data so-to-speak. But
> of course such a change could not be made to Thread::Queue
> otherwise the entirety of the known universe would cease to
> exist.
While I agree that allowing undef as data is important, I still think stop flags are a bad idea. Continuously returning one only solves one half of that problem, the worker still needs a bunch of code to detect the stop flag which the user is likely to forget.

List vs empty list covers the problem well. The snag being that dequeue($n=1) always returns a single scalar, there's no way to use it in list return mode.

-- 
125. Two drink limit does not mean two kinds of drinks.
    -- The 213 Things Skippy Is No Longer Allowed To Do In The U.S. Army
           http://skippyslist.com/list/
3.01 on CPAN
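For reference, the worker shutdown problem that opened this ticket can be handled along these lines once end() is available. This is a rough sketch, assuming Thread::Queue 3.01 or later (where end() exists and pending() returns undef for an ended, empty queue, as described earlier in this thread) and a perl built with thread support. The worker count and the items are made up for the example.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue 3.01;    # assumption: end() needs 3.01 or later

my $q = Thread::Queue->new();

# Start three workers (an arbitrary number for this sketch).
my @workers;
for (1 .. 3) {
    push @workers, threads->create({ 'context' => 'list' }, sub {
        my $handled = 0;
        # dequeue() blocks until an item arrives; after end() it returns
        # undef once the queue is drained, which stops the loop.
        while (defined(my $item = $q->dequeue())) {
            $handled++;
        }
        return $handled;
    });
}

$q->enqueue(1 .. 20);
$q->end();    # declare that no more items will be added

# Workers are no longer stuck in dequeue(), so join() returns.
my $total = 0;
for my $thr (@workers) {
    my ($handled) = $thr->join();
    $total += $handled;
}

my $pending = $q->pending();
print "items handled: $total\n";
print "pending is ", (defined $pending ? $pending : 'undef'), "\n";
```

The loop tests defined() rather than truth so that a false item such as 0 would not stop a worker early; undef still cannot be sent as data with this style.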