
mpi - What is the right way to "notify" processors without blocking?

Question:

Suppose I have a very large array of things and I have to do some operation on all these things.

If the operation fails for one element, I want to stop the work across the whole array (the work is distributed across a number of processors).

I want to achieve this while keeping the number of sent/received messages to a minimum.

Also, I don't want to block processors if there is no need to.

How can I do it using MPI?

Answer:

A possible strategy to derive this global stop condition in a non-blocking fashion is to rely on MPI_Test.

Scenario

Consider that each process posts an asynchronous receive (MPI_Irecv) of type MPI_INT from its left rank with a given tag, so that the processes form a ring. Then start your computation. If a rank encounters the stop condition, it sends its own rank to its right rank. In the meantime, each rank uses MPI_Test during the computation to check whether the MPI_Irecv has completed. If it has, the rank enters a branch in which it forwards the received rank to its right neighbour, unless that neighbour's rank equals the payload of the message (so that the message does not loop).

Once this is done, all processes should be in the branch, ready to trigger an arbitrary recovery operation.
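
The neighbour arithmetic and the forwarding rule described above can be sketched in isolation, without MPI. The helper names ring_left, ring_right and should_forward are hypothetical and only illustrate the rule; they are not part of MPI.

```c
#include <assert.h>

/* Left neighbour in a ring of `size` ranks (rank 0 wraps to size-1). */
int ring_left(int rank, int size)
{
    assert(size > 0 && rank >= 0 && rank < size);
    return (rank + size - 1) % size;
}

/* Right neighbour in a ring of `size` ranks (rank size-1 wraps to 0). */
int ring_right(int rank, int size)
{
    assert(size > 0 && rank >= 0 && rank < size);
    return (rank + 1) % size;
}

/* Forwarding rule for a rank that received a stop message whose payload
   is `origin_rank`: forward unless the right neighbour is the originator,
   otherwise the message would go around the ring a second time. */
int should_forward(int rank, int size, int origin_rank)
{
    return ring_right(rank, size) != origin_rank;
}
```

With four ranks and rank 2 as the originator, ranks 3 and 0 forward the message and rank 1 does not, so the message stops just before reaching its origin.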

Complexity

The topology chosen here is a ring because it minimizes the number of messages (at most n-1); however, it increases the propagation time, since the message travels through the ranks one after another. Other topologies could be used that trade more messages for a shorter propagation time, for example a tree-like pattern with on the order of n.ln(n) messages.
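
To make the trade-off concrete, here is a small simulation sketch in plain C (no MPI). ring_messages counts the messages of the ring scheme described above. doubling_rounds models a different, hypothetical pattern that is not taken from this answer: in round k, every rank that already knows about the stop notifies the rank 2^k positions to its right, so the number of informed ranks doubles each round.

```c
#include <assert.h>

/* Ring scheme: the originator sends to its right neighbour and every
   receiver forwards until the next hop would be the originator again.
   Returns the number of messages, which equals the number of
   sequential hops. */
int ring_messages(int n, int origin)
{
    assert(n > 0 && origin >= 0 && origin < n);
    int messages = 0;
    int current = origin;
    while ((current + 1) % n != origin) {
        current = (current + 1) % n;
        messages++;
    }
    return messages;
}

/* Hypothetical doubling pattern (an assumption for comparison only):
   the number of informed ranks doubles every round, so the number of
   sequential rounds is the smallest k with 2^k >= n. */
int doubling_rounds(int n)
{
    assert(n > 0);
    int rounds = 0;
    for (int informed = 1; informed < n; informed *= 2)
        rounds++;
    return rounds;
}
```

For 8 ranks the ring needs 7 messages sent strictly one after another, while the doubling pattern would need 3 sequential rounds.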

Implementation

Something like this.

#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Ring neighbours, wrapping around at both ends */
    int left_rank = (rank + size - 1) % size;
    int right_rank = (rank + 1) % size;

    int stop_cond_rank;
    MPI_Request stop_cond_request;
    int stop_cond = 0;

    /* Post the receive once, before the loop; re-posting it on every
       iteration would leave one more pending request each time */
    MPI_Irecv(&stop_cond_rank, 1, MPI_INT, left_rank, 123, MPI_COMM_WORLD, &stop_cond_request);

    while (1)
    {
        /* Compute here and set stop_cond accordingly */

        if (stop_cond)
        {
            /* Cancel the left recv; a cancelled request must still be completed */
            MPI_Cancel(&stop_cond_request);
            MPI_Wait(&stop_cond_request, MPI_STATUS_IGNORE);
            if (rank != right_rank)
                MPI_Send(&rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD);

            break;
        }

        int did_recv = 0;
        MPI_Test(&stop_cond_request, &did_recv, MPI_STATUS_IGNORE);
        if (did_recv)
        {
            /* A successful MPI_Test already completed the request,
               so no extra MPI_Wait is needed */
            stop_cond = 1;
            if (right_rank != stop_cond_rank)
                MPI_Send(&stop_cond_rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD);

            break;
        }
    }

    if (stop_cond)
    {
        /* Handle the stop condition */
    }
    else
    {
        /* Cleanup (only reached if the loop is given a normal exit path) */
        MPI_Cancel(&stop_cond_request);
        MPI_Wait(&stop_cond_request, MPI_STATUS_IGNORE);
    }

    MPI_Finalize();
    return 0;
}

Answer:

This seems to be a common question with no easy answer. Both other answers have scalability issues. The ring-communication approach has linear communication cost, while in the one-sided MPI_Win solution a single process will be hammered with memory requests from all workers. This may be fine for a low number of ranks, but it poses issues as the rank count increases.

Non-blocking collectives can provide a more scalable solution. The main idea is to post an MPI_Ibarrier on all ranks except one designated root. This root listens for point-to-point stop messages via MPI_Irecv and enters the MPI_Ibarrier only once it has received one, which lets the barrier complete on all other ranks.

The tricky part is that there are four different cases, "{root, non-root} x {found, not-found}", that need to be handled. It can also happen that multiple ranks send a stop message, which requires an unknown number of matching receives on the root. That can be solved with an additional reduction that counts the number of ranks that sent a stop request.

Here is an example of how this can look in C:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

const int iter_max = 10000;
const int difficulty = 20000;

int find_stuff()
{
    int num_iters = rand() % iter_max;
    for (int i = 0; i < num_iters; i++) {
        if (rand() % difficulty == 0) {
            return 1;
        }
    }
    return 0;
}

const int stop_tag = 42;
const int root = 0;

int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
{
    int flag;
    MPI_Status status;
    if (found_count == 0) {
        MPI_Test(root_recv_stop, &flag, &status);
    } else {
        // If we find something on the root, we actually wait until we receive our own message.
        MPI_Wait(root_recv_stop, &status);
        flag = 1;
    }
    if (flag) {
        printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
        MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
        MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
        // We must post some additional receives if multiple ranks found something at the same time
        MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
        for (found_count--; found_count > 0; found_count--) {
            MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
            printf("Additional stop from: %d\n", status.MPI_SOURCE);
        }
        return 1;
    }
    return 0;
}

int main()
{
    MPI_Init(NULL, NULL);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srand(rank);

    MPI_Request root_recv_stop;
    MPI_Request all_recv_stop;
    if (rank == root) {
        MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
    } else {
        // You may want to use an extra communicator here, to avoid messing with other barriers
        MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
    }

    while (1) {
        int found = find_stuff();
        if (found) {
            printf("Rank %d found something.\n", rank);
            // Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
            MPI_Request req;
            MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
            if (rank != root) {
                // We know that we are going to receive our own stop signal.
                // This avoids running another useless iteration
                MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                MPI_Wait(&req, MPI_STATUS_IGNORE);
                break;
            }
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
        if (rank == root) {
            if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
                break;
            }
        } else {
            int stop_signal;
            MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
            if (stop_signal)
            {
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                printf("Rank %d stopping after receiving signal.\n", rank);
                break;
            }
        }
    }

    MPI_Finalize();
}

While this is not the simplest code, it should:

  • Introduce no additional blocking
  • Scale with the implementation of a barrier (usually O(log N))
  • Have a worst-case latency, from one rank finding something to all ranks stopping, of 2 * loop time (+ 1 p2p + 1 barrier + 1 reduction)
  • Still work if many or all ranks find a solution at the same time, although it may be less efficient

Answer:

That is a question I've asked myself a few times without finding a completely satisfactory answer... The only thing I thought of (besides MPI_Abort(), which does it but is a bit extreme) is to create an MPI_Win storing a flag that will be raised by whichever process faces the problem, and checked regularly by all processes to see whether they can go on processing. This is done using non-blocking calls, the same way as described in this answer.

The main weaknesses of this are:

  1. This depends on the processes willingly checking the status of the flag. There is no immediate interruption of their work to notify them.
  2. The frequency of this checking must be adjusted by hand. You have to find the trade-off between the time you waste processing data while there's no need to because something happened elsewhere, and the time it takes to check the status...
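
The trade-off in point 2 can be sketched as a processing loop that only polls every check_interval items. This is a plain C illustration under assumptions: the callback type stop_check_fn and the functions process_with_polling and stop_after_n_polls are hypothetical names, and in a real MPI program the callback would wrap whatever flag check is used (for example a read of the MPI_Win flag or an MPI_Test call).

```c
#include <assert.h>

/* Hypothetical callback: returns nonzero once a stop has been signalled. */
typedef int (*stop_check_fn)(void *ctx);

/* Processes up to n_items items and polls `check` before every
   check_interval-th item. Returns how many items were processed.
   After a stop is raised elsewhere, up to check_interval items may
   still be processed needlessly before the next poll. */
long process_with_polling(long n_items, long check_interval,
                          stop_check_fn check, void *ctx)
{
    assert(check_interval > 0);
    long done = 0;
    while (done < n_items) {
        if (done % check_interval == 0 && check(ctx))
            break;
        /* ... process item number `done` here ... */
        done++;
    }
    return done;
}

/* Stand-in for the real flag check: reports a stop once it has
   already been polled *ctx times. */
int stop_after_n_polls(void *ctx)
{
    int *remaining = ctx;
    if (*remaining == 0)
        return 1;
    (*remaining)--;
    return 0;
}
```

A small check_interval reacts faster but spends more time polling; a large one polls rarely but wastes more work after a stop.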

In the end, what we would need is a way of defining a callback action triggered by an MPI call such as MPI_Abort() (basically replacing the abort action by something else). I don't think this exists, but maybe I overlooked it.
