NAME
   Coro::ProcessPool - an asynchronous process pool

SYNOPSIS
       use Coro::ProcessPool;

       my $pool = Coro::ProcessPool->new(
           max_procs => 4,
           max_reqs  => 100,
       );

       my $double = sub { $_[0] * 2 };

       # Process in sequence
       my %result;
       foreach my $i (1 .. 1000) {
           $result{$i} = $pool->process($double, [$i]);
       }

       # Process as a batch
       my @results = $pool->map($double, 1 .. 1000);

       # Defer waiting for result
       my %deferred = map { $_ => $pool->defer($double, [$_]) } 1 .. 1000;
       foreach my $i (keys %deferred) {
           print "$i = " . $deferred{$i}->() . "\n";
       }

       # Use a "task class", implementing 'new' and 'run'
       my $result = $pool->process('Task::Doubler', [21]);

       $pool->shutdown;

DESCRIPTION
   Processes tasks using a pool of external Perl processes.

METHODS
 new
   Creates a new process pool. Processes will be spawned as needed.

   max_procs
       This is the maximum number of child processes to maintain. If all
       processes are busy handling tasks, further calls to "process"
       will yield until a process becomes available. If not specified,
       defaults to the number of CPUs on the system.

   max_reqs
       If this is a positive number (defaults to 0), child processes will
       be terminated and replaced after handling "max_reqs" tasks. Choosing
       the correct value for "max_reqs" is a tradeoff between the need to
       clear memory leaks in the child process and the time it takes to
       spawn a new process and import any packages used by client code.

 process($f, $args, $timeout)
   Processes code ref $f in a child process from the pool. If $args is
   provided, it is an array ref of arguments that will be passed to $f.
   Returns the result of calling $f->(@$args).

   Alternately, $f may be the name of a class implementing the methods
   "new" and "run", in which case the result is equivalent to calling
   $f->new(@$args)->run(). Note that the include path for worker processes
   is identical to that of the calling process.
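
   As a rough sketch of what such a task class might look like: the
   synopsis names "Task::Doubler" but does not show it, so the
   implementation below is an assumption. In real use the package would
   live in its own file (for example Task/Doubler.pm) somewhere on the
   include path so that worker processes can load it. The last lines run
   the class locally, without a pool, to show the call that "process" is
   described as being equivalent to.

```perl
use strict;
use warnings;

# Hypothetical task class: the constructor receives the task arguments,
# and run() performs the work and returns the result.
package Task::Doubler;

sub new {
    my ($class, $n) = @_;
    return bless { n => $n }, $class;
}

sub run {
    my $self = shift;
    return $self->{n} * 2;
}

package main;

# Local equivalent of $pool->process('Task::Doubler', [21]).
my @args   = (21);
my $result = Task::Doubler->new(@args)->run;
print "$result\n";
```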

   This call will yield until the results become available. If all
   processes are busy, this method will also yield until one becomes
   available. Processes are spawned as needed, up to "max_procs", from
   this method.
   Also note that the use of "max_reqs" can cause this method to yield
   while a new process is spawned.

   A timeout may be optionally specified in fractional seconds. If
   specified, $timeout will cause "process" to croak if $timeout seconds
   pass and no process becomes available to handle the task.

   Note that the timeout only applies to the time it takes to acquire an
   available process. It does not watch the time it takes to perform the
   task.

 map($f, @args)
   Applies $f to each value in @args in turn and returns a list of the
   results. Although the order in which each argument is processed is not
   guaranteed, the results are guaranteed to be in the same order as @args,
   even if calling $f returns a list itself (in which case the results of
   that calculation are flattened into the list returned by "map").
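
   The flattening described above can be pictured with Perl's built-in
   "map" as a local stand-in. No pool is involved here, and
   $value_and_double is a hypothetical task invented for illustration;
   the point is only the shape of the returned list.

```perl
use strict;
use warnings;

# Hypothetical task that returns two values for each input.
my $value_and_double = sub {
    my $n = shift;
    return ($n, $n * 2);
};

my @args = (1, 2, 3);

# Each call returns a two-element list; the lists are flattened into a
# single result list that follows the order of @args.
my @results = map { $value_and_double->($_) } @args;

print "@results\n";    # prints "1 2 2 4 3 6"
```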

 defer($f, $args)
   Similar to "process", but returns immediately. The return value is
   a code reference that, when called, returns the results of calling
   "$f->(@$args)".

       my $deferred = $pool->defer($coderef, [ $x, $y, $z ]);
       my $result   = $deferred->();

 queue($f, $args, $callback)
   Queues the execution of "$f->(@$args)" and returns immediately. If
   $callback is specified and is a code ref, it will be called with the
   result of the executed code once the task is processed. Note that the
   callback is not provided with any identifying information about the task
   being executed. That is the responsibility of the caller. It is also the
   caller's responsibility to coordinate any code that depends on the
   result, for example using a condition variable.

       my $cv = AnyEvent->condvar;

       sub make_callback {
           my $n = shift;
           return sub {
               my $result = shift;
               print "2 * $n = $result\n";
               $cv->send;
           }
       }

       $pool->queue($doubler_function, [21], make_callback(21));
       $cv->recv;

 shutdown
   Shuts down all processes and resets state on the process pool. After
   calling this method, the pool is effectively in a new state and may be
   used normally.

A NOTE ABOUT IMPORTS AND CLOSURES
   Code refs are serialized using Storable to pass them to the worker
   processes. Once deserialized in the pool process, these functions can no
   longer see the stack as it is in the parent process. Therefore, imports
   and variables external to the function are unavailable.

   Something like this will not work:

       use Foo;
       my $foo = Foo->new();

       my $result = $pool->process(sub {
           return $foo->bar; # $foo not found
       });

   Nor will this:

       use Foo;
       my $result = $pool->process(sub {
           my $foo = Foo->new; # Foo not found
           return $foo->bar;
       });

   The correct way to do this is to import from within the function:

       my $result = $pool->process(sub {
           require Foo;
           my $foo = Foo->new();
           return $foo->bar;
       });

   ...or to pass in external variables that are needed by the function:

       use Foo;
       my $foo = Foo->new();

       my $result = $pool->process(sub { $_[0]->bar }, [ $foo ]);
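
   The behavior described in this section can be reproduced without a
   pool. The sketch below round-trips code refs through Storable in a
   single process, using Storable's documented $Storable::Deparse and
   $Storable::Eval switches. Whether the pool uses exactly this
   configuration internally is an assumption, and round_trip is a helper
   invented for the example.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Storable only handles code refs when both switches are enabled.
$Storable::Deparse = 1;    # freeze: turn the sub into source text
$Storable::Eval    = 1;    # thaw: eval that source text back into a sub

# Hypothetical helper: serialize and rebuild a code ref, roughly as it
# would be when crossing a process boundary.
sub round_trip {
    my ($code) = @_;
    return thaw(freeze([$code]))->[0];
}

# A sub that depends only on its arguments survives the round trip.
my $double = round_trip(sub { $_[0] * 2 });
my $ok     = $double->(21);

# A closure over $factor does not: only the source text is kept, so the
# rebuilt sub is no longer bound to the lexical it originally captured.
my $factor  = 2;
my $closure = sub { $_[0] * $factor };
my $broken  = eval { round_trip($closure)->(21) };
my $closure_survived = (defined $broken && $broken == 42) ? 1 : 0;

print "args only: $ok\n";
print "closure survived: $closure_survived\n";
```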

 Use versus require
   A "use" statement is executed at compile time, whereas "require" is
   evaluated at runtime. Because of this, "use" in code passed directly to
   the "process" method can fail: the "use" statement was already
   evaluated when the calling code was compiled in the parent process, and
   it is not run again in the worker.

   This will not work:

       $pool->process(sub {
           use Foo;
           my $foo = Foo->new();
       });

   This will work:

       $pool->process(sub {
           require Foo;
           my $foo = Foo->new();
       });
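
   The timing difference can be observed in a single process. In the
   sketch below, the two subs and the choice of core modules
   (File::Basename, Text::ParseWords) are arbitrary and only serve the
   illustration. The module named in "use" is loaded as soon as the file
   is compiled, even though the enclosing sub is never called, while the
   "require" takes effect only when its sub actually runs.

```perl
use strict;
use warnings;

# The "use" runs once, when this file is compiled. Calling with_use()
# is not needed for File::Basename to be loaded.
sub with_use {
    use File::Basename ();
    return 1;
}

# The "require" runs each time with_require() is called (the module is
# loaded on the first call only).
sub with_require {
    require Text::ParseWords;
    return 1;
}

# %INC records every module file that has been loaded so far.
my $use_loaded_without_call = exists $INC{'File/Basename.pm'} ? 1 : 0;

with_require();
my $require_loaded_after_call = exists $INC{'Text/ParseWords.pm'} ? 1 : 0;

print "use loaded without a call: $use_loaded_without_call\n";
print "require loaded after call: $require_loaded_after_call\n";
```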

   If "use" is necessary (for example, to import a method or transform the
   calling code via import), it is recommended to move the code into its
   own module, which can then be called in the anonymous routine:

       package Bar;

       use Foo;

       sub dostuff {
           ...
       }

   Then, in your caller:

       $pool->process(sub {
           require Bar;
           Bar::dostuff();
       });

 If it's a problem...
   Use the task class method if the loading requirements are causing
   headaches:

       my $result = $pool->process('Task::Class', [@args]);

COMPATIBILITY
   "Coro::ProcessPool" will likely break on Win32 due to missing support
   for non-blocking file descriptors (Win32 can only call "select" and
   "poll" on actual network sockets). Without rewriting this as a network
   server, which would impact performance and be really annoying, it is
   likely this module will not support Win32 in the near future.

   The following modules will get you started if you wish to explore a
   synchronous process pool on Windows:

   Win32::Process
   Win32::IPC
   Win32::Pipe

AUTHOR
   Jeff Ober <[email protected]>

LICENSE
   BSD License