//
// $Source: /nfs/elm/d3/home/cur/djb1/java/javat/RCS/Object_Channel.java,v $
//
// $Id: Object_Channel.java,v 1.1 1997/06/23 17:04:34 djb1 Exp $
//
// Object_Channels - occam/CSP CHANs of Objects
//
// (C) Copyright 1997 Peter Welch <P.H.Welch@ukc.ac.uk>
// University of Kent at Canterbury
//

// package occam;

/**
 * An occam/CSP style CHAN OF OBJECT for a single reader and multiple writers.
 *
 * <p>The single reader may ALT on this channel (multiple writers are allowed
 * but cannot themselves ALT).  This implementation follows closely the
 * transputer implementation -- ALTing is passive (i.e. no busy-waits).
 *
 * <p>There is full synchronisation between a reading and a writing thread.
 * Any thread may read or write on this channel.  Multiple writers are queued
 * on a private monitor, allowing only one at a time to find a reader.  Only a
 * single reader is catered for and this may back off if it selects another
 * channel.  Writers are not allowed to back off.  A reader only completes
 * when it finds a writer.  A writer only completes when it gets to the front
 * of its queue and finds a reader.
 *
 * <p>There is no logical buffering of data in the channel.
 *
 * <p>Only the {@code read} and {@code write} methods are public.  The
 * {@code enable} and {@code disable} methods are meant to be called only from
 * an Alternative object (by the ALTing process).  It is deliberate that
 * {@code disable} is not synchronised; for that reason the two fields it
 * touches ({@code alt} and {@code channel_empty}) are declared
 * {@code volatile}, so that its unsynchronised accesses have well-defined
 * visibility.
 *
 * <p>NOTE(review): every {@code wait ()} below is issued once rather than in
 * a condition loop, so a spurious wakeup (which {@code Object.wait} permits)
 * would be taken for a completed rendezvous.  Guarding against that needs
 * extra hand-shake state and is not attempted here.
 */
public class Object_Channel {

  // local state

  // The object being transferred (not detectable to users).  Only accessed
  // while holding this channel's monitor.
  private Object channel_hold;

  // Synchronisation flag: true when neither party has arrived at the
  // rendezvous, false once the first party has committed.  Volatile because
  // disable () reads it without holding the monitor.
  protected volatile boolean channel_empty = true;

  // All writers multiplex through this monitor, so at most one writer at a
  // time takes part in the rendezvous protocol on `this'.
  private final Object write_monitor = new Object ();

  // The Alternative recorded by enable () while the reader is ALTing on this
  // channel, or null.  Volatile because disable () clears it without holding
  // the monitor.
  private volatile Alternative alt;


  /**
   * Reads one object from the channel, blocking until a writer has supplied
   * it.
   *
   * @return the object passed to the matching {@code write} call
   * @throws InterruptedException if the thread is interrupted while waiting
   *         for the writer
   */
  public synchronized Object read () throws InterruptedException {
    if (channel_empty) {
      channel_empty = false;           // first to the rendezvous
      wait ();                         // wait for the writer thread
      notify ();                       // schedule the writer to finish
    } else {
      channel_empty = true;            // second to the rendezvous
      notify ();                       // schedule the waiting writer thread
    }
    return channel_hold;
  }


  /**
   * Writes one object to the channel, blocking until the reader has taken
   * part in the rendezvous.  Concurrent writers are serialised on a private
   * monitor, which the active writer keeps holding while it waits.
   *
   * @param n the object to hand to the reader
   * @throws InterruptedException if the thread is interrupted while waiting
   *         for the reader
   */
  public void write (Object n) throws InterruptedException {
    synchronized (write_monitor) {
      synchronized (this) {
        // Take one snapshot of alt: disable () may null the field at any
        // moment because it does not hold this monitor.
        Alternative tmp_alt = alt;
        channel_hold = n;
        if (tmp_alt != null) {             // a reader was ALTing on this Channel
          channel_empty = false;           // first to the rendezvous
          tmp_alt.schedule ();             // tell the reader we are here
          wait ();                         // wait for the reader thread
        } else if (channel_empty) {
          channel_empty = false;           // first to the rendezvous
          wait ();                         // wait for the reader thread
        } else {
          channel_empty = true;            // second to the rendezvous
          notify ();                       // schedule the waiting reader thread
          wait ();                         // let the reader regain this monitor
        }
      }
    }
  }


  /**
   * Called by the ALTing reader to register interest in this channel.
   *
   * @param alt the Alternative to record if the channel is still empty
   * @return {@code true} if the channel is already non-empty (in which case
   *         {@code alt} is not recorded); {@code false} if the channel is
   *         empty and {@code alt} has been recorded
   */
  synchronized boolean enable (Alternative alt) {
    if (channel_empty) {
      this.alt = alt;
      return false;
    } else {
      return true;
    }
  }

  /**
   * Called by the ALTing reader to withdraw from this channel.  Deliberately
   * not synchronised; it relies on the volatile fields for visibility.
   *
   * @return {@code true} if the channel is non-empty at the moment of the
   *         check, {@code false} otherwise
   */
  boolean disable () {
    alt = null;
    return !channel_empty;
  }
}

