NAME
   Coro::MP - Erlang-style multi-processing/message-passing framework

SYNOPSIS
      use Coro::MP;

      # exports everything that AnyEvent::MP exports as well.
      # new stuff compared to AnyEvent::MP:

      # creating/using ports from threads
      my $port = port_async {
         # thread context, $SELF is set to $port

         # returning will "kil" the $port with an empty reason
      };

      # attach to an existing port
      spawn $NODE, "::initfunc";
      sub ::initfunc {
         rcv_async $SELF, sub {
            ...
         };
      }

      # simple "tag" receives:
      my ($pid) = get "pid", 30
         or die "no pid message received after 30s";

      # conditional receive
      my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
      my @next_msg = get_cond { 1 } 30; # 30s timeout

      # run thread in port context
      peval_async {
         die "kill the port\n";
      };

      # synchronous "cal"
      my @retval = syncal 30, $port, tag => $data;

DESCRIPTION
   This module (-family) implements a simple message passing framework.

   Despite its simplicity, you can securely message other processes running
   on the same or other hosts, and you can supervise entities remotely.

   This module depends heavily on AnyEvent::MP; in fact, many functions
   exported by this module are identical to AnyEvent::MP functions. This
   module family is simply the Coro API to AnyEvent::MP.

   Care has been taken to stay compatible with AnyEvent::MP, even if
   sometimes this required a less natural API ("spawn" should indeed spawn
   a thread, not just call an initfunc for example).

   For an introduction to AnyEvent::MP, see the AnyEvent::MP::Intro manual
   page.

VARIABLES/FUNCTIONS
   NODE, $NODE, node_of, configure
   $SELF, *SELF, SELF, %SELF, @SELF...
   snd, mon, kil, psub
       These variables and functions work exactly as in AnyEvent::MP. In
       fact, they are exactly the same functions, and are used in much the
       same way.

   rcv This function works exactly as "AnyEvent::MP::rcv", and is in fact
       compatible with Coro::MP ports. However, the canonical way to
       receive messages with Coro::MP is to use "get" or "get_cond".

   port
       This function is exactly the same as "AnyEvent::MP::port" and
       creates new ports. You can attach a thread to them by calling
       "rcv_async" or you can do a create and attach in one operation using
       "port_async".

   peval
       This function works exactly as "AnyEvent::MP::peval" - you could use
       it to run callbacks within a port context (good for monitoring), but
       you cannot "get" messages unless the callback executes within the
       thread attached to the port.

       Since creating a thread with port context requires somewhat annoying
       syntax, there is a "peval_async" function that handles that for you
       - note that within such a thread, you still cannot "get" messages.
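
       A minimal sketch of running a callback in port context, assuming
       the AnyEvent::MP calling convention "peval $port, $coderef". The
       error text and the monitor callback are illustrative assumptions.

          use Coro::MP;

          my $port = port;

          # report when the port goes away
          mon $port, sub { warn "port was killed: @_\n" };

          peval $port, sub {
             # an exception here kils $port instead of
             # propagating to the caller of peval
             die "cannot continue\n";
          };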

   spawn
       This function is identical to "AnyEvent::MP::spawn". This means that
       it doesn't spawn a new thread as one would expect, but simply calls
       an init function. The init function, however, can attach a new
       thread easily:

          sub initfun {
             my (@args) = @_;

             rcv_async $SELF, sub {
                # thread-code
             };
          }

   cal This function is identical to "AnyEvent::MP::cal". The easiest way
       to make a synchronous call is to use Coro's rouse functionality:

          # send 1, 2, 3 to $port and wait up to 30s for reply
          cal $port, 1, 2, 3, rouse_cb, 30;
          my @reply = rouse_wait;

       You can also use "syncal" if you want, and are ok with learning yet
       another function with a weird name:

          my @reply = syncal 30, $port, 1, 2, 3;

   $local_port = port_async { ... }
       Creates a new local port, and returns its ID. A new thread is
       created and attached to the port (see "rcv_async", below, for
       details).

   rcv_async $port, $threadcb
       This function creates and attaches a thread on a port. The thread is
       set to execute $threadcb and is put into the ready queue. The thread
       will receive all messages not filtered away by tagged receive
       callbacks (as set by "AnyEvent::MP::rcv") - it simply replaces the
       default callback of an AnyEvent::MP port.

       The special variable $SELF will be set to $port during thread
       execution.

       When $threadcb returns or the thread is canceled, the return/cancel
       values become the "kil" reason.

       It is not allowed to call "rcv_async" more than once on a given
       port.
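
       A minimal sketch of attaching a thread to a port that was created
       separately with "port". The "log" tag and the one-second sleep are
       illustrative assumptions, and the sketch assumes that purely local
       messaging works without calling "configure" first.

          use Coro::MP;
          use Coro::AnyEvent;

          my $port = port;   # plain port, no thread attached yet

          rcv_async $port, sub {
             # runs in its own thread, with $SELF set to $port
             while (my ($text) = get "log") {
                print "log: $text\n";
             }
             # a "log" message without data ends the loop; returning
             # from the thread then kils the port
          };

          snd $port, log => "hello";

          # block the main thread briefly so the new thread can run
          Coro::AnyEvent::sleep 1;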

   @msg = get $tag
   @msg = get $tag, $timeout
       Find, dequeue and return the next message with the specified $tag.
       If no matching message is currently queued, wait up to $timeout
       seconds (or forever if no $timeout has been specified or it is
       "undef") for one to arrive.

       Returns the message with the initial tag removed. In case of a
       timeout, the empty list is returned. The function *must* be called
       in list context.

       Note that empty messages cannot be distinguished from a timeout when
       using "get".

       Example: send a "log" message to $SELF and then get and print it.

          snd $SELF, log => "text";
          my ($text) = get "log";
          print "log message: $text\n";

       Example: receive "p1" and "p2" messages, regardless of the order
       they arrive in on the port.

          my @p1 = get "p1";
          my @p2 = get "p2";

       Example: assume a message with tag "now" is already in the queue and
       fetch it. If no message was there, do not wait, but die.

          my @msg = get "now", 0
             or die "expected now message to be there, but it wasn't";
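
       When a message may consist of its tag alone, one possible
       workaround is "get_cond" (described below), which returns the whole
       message including the tag, so a match is never the empty list. A
       minimal sketch, meant to run inside a thread attached to a port;
       the "ping" tag and the 5 second timeout are illustrative
       assumptions.

          if (my @msg = get_cond { $_[0] eq "ping" } 5) {
             # matched: @msg is ("ping"), possibly followed by data
             print "got ping\n";
          } else {
             # timed out: no "ping" arrived within 5 seconds
             print "no ping\n";
          }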

   @msg = get_cond { condition... } [$timeout]
       Similarly to "get", looks for a matching message. Unlike "get",
       "matching" is not defined by a tag alone, but by a predicate, a
       piece of code that is executed on each candidate message in turn,
       with @_ set to the message contents.

       The predicate code is supposed to return the empty list if the
       message didn't match. If it returns anything else, then the message
       is removed from the queue and returned to the caller.

       In addition, if the predicate returns a code reference, then it is
       immediately invoked on the removed message.

       If a $timeout is specified and is not "undef", then, after this many
       seconds have passed without a matching message arriving, the
       empty list will be returned.

       Example: fetch the next message, wait as long as necessary.

          my @msg = get_cond { 1 };

       Example: fetch the next message whose tag starts with "group1_".

          my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };

       Example: check whether a message with tag "child_exit" and a second
       element of $pid is in the queue already.

          if (
             my (undef, $pid, $status) =
                get_cond {
                   $_[0] eq "child_exit" && $_[1] == $pid
                } 0
          ) {
             warn "child $pid did exit with status $status\n";
          }

       Example: implement a server that reacts to "log", "exit" and
       "reverse" messages, and exits after 30 seconds of idling.

          my $reverser = port_async {
             while() {
                get_cond {
                   $_[0] eq "exit" and return sub {
                      last; # yes, this is valid
                   };
                   $_[0] eq "log" and return sub {
                      print "log: $_[1]\n";
                   };
                   $_[0] eq "reverse" and return sub {
                      my (undef, $text, @reply) = @_;
                      snd @reply, scalar reverse $text;
                   };

                   die "unexpected message $_[0] received";
                } 30
                   or last;
             }
          };

   $async = peval_async { BLOCK }
       Sometimes you want to run a thread within a port context, for error
       handling.

       This function creates a new, ready, thread (using "Coro::async"),
       sets $SELF to the current value of $SELF while it is executing, and
       calls the given BLOCK.

       This is very similar to "psub" - note that while the BLOCK executes
       in $SELF port context, you cannot call "get", as $SELF can only be
       attached to one thread.

       Example: execute some Coro::AIO code concurrently in another thread,
       but make sure any errors "kil" the originating port.

          port_async {
             ...
             peval_async {
                # $SELF set, but cannot call get etc. here

                my $fh = aio_open ...
                   or die "open: $!";

                aio_close $fh;
             };
          };

   @reply = syncal $timeout, $port, @msg
       The synchronous form of "cal", a simple form of RPC - it sends a
       message to the given $port with the given contents (@msg), but adds
       a reply port to the message.

       The reply port is created temporarily just for the purpose of
       receiving the reply, and will be "kil"ed when no longer needed.

       Then it will wait until a reply message arrives, which will be
       returned to the caller.

       If the $timeout is defined, then after this many seconds, when no
       message has arrived, the port will be "kil"ed and an empty list will
       be returned.

       If the $timeout is undef, then the local port will monitor the
       remote port instead, so it eventually gets cleaned-up.

       Example: call the string reverse example from "get_cond".

          my ($reversed) = syncal 1, $reverser, reverse => "Rotator";

SEE ALSO
   AnyEvent::MP::Intro - a gentle introduction.

   AnyEvent::MP - like Coro::MP, but event-based.

   AnyEvent.

AUTHOR
    Marc Lehmann
    http://home.schmorp.de/