/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2009, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id: MailBox.scala 16893 2009-01-13 13:09:22Z cunei $


package scala.concurrent

/** This class ...
 *
 *  @author  Martin Odersky
 *  @version 1.0, 12/03/2003
 */
//class MailBox with Monitor with LinkedListQueueCreator {
class MailBox extends AnyRef with ListQueueCreator {

  type Message = AnyRef

  private abstract class PreReceiver {
    var msg: Message = null
    def isDefinedAt(msg: Message): Boolean
  }

  private class Receiver[A](receiver: PartialFunction[Message, A]) extends PreReceiver {

    def isDefinedAt(msg: Message) = receiver.isDefinedAt(msg)

    def receive(): A = synchronized {
      while (msg eq null) wait()
      receiver(msg)
    }

    def receiveWithin(msec: Long): A = synchronized {
      if (msg eq null) wait(msec)
      receiver(if (msg ne null) msg else TIMEOUT)
    }
  }

  private val messageQueue = queueCreate[Message]
  private val receiverQueue = queueCreate[PreReceiver]

  /** Unconsumed messages. */
  private var sent = messageQueue.make

  /** Pending receivers. */
  private var receivers = receiverQueue.make

  /**
  * Check whether the receiver can be applied to an unconsumed message.
  * If yes, the message is extracted and associated with the receiver.
  * Otherwise the receiver is appended to the list of pending receivers.
  */
  private def scanSentMsgs[A](receiver: Receiver[A]): Unit = synchronized {
    messageQueue.extractFirst(sent, msg => receiver.isDefinedAt(msg)) match {
      case None =>
        receivers = receiverQueue.append(receivers, receiver)
      case Some((msg, withoutMsg)) =>
        sent = withoutMsg
        receiver.msg = msg
    }
  }

  /**
  * First check whether a pending receiver is applicable to the sent
  * message. If yes, the receiver is notified. Otherwise the message
  * is appended to the linked list of sent messages.  
  */
  def send(msg: Message): Unit = synchronized {
    receiverQueue.extractFirst(receivers, r => r.isDefinedAt(msg)) match {
      case None =>
        sent = messageQueue.append(sent, msg)
      case Some((receiver, withoutReceiver)) =>
        receivers = withoutReceiver
        receiver.msg = msg
        receiver synchronized { receiver.notify() }
    }
  }

  /**
  * Block until there is a message in the mailbox for which the processor
  * <code>f</code> is defined.
  */
  def receive[A](f: PartialFunction[Message, A]): A = {
    val r = new Receiver(f)
    scanSentMsgs(r)
    r.receive()
  }

  /**
  * Block until there is a message in the mailbox for which the processor
  * <code>f</code> is defined or the timeout is over.
  */
  def receiveWithin[A](msec: Long)(f: PartialFunction[Message, A]): A = {
    val r = new Receiver(f)
    scanSentMsgs(r)
    r.receiveWithin(msec)
  }

}

/////////////////////////////////////////////////////////////////

/**
* Module for dealing with queues.
*/
trait QueueModule[A] {
  /** Type of queues. */
  type T
  /** Create an empty queue. */
  def make: T
  /** Append an element to a queue. */
  def append(l: T, x: A): T
  /** Extract an element satisfying a predicate from a queue. */
  def extractFirst(l: T, p: A => Boolean): Option[(A, T)]
}

/** Inefficient but simple queue module creator. */
trait ListQueueCreator {
  def queueCreate[A]: QueueModule[A] = new QueueModule[A] {
    type T = List[A]
    def make: T = Nil
    def append(l: T, x: A): T = l ::: x :: Nil
    def extractFirst(l: T, p: A => Boolean): Option[(A, T)] =
      l match {
        case Nil => None
        case head :: tail =>
          if (p(head))
            Some((head, tail))
          else
            extractFirst(tail, p) match {
              case None => None
              case Some((x, without_x)) => Some((x, head :: without_x))
            }
      }
  }
}

/** Efficient queue module creator based on linked lists. */
trait LinkedListQueueCreator {
  import scala.collection.mutable.LinkedList
  def queueCreate[A >: Null <: AnyRef]: QueueModule[A] = new QueueModule[A] {
    type T = (LinkedList[A], LinkedList[A]) // fst = the list, snd = last elem
    def make: T = {
      val l = new LinkedList[A](null, null)
      (l, l)
    }
    def append(l: T, x: A): T = {
      val atTail = new LinkedList(x, null)
      l._2 append atTail;
      (l._1, atTail)
    }
    def extractFirst(l: T, p: A => Boolean): Option[(A, T)] = {
      var xs = l._1
      var xs1 = xs.next
      while ((xs1 ne null) && !p(xs1.elem)) {
        xs = xs1
        xs1 = xs1.next
      }
      if (xs1 ne null) {
        xs.next = xs1.next
        if (xs.next eq null)
          Some((xs1.elem, (l._1, xs)))
        else
          Some((xs1.elem, l))
      }
      else
        None
    }
  }
}