Dana Vrajitoru
B424 Parallel and Distributed Computing

Pipeline in Message Passing Mode

The Bubble Sort

Sequential algorithm

for (i=n-1; i>0; i--)
   for (j=0; j<i; j++) 
      if (a[j] > a[j+1])
         Swap(a[j], a[j+1]); 

Parallel Bubble Sort

Local_loop(lsize)
{
   for (j = 0; j < lsize - 1; j++) 
      if (a[j] > a[j+1])
         Swap(a[j], a[j+1]);
}

Parallel_bubble_sort(id)
{
   lsize = n / nr_proc;   // assumes nr_proc divides n
   for (i = 0; i < n; i++) {
      Exchange_with_previous(id);
      Local_loop(lsize);
      Exchange_with_next(id); 
   }
}

Exchange_with_previous(proc_id) {
   // prev_last: local here
   if (proc_id != 0) {
      Recv(prev_last, proc_id - 1);
      Send(a[0], proc_id - 1);
     if (prev_last > a[0])
        a[0] = prev_last;  
   }
}

Exchange_with_next(proc_id) {
   // next_first: local here
   if (proc_id != nr_proc-1) {
      Send(a[lsize-1], proc_id + 1);
      Recv(next_first, proc_id + 1);
      if (next_first < a[lsize-1])
         a[lsize-1] = next_first;
   } 
}

Odd-Even Sort

Odd_even_sort() 
{
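   for (phase=0; phase<n; phase++) {
      if (phase % 2 == 0) {
         // even phase: pairs (0,1), (2,3), ...
         for (i=1; i<n; i+=2)
            if (a[i-1] > a[i])
               Swap(a[i], a[i-1]); 
      }
      else { 
         // odd phase: pairs (1,2), (3,4), ...
         for (i=1; i<n-1; i+=2)
            if (a[i] > a[i+1])
               Swap(a[i], a[i+1]); 
      }
   }
}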

Local_loop(phase, id) {
   if (phase %2 ==0) { 
      if (a[id-1]>a[id])
          Swap(a[id], a[id-1]); 
   }
   else { 
      if (id < n-1 && a[id]>a[id+1])
         Swap(a[id], a[id+1]); 
   } 
}

// id: all the odd numbers between 1 and n-1
Parallel_odd_even_sort(id) {
   for (phase=0; phase<n; phase++) {
      Local_loop(phase, id);
      Barrier();
   }
}

Computing with Interruption


A divide-and-conquer model in which the execution of the loop may have to be interrupted when a particular condition is met.

An example: the bubble sort.

Sequential algorithm
finished = 0;
while (! finished) {
  finished = 1;
  for (j=0; j<n-1; j++)
    if (a[j] > a[j+1]) {
      Swap(a[j], a[j+1]);
      finished = 0;
    }
}

Parallel algorithm (previous version):
Master_slave() {
  for (i=n-1; i>0; i--) {
    if (proc_id != 0)
      Exchange_left();
    Interior_loop();
    if (proc_id != nb_proc - 1)
      Exchange_right();
  }
}

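We will have a variable tag with almost the same meaning as finished. The value of tag is:
0 if at least one swap has been made so far during the current pass;
1 if no swap has been made so far during the current pass;
2 if the array is sorted and all the processes must stop.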

The value of tag is sent from one process to the next through the tag parameter of the message: in Exchange_right, the tag variable is passed as the tag argument of the send, while in Exchange_left the receive uses MPI_ANY_TAG as its tag argument and then assigns status.MPI_TAG to the tag variable.

For example, in the Exchange_right function, we would have the following send:

MPI_Send(&(a[local_size-1]), 1, MPI_INT, proc_id + 1, tag, MPI_COMM_WORLD);

while in the Exchange_left we would have the following receive:

MPI_Recv(&prev_max, 1, MPI_INT, proc_id - 1, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
tag = status.MPI_TAG;

Note 1. In the function Exchange_left, if the transposition is done, tag must also be set to 0. This is not needed in Exchange_right, because there the transposition is done after the value of tag has been sent, so the updated value would be of no use.

Note 2. We must make sure that the last process sends the information that the array is sorted to the first process only once. For this, we use a variable stop_sent that keeps track of whether the stop signal has already been sent to the first process. Otherwise, the last process could send this message more than once; since the first process expects it only once, the extra send would never be matched and the last process could block. See the pseudocode of Last_process for the change.

Interior_loop() {
  for (j=0; j<local_size-1; j++)
    if (a[j] > a[j+1]) {
      Swap(a[j], a[j+1]);
      tag = 0;
    }
}

First_process() // proc_id == 0
{
  finished = 0;
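  Initiate_receive(dummy, nb_proc-1); // non-blocking receive of the stop signal (e.g. MPI_Irecv)
  while (! finished) {
    tag = 1;
    Test_received(); // has the stop signal arrived? (e.g. MPI_Test)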
    if (received) {
      tag = 2;
      finished = 1;
    }
    else
      Interior_loop();
    Exchange_right();
  }
}

Intermediate_process() // 0 < proc_id < nb_proc-1
{
  finished = 0;
  while (! finished) {
    Exchange_left(); // receiving tag also
    if (tag == 2)
      finished = 1;
    else
      Interior_loop();
    Exchange_right();
  }
}

Last_process() // proc_id == nb_proc - 1
{
  finished = 0;
  stop_sent = 0;
  while (! finished) {
    Exchange_left(); // receiving tag also
    if (tag == 2)
      finished = 1;
    else {
      Interior_loop();
      if (tag == 1 && !stop_sent) {
        Send(dummy, 0);
        stop_sent = 1;
      }
    }
  }
}