Parallel IO with MPI

Note: the code snippets on this page are from Xdmf version 1 or 2 and need updating for Xdmf 3.

(code, example and description provided by Will Dicharry)

Controlling the I/O process for XdmfHeavyData

The XDMF API provides a customization point that controls how heavy datasets are written, depending on the application environment and independent of the heavy dataset format. New I/O strategies are implemented by subclassing the callback classes defined in XdmfHeavyData.h and attaching them to XdmfHeavyData instances. This approach has the advantage that a single strategy can be applied to any XdmfHeavyData subclass.

The system works in the following way:

XdmfHeavyData contains the non-virtual methods Open, Close, Read, and Write and the virtual methods DoOpen, DoClose, DoRead, and DoWrite. Subclasses override the virtual Do* methods to open, close, read, and write a particular heavy data format, independent of the application environment.
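
For a new heavy data format, only the virtual Do* methods need to be implemented. The skeleton below is a minimal sketch under that assumption; XdmfMyFormat is an illustrative name, not a real Xdmf class, and the signatures follow the calls made in the callback example later on this page.

/// Illustrative only: a skeletal heavy data format.  Error handling and
/// the actual file access are elided.
class XdmfMyFormat : public XdmfHeavyData {
public:
 XdmfInt32 DoOpen( XdmfConstString name, XdmfConstString access ) {
   // open the underlying file or connection named 'name' here
   return XDMF_SUCCESS;
 }
 XdmfInt32 DoClose() {
   // release the underlying resource here
   return XDMF_SUCCESS;
 }
 XdmfArray* DoRead( XdmfArray* array ) {
   // fill 'array' from the currently selected hyperslab
   return array;
 }
 XdmfInt32 DoWrite( XdmfArray* array ) {
   // write 'array' to the currently selected hyperslab
   return XDMF_SUCCESS;
 }
};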

In addition, there are four callback base classes: XdmfOpenCallback, XdmfCloseCallback, XdmfReadCallback, and XdmfWriteCallback. A callback's Do* method replaces the corresponding Do* method of the XdmfHeavyData it is attached to. To decorate the existing I/O code rather than replace it, call dataset->Do* from within the callback's Do* method. This lets implementors run both pre and post steps around an operation, or even disable the output of a dataset entirely.
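
For instance, a write callback can run code before and after the dataset's own write, or skip the write altogether. Here is a minimal logging sketch, assuming only the interfaces described above (the class name is illustrative):

/// Illustrative only: decorate every write with pre and post logging.
class LoggingWriteCallback : public XdmfWriteCallback {
public:
 XdmfInt32 DoWrite( XdmfHeavyData* ds, XdmfArray* array ) {
   std::cout << "writing " << array->GetCoreLength() << " bytes" << std::endl;
   XdmfInt32 status = ds->DoWrite( array ); // the dataset's own write;
                                            // omitting this call would
                                            // disable output entirely.
   std::cout << "write complete" << std::endl;
   return status;
 }
};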

Example

Suppose we have an MPI environment with 4 processes, each holding part of an array of 100 values numbered 0 to 99: process 0 holds items 0-24, process 1 holds items 25-49, and so on. We would like to write this data to a single HDF5 file containing one 100-value array stored in increasing numerical order. To do this, we will communicate all of the data to process 0 and let process 0 handle the actual output of the array.
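
The per-process hyperslab follows directly from this decomposition: with N values over P processes, rank r owns the contiguous range starting at r*N/P. A minimal sketch of that arithmetic, assuming as here that N divides evenly by P (this fragment belongs after MPI_Init):

// Illustrative only: contiguous 1D partitioning of N values over P ranks.
const int N = 100;
int commSize, commRank;
MPI_Comm_size( MPI_COMM_WORLD, &commSize );
MPI_Comm_rank( MPI_COMM_WORLD, &commRank );
int chunk = N / commSize;          // 25 values per process when P == 4
int first = commRank * chunk;      // rank 0 -> 0, rank 1 -> 25, ...
int last  = first + chunk - 1;     // rank 0 -> 24, rank 1 -> 49, ...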

We start by including the necessary headers and defining a very simple stream buffer utility class that simplifies encoding and decoding data between processes:

#include <cstddef>   // std::size_t
#include <cstring>   // memcpy
#include <iostream>  // std::cout, used in main below
#include <mpi.h>
#include <XdmfArray.h>
#include <XdmfHDF.h>

/// Simple memory buffer implementation that keeps track of its put and
/// tell stream pointers.
class Buffer {
private:
 std::size_t m_size;
 char* m_data;
 char* m_put;
 char* m_tell;

public:
 Buffer( std::size_t size ) :
   m_size( size ),
   m_data( new char[size] ),
   m_put( m_data ),
   m_tell( m_data )
 {}

 ~Buffer() {
   delete [] m_data;
 }

 /// copy a contiguous block into the buffer
 void put( const void* data, std::size_t size ) {
   memcpy( m_put, data, size );
   m_put += size;
 }

 /// put a single value into the buffer
 template< typename T >
 void put( const T& t ) {
   std::size_t size = sizeof( T );
   put( &t, size );
 }

 /// copy a contiguous block of data from the buffer to an already
 /// allocated location
 void tell( void* out, std::size_t size ) {
   memcpy( out, m_tell, size );
   m_tell += size;
 }


 /// Copy a single value out of the buffer.
 template< typename T >
 T tell() {
   std::size_t tsize = sizeof( T );
   T tmp;
   tell( &tmp, tsize );
   return tmp;
 }

 std::size_t size() const {
   return m_size;
 }

 char* pointer() {
   return m_data;
 }

 void reset() {
   m_put = m_data;
   m_tell = m_data;
 }
};
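
As a quick illustration (the values here are hypothetical), a put/tell round trip looks like this; note that the put and tell pointers advance independently, so no rewind is needed between encoding and decoding:

// Illustrative only: pack two values, then unpack them in the same order.
Buffer buf( sizeof( XdmfInt32 ) + sizeof( XdmfInt64 ) );
buf.put< XdmfInt32 >( 3 );
buf.put< XdmfInt64 >( 100 );
XdmfInt32 a = buf.tell< XdmfInt32 >(); // a == 3
XdmfInt64 b = buf.tell< XdmfInt64 >(); // b == 100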

We haven't called on the Xdmf API for anything yet; the Buffer class is just a utility that we will use later.

Now, we write the callback class that customizes the process of opening, reading, writing, and closing the heavy dataset. Since our communication strategy must intervene at each step, we write one class that inherits from XdmfOpenCallback, XdmfCloseCallback, XdmfWriteCallback, and XdmfReadCallback. Each base class has a single virtual method that takes a pointer to an XdmfHeavyData, plus the same arguments that XdmfHeavyData::Open, XdmfHeavyData::Close, XdmfHeavyData::Read, or XdmfHeavyData::Write would take, and wraps the synchronization code around the actual XdmfHeavyData operation. Note that inside the callbacks we call the virtual DoOpen, DoClose, DoRead, and DoWrite methods rather than the non-virtual Open, Close, Read, and Write methods.

/// Callback implements parallel IO by communicating to process 0
class CommunicationCallback :
 public XdmfOpenCallback,
 public XdmfWriteCallback,
 public XdmfCloseCallback,
 public XdmfReadCallback
{
private:
 int mCommRank;
 int mCommSize;
public:
 
 /// Constructor initializes the Number of processors and local process
 /// ID.
 CommunicationCallback() {
   MPI_Comm_size( MPI_COMM_WORLD, &mCommSize );
   MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank );
 }
 
 /// Reimplemented from XdmfOpenCallback::DoOpen.  Only rank 0 is going
 /// to read or write, so only rank 0 will open the file.
 XdmfInt32 DoOpen(
   XdmfHeavyData* ds,
   XdmfConstString name,
   XdmfConstString access )
 {
   if ( mCommRank == 0 ) {
     // Call the actual DoOpen method from XdmfHeavyData
     return ds->DoOpen( name, access );
   } else {
     // Not rank 0, nothing to do.
     return XDMF_SUCCESS;
   }
 }
 
 /// Reimplemented from XdmfCloseCallback::DoClose.  Again, only rank 0
 /// needs to do anything.
 XdmfInt32 DoClose( XdmfHeavyData* ds )
 {
   if ( mCommRank == 0 ) {
     // Call the heavy dataset's native close method.
     return ds->DoClose();
   } else {
     // not rank 0, nothing to do.
     return XDMF_SUCCESS;
   }
 }
 
 /// Reimplemented from XdmfWriteCallback::DoWrite.  If the local
 /// process ID is 0, then we expect to receive data from everyone
 /// else.  Otherwise, we send data.  Rank 0 does all of the writing.
 XdmfInt32 DoWrite( XdmfHeavyData* ds, XdmfArray* array )
 {
   // This implementation assumes process 0 has the same data
   // size as everyone else

   // set up a stream buffer that is large enough to hold the data to
   // be sent or received.  We require enough space for the rank,
   // start, stride, and count for the array plus enough space to hold
   // the actual array data.
   XdmfInt64 start[1], stride[1], count[1];
   XdmfInt32 slab_rank = ds->GetHyperSlab( start, stride, count );
   std::size_t slab_info_size =
     sizeof( XdmfInt32 ) // slab rank
     + slab_rank * sizeof( XdmfInt64 ) * 3; // start, stride, and count
   Buffer buf( slab_info_size + array->GetCoreLength() );

   // If the local process ID is nonzero, pack the buffer and send to
   // process 0.
   if ( mCommRank != 0 ) {
     // copy rank and slab information into the buffer.
     buf.put( slab_rank );
     for ( int i = 0; i < slab_rank; ++i ) {
       buf.put( start[i] );
       buf.put( stride[i] );
       buf.put( count[i] );
     }
     // copy the actual data into the buffer.
     buf.put( array->GetDataPointer(), array->GetCoreLength() );
     // send to rank 0 in the global communicator.
     MPI_Send(
       buf.pointer(),
       buf.size(),
       MPI_BYTE,
       0,
       0,
       MPI_COMM_WORLD );
   } else {
     // Local process ID is 0, so we write local data then receive and
     // write remote data.
 
     // first, it's easy to write the local data
     ds->DoWrite( array );
 
     int processes_received = 1; // local data for process 0 is written
 
     // loop until the data for all processes has been received.
     while ( processes_received < mCommSize ) {
       // Fill the buffer with incoming data.  We are assuming here
       // that all processes contain the same amount of data.
       MPI_Recv(
         buf.pointer(),
         buf.size(),
         MPI_BYTE,
         MPI_ANY_SOURCE,
         0,
         MPI_COMM_WORLD,
          MPI_STATUS_IGNORE );
       processes_received++;
 
       // unpack the buffer
       buf.reset(); // reset the stream pointer to the beginning
       // we mean rank in the linear algebra sense here
       slab_rank = buf.tell< XdmfInt32 >();
       // start, stride, and count are next.
       for( int i = 0; i < slab_rank; ++i ) {
         start[i] = buf.tell< XdmfInt64 >();
         stride[i] = buf.tell< XdmfInt64 >();
         count[i] = buf.tell< XdmfInt64 >();
       }
       // select the correct hyper slab in the heavy data set.
       ds->SelectHyperSlab( start, stride, count );
       // allocate an array to hold the off-core data.
       XdmfArray* recv = new XdmfArray;
       recv->CopyShape( array );
       // place the received data into the array.
       buf.tell( recv->GetDataPointer(), recv->GetCoreLength() );
       // write the off core data to the dataset by calling the heavy
       // data's DoWrite method.
       ds->DoWrite( recv );
       delete recv;
     }
   }

   return XDMF_SUCCESS;
 }
 
 /// Reimplemented from XdmfReadCallback::DoRead.  Read the data from a
 /// heavy data source into an array only if the local process ID is 0.
 /// Otherwise, do nothing.
 XdmfArray* DoRead( XdmfHeavyData* ds, XdmfArray* array )
 {
   if ( mCommRank == 0 ) {
     // defer to the actual heavy data implementation of DoRead.
     return ds->DoRead( array );
   } else {
      // nothing was read on this rank.
      return NULL;
   }
 }
}; // end of class CommunicationCallback

One of the useful features of the above code is that it relies only on the abstract portion of the XdmfHeavyData interface. Therefore, it is general enough to be used when writing to any heavy dataset (either HDF or MySQL). In the future, if other heavy data output formats are added, the same strategy could apply to them as well.
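
As a sketch of that generality, the same callback instance could be attached to any other XdmfHeavyData subclass, for example the hypothetical XdmfMyFormat skeleton from earlier on this page (this assumes the set*Callback methods are available on the base class, as their use on XdmfHDF below suggests):

// Illustrative only: the same strategy applied to a different format.
XdmfMyFormat* db = new XdmfMyFormat();
CommunicationCallback* cb = new CommunicationCallback;
db->setOpenCallback( cb );
db->setWriteCallback( cb );
db->setCloseCallback( cb );
// Open, Write, and Close on 'db' now funnel all data through process 0,
// exactly as they do for the XdmfHDF dataset in the program below.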

Now, we write a main program that generates the data, writes to an HDF dataset, and reads from the same file to verify the round trip.

char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI";

int main( int argc, char* argv[] ) {
 // Initialize MPI
 MPI_Init( &argc, &argv );

 // determine the local process ID
 int rank;
 MPI_Comm_rank( MPI_COMM_WORLD, &rank );

 // Create an HDF dataset to write out to.
 XdmfHDF* H5 = new XdmfHDF();
 // Attach an instance of the CommunicationCallback class defined above
 // to customize opening, writing, and closing the dataset.
 CommunicationCallback* cb = new CommunicationCallback;
 H5->setOpenCallback( cb );
 H5->setWriteCallback( cb );
 H5->setCloseCallback( cb );

 // Allocate an array of size 25 and fill it with values depending
 // on the local process ID (process 0 has 0-24, process 1 has
 // 25-49,...)
 XdmfArray* MyData = new XdmfArray();
 MyData->SetNumberType( XDMF_FLOAT32_TYPE );
 MyData->SetNumberOfElements( 25 );
 MyData->Generate( rank * 25, rank * 25 + 24 );

 // Set the dataset's type to the array's type
 H5->CopyType( MyData );
 // Set up the dimensions of the dataset.
 XdmfInt64 dims[1];
 dims[0] = 100;
 H5->SetShape( 1, dims );
 // Set up the hyper slab for the dataset.  Each process will select a
 // distinct portion of the dataset (process 0 has 0-24, process 1
 // 25-49,...)
 XdmfInt64 start[1], stride[1], count[1];
 start[0] = rank * 25;
 stride[0] = 1;
 count[0] = 25;
 // select the slab in the dataset corresponding to the local process.
 H5->SelectHyperSlab( start, stride, count );
 // Open the dataset.
 H5->Open( kDatasetName, "w" );
 // Write the array.
 H5->Write( MyData );
 // Close the dataset.
 H5->Close();

 // Writing is complete; now a quick test to ensure the data
 // survives a round trip.
 bool failure = false;

 // Create a new dataset to read from
 XdmfHDF* H5In = new XdmfHDF();
 // Use the same callback.
 H5In->setReadCallback( cb );
 H5In->setOpenCallback( cb );
 H5In->setCloseCallback( cb );
 // Open the dataset for reading.
 H5In->Open( kDatasetName, "r" );
 // Read the data into an array.
 XdmfArray* result = H5In->Read();

 // if the read was successful, make sure the values for all processes
 // are in the file.
 if ( result ) {
   for ( size_t i = 0; i < 100; ++i ) {
     float value = result->GetValueAsFloat32( i );
     std::cout << i << " " << value << std::endl;
      failure = failure || ( value != i ); // remember any mismatch
   }
 }

 MPI_Finalize();

 delete H5;
 delete cb;
 delete MyData;
 delete H5In;
 delete result;

 if ( failure ) {
   return -1;
 } else {
   return 0;
 }
}

Running the program in an MPI environment with 4 processes produces a single HDF5 file TestFile.h5 with a single dataset XdmfHDFMPI containing the numbers 0 to 99. The full source of this example is available in the Xdmf source tree.
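
To try it yourself, build against the MPI and Xdmf libraries and launch with four processes, for example with mpirun -np 4 ./XdmfHDFMPI (the exact compiler flags and executable name depend on your build).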