B424 Parallel and Distributed Programming

Monitors

Data abstraction mechanisms (Brinch-Hansen and Hoare 1974).

Object-oriented data structure encapsulating the protection of acritical section.

The monitor class contains the definition of the shared variable.

The only access to the shared variable is through class methods. All of them are protected by mutex/semaphores which are also part of the class.

A monitor can also include queues of processes/threads demanding access to the shared resources.

Generic Monitor

monitor name {
 private:
  declaration of shared variables;
  declaration of synchronization tools;
 public:
  class methods;
};

Monitor Invariant

A Monitor Invariant (MI) specifies the acceptable states of a monitor as a condition on the shared variables that must be true when no thread is accessing them.

The MI has to be true:

The MI must be proven to hold after the initialization, at the end of every method.

The MI can be assumed to be true at the beginning of a method.

For the bounded buffer: (MI) 0 <= count <= n.

monitor Bounded_buffer {
 private:
  type buf[n];
  int count;
  pthread_cond_t notFull, notEmpty;
  pthread_mutex_t lockFull, lockEmpty, lockBuf;
 public:
  void init();
  void deposit(type data);
  void fetch(type &result);
};

void Bounded_buffer::deposit(type data)
{
  pthread_mutex_lock(&lockFull);
  while (count == n)
    pthread_cond_wait(¬Full, &lockFull);
  pthread_mutex_lock(&lockBuf); buf[count++] = data;
  pthread_mutex_unlock(&lockBuf);
  pthread_mutex_unlock(&lockFull);
  pthread_mutex_lock(&lockEmpty);
  pthread_cond_signal(¬Empty);
  pthread_mutex_unlock(&lockEmpty);
}

void Bounded_buffer:: fetch(type &result)
{
  pthread_mutex_lock(&lockEmpty);
  while (count == 0)
    pthread_cond_wait(¬Empty, &lockEmpty);
  pthread_mutex_lock(&lockBuf);
  result = buf[--count];
  pthread_mutex_unlock(&lockBuf);
  pthread_mutex_unlock(&lockEmpty);
  pthread_mutex_lock(&lockFull);
  pthread_cond_signal(¬Full);
  pthread_mutex_unlock(&lockFull);
}

Shortest Job Next

A monitor implementing a priority conditional variable (scheduler). The goal is to allocate a shared resource such that those who need it the least amount of time get it first.

The threads requesting access to the conditional variable must do so based on the time that they need the resource for.

This requires a priority queue. We also need it to be FIFO for objects of the same priority (sorted array + insertion sort).

monitor Shortest_job_next {
 private:
  PQueue turn;
  bool free;
  pthread_cond_t cond[nr_threads];
  pthread_mutex_t lock;
 public:
  void wait(int id, int time);
  void signal(id);
  void request(int id, int time);
  void release();
  inline int minrank() { return turn.minrank() };
};

Invariant:
(turn is ordered by time) &
(free => turn is empty)

  
void Shortest_job_next::wait(int id, int time)
{
  pthread_mutex_lock(&lock);
  turn.enqueue(id, time); pthread_mutex_unlock(&lock);
  pthread_cond_wait(&(cond[id]), ...);
}

void Shortest_job_next::signal(int id)
{
  pthread_cond_signal(&(cond[id]));
}

void Shortest_job_next::request(int id, int time)
{
  pthread_mutex_lock(&lock);
  if (free) {
    free = false;
    pthread_mutex_unlock(&lock);
  }
  else {
    pthread_mutex_unlock(&lock);
    wait(id, time);
  }
}

void Shortest_job_next:: release() 
{
  pthread_mutex_lock(&lock);
  if (turn.is_empty()) {
    free = true;
    pthread_mutex_unlock(&lock);
  }
  else {
    int id = turn.dequeue();
    pthread_mutex_unlock(&lock);
    signal(id);
  }
}

Interval Timer

A timer that allows threads to sleep for a specified number of time units.

Must implement the function delay(interval).

First version: using a variable tod (time of day) that increases periodically. The delay function must wait until tod has increased interval number of times.

Second version: using a priority conditional variable (see previous example).

monitor Timer {
private:
  int tod=0;
  pthread_cond_t check; pthread_mutex_t lock;
 public:
  void delay(int interval) 
  {
    pthread_mutex_lock(&lock);
    int wake_time = tod + interval;
    while(wake_time > tod)
      pthread_cond_wait(&check, &lock);
    pthread_mutex_unlock(&lock);
  }

  void tick() 
  {
    pthread_mutex_lock(&lock);
    tod = tod + 1;
    pthread_cond_bcast(&check);
    pthread_mutex_unlock(&lock);
  }
};

Second solution:

monitor Timer {
private:
  int tod=0; SNJ check; pthread_mutex_t lock;
 public:
  void delay(int interval, int id) {
    pthread_mutex_lock(&lock);
    int wake_time = tod + interval;
    pthread_mutex_unlock(&lock);
    if (wake_time > tod)
      check.wait(id, wake_time);
  }

  void tick() {
    pthread_mutex_lock(&lock);
    tod = tod + 1;
    pthread_mutex_unlock(&lock);
    while(check.minrank()<=tod)
      check.release();
  }
};