Parallel IO with MPI: Difference between revisions

From XdmfWeb
Jump to navigationJump to search
No edit summary
 
(4 intermediate revisions by 3 users not shown)
Line 1: Line 1:
(code and description provided by Will Dicharry)
<pre style="color: red">Note: as of version 1.8.13 hdf5 must be patched as follows to allow for this functionality.</pre>


=== Controlling the I/O process for XdmfHeavyData ===
In the file /hdf5/src/CMakeLists.txt


The XDMF API provides a customization point to allow the heavy data sets
change
to be written in a certain manner, depending on the application
if (NOT HDF5_INSTALL_NO_DEVELOPMENT)
environment and independent of the heavy dataset format.  New I/O
  install (
strategies can be implemented by subclassing the callback classes
      FILES
defined in XdmfHeavyData.h and attaching them to XdmfHeavyData classes.
          ${H5_PUBLIC_HEADERS}
This method has the advantage of applying a common strategy to any
      DESTINATION
XdmfHeavyData subclass.
          ${HDF5_INSTALL_INCLUDE_DIR}
      COMPONENT
          headers
  )
endif (NOT HDF5_INSTALL_NO_DEVELOPMENT)


The system works in the following way:
to
if (NOT HDF5_INSTALL_NO_DEVELOPMENT)
  install (
      FILES
          ${H5_PUBLIC_HEADERS}
          ${H5_PRIVATE_HEADERS}
      DESTINATION
          ${HDF5_INSTALL_INCLUDE_DIR}
      COMPONENT
          headers
  )
endif (NOT HDF5_INSTALL_NO_DEVELOPMENT)


XdmfHeavyData contains the non-virtual methods Open, Close, Read, and
===Distributed Shared Memory===
Write and the virtual methods DoOpen, DoClose, DoRead, and DoWrite.
XdmfHeavyData subclasses should override these to open, close, read, and write a particular heavy data format independent of the application environment.


In addition there are 4 callback base classes XdmfOpenCallback, XdmfCloseCallback, XdmfReadCallback, and XdmfWriteCallback.  The callbacks should be implemented to replace the XdmfHeavyData's own Do methods.  To decorate the existing I/O code, call dataset->Do* within the callback's Do* method.  This allows implementors to define both pre and post callbacks, or even disable the output of a dataset entirely.
By leveraging the h5fd and hdf5 libraries, Xdmf provides an interface by which a user can set up a dsm server that may be interacted with in a manner similar to hdf5.


=== Example ===
Multiple Datasets and filenames are supported as of version 3.3.0.


Suppose we have an MPI environment with 4 processes.  Each process
====Initializing DSM====
contains part of an array with 100 values numbered 0 to 99; process 0
with items 0-24, process 1 with items 25-49, etc.  We would like to
write this data to a single HDF file containing a single array of 100
values, stored in increasing numerical order.  To do this, we will
communicate all of the data to process 0 and let process 0 handle the
actual output of the array.


We start by including the necessary headers and defining a very simple
After starting MPI, the DSM can be started by creating an instance of XdmfHDF5WriterDSM or XdmfHDF5ControllerDSM.
stream buffer utility class that simplifies encoding and decoding data
between processes:


<code>
  int size, id, dsmSize;
  #include <mpi.h>
  dsmSize = 64;//The total size of the DSM being created
  #include <XdmfArray.h>
  std::string newPath = "dsm";
  #include <XdmfHDF.h>
   
   
  /// Simple memory buffer implementation that keeps track of it's stream pointer.
  unsigned int numServersCores = 2;
class Buffer {
private:
  std::size_t m_size;
  char* m_data;
  char* m_put;
  char* m_tell;
   
   
  public:
  MPI_Comm comm = MPI_COMM_WORLD;
  Buffer( std::size_t size ) :
    m_size( size ),
    m_data( new char[size] ),
    m_put( m_data ),
    m_tell( m_data )
  {}
   
   
  ~Buffer() {
MPI_Init(&argc, &argv);
    delete [] m_data;
  }
   
   
  /// copy a contiguous block into the buffer
MPI_Comm_rank(comm, &id);
  void put( const void* data, std::size_t size ) {
MPI_Comm_size(comm, &size);
    memcpy( m_put, data, size );
    m_put += size;
  }
   
   
  /// put a single value into the buffer
unsigned int dataspaceAllocated = dsmSize/numServersCores;
  template< typename T >
  void put( const T& t ) {
    std::size_t size = sizeof( T );
    put( &t, size );
  }
   
   
  /// copy a contiguous block of data from the buffer to an already
// Splitting MPICommWorld so that a comm that contains the non-Server cores exists.
  /// allocated location
  void tell( void* out, std::size_t size ) {
    memcpy( out, m_tell, size );
    m_tell += size;
  }
   
   
MPI_Comm workerComm;
   
   
  /// Copy a single value into the buffer.
MPI_Group workers, dsmgroup;
  template< typename T >
  T tell() {
    std::size_t tsize = sizeof( T );
    T tmp;
    tell( &tmp, tsize );
    return tmp;
  }
   
   
  std::size_t size() const {
MPI_Comm_group(comm, &dsmgroup);
    return m_size;
int * ServerIds = (int *)calloc((numServersCores), sizeof(int));
  }
unsigned int index = 0;
for(int i=size-numServersCores ; i <= size-1 ; ++i)
{
  ServerIds[index++] = i;
}
   
   
  char* pointer() {
MPI_Group_excl(dsmgroup, index, ServerIds, &workers);
    return m_data;
int testval = MPI_Comm_create(comm, workers, &workerComm);
  }
cfree(ServerIds);
   
   
  void reset() {
// The last two cores in the Comm are regulated to manage the DSM.
    m_put = m_data;
shared_ptr<XdmfHDF5WriterDSM> exampleWriter =  
    m_tell = m_data;
  XdmfHDF5WriterDSM::New(newPath, comm, dataspaceAllocated, size-numServersCores, size-1);
  }
};
</code>


We haven't yet called on the Xdmf API to do anything, the Buffer class
This creates a DSM buffer and manager which must be passed to any new DSM objects in order for those objects to function.
is just a utility that we will use later.


Now, we write our callback class that will customize the process of
After the user is finished with DSM the manager must be disposed of. Not doing this will result in an error.
opening, reading, writing, and closing the heavy dataset.  Since our
communication strategy must implement a new function for each step, we will write one class that inherits XdmfOpenCallback, XdmfCloseCallback,
XdmfWriteCallback, and XdmfReadCallback.  Each one of the base classes
has a single virtual method that takes a pointer to a XdmfHeavyData and
the same arguments that XdmfHeavyData::Open, XdmfHeavyData::Close,
XdmfHeavyData::Write, or XdmfHeavyData::Write would take and wraps the synchronization code around the actual XdmfHeavyData methods. Note that we call the virtual DoOpen, DoClose, DoRead, and DoWrite methods rather than the non-virtual Open, Close, Read, and Write methods.


<code>
  if (id == 0)
  /// Callback implements parallel IO by communicating to process 0
class CommunicationCallback :
  public XdmfOpenCallback,
  public XdmfWriteCallback,
  public XdmfCloseCallback,
  public XdmfReadCallback
  {
  {
private:
  exampleWriter->stopDSM();
  int mCommRank;
  }
  int mCommSize;
public:
 
  /// Constructor initializes the Number of processors and local process
  /// ID.
  CommunicationCallback() {
    MPI_Comm_size( MPI_COMM_WORLD, &mCommSize );
    MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank );
  }
 
  /// Reimplemented from XdmfOpenCallback::DoOpen.  Only rank 0 is going
  /// to read or write, so only rank 0 will open the file.
  XdmfInt32 DoOpen(
    XdmfHeavyData* ds,
    XdmfConstString name,
    XdmfConstString access )
  {
    if ( mCommRank == 0 ) {
      // Call the actual DoOpen method from XdmfHeavyData
      return ds->DoOpen( name, access );
    } else {
      // Not rank 0, nothing to do.
      return XDMF_SUCCESS;
    }
  }
 
  /// Reimplemented from XdmfCloseCallback::DoClose.  Again, only rank 0
  /// needs to do anything.
  XdmfInt32 DoClose( XdmfHeavyData* ds )
  {
    if ( mCommRank == 0 ) {
      // Call the heavy dataset's native close method.
      return ds->DoClose();
    } else {
      // not rank 0, nothing to do.
      return XDMF_SUCCESS;
    }
  }
 
  /// Reimplemented from XdmfWriteCallback::DoWrite.  If the local
  /// process ID is 0, then we expect to receive data from everyone
  /// else.  Otherwise, we send data.  Rank 0 does all of the writing.
  XdmfInt32 DoWrite( XdmfHeavyData* ds, XdmfArray* array )
  {
    // This implementation assumes process 0 has the same data
    // size as everyone else
    // set up a stream buffer that is large enough to hold the data to
    // be sent or received.  We require enough space for the rank,
    // start, stride, and count for the array plus enough space to hold
    // the actual array data.
    XdmfInt64 start[1], stride[1], count[1];
    XdmfInt32 slab_rank = ds->GetHyperSlab( start, stride, count );
    std::size_t slab_info_size =
      sizeof( XdmfInt32 ) // slab rank
      + slab_rank * sizeof( XdmfInt64 ) * 3; // start, stride, and count
    Buffer buf( slab_info_size + array->GetCoreLength() );
    // If the local process ID is nonzero, pack the buffer and send to
    // process 0.
    if ( mCommRank != 0 ) {
      // copy rank and slab information into the buffer.
      buf.put( slab_rank );
      for ( int i = 0; i < slab_rank; ++i ) {
        buf.put( start[i] );
        buf.put( stride[i] );
        buf.put( count[i] );
      }
      // copy the actual data into the buffer.
      buf.put( array->GetDataPointer(), array->GetCoreLength() );
      // send to rank 0 in the global communicator.
      MPI_Send(
        buf.pointer(),
        buf.size(),
        MPI_BYTE,
        0,
        0,
        MPI_COMM_WORLD );
    } else {
      // Local process ID is 0, so we write local data then receive and
      // write remote data.
 
      // first, it's easy to write the local data
      ds->DoWrite( array );
 
      int processes_received = 1; // local data for process 0 is written
 
      // loop until the data for all processes has been received.
      while ( processes_received < mCommSize ) {
        // Fill the buffer with incoming data.  We are assuming here
        // that all processes contain the same amount of data.
        MPI_Recv(
          buf.pointer(),
          buf.size(),
          MPI_BYTE,
          MPI_ANY_SOURCE,
          0,
          MPI_COMM_WORLD,
          0 );
        processes_received++;
 
        // unpack the buffer
        buf.reset(); // reset the stream pointer to the beginning
        // we mean rank in the linear algebra sense here
        slab_rank = buf.tell< XdmfInt32 >();
        // start, stride, and count are next.
        for( int i = 0; i < slab_rank; ++i ) {
          start[i] = buf.tell< XdmfInt64 >();
          stride[i] = buf.tell< XdmfInt64 >();
          count[i] = buf.tell< XdmfInt64 >();
        }
        // select the correct hyper slab in the heavy data set.
        ds->SelectHyperSlab( start, stride, count );
        // allocate an array to hold the off-core data.
        XdmfArray* recv = new XdmfArray;
        recv->CopyShape( array );
        // place the received data into the array.
        buf.tell( recv->GetDataPointer(), recv->GetCoreLength() );
        // write the off core data to the dataset by calling the heavy
        // data's DoWrite method.
        ds->DoWrite( recv );
        delete recv;
      }
    }
    return XDMF_SUCCESS;
  }
 
  /// Reimplemented from XdmfReadCallback::DoRead.  Read the data from a
  /// heavy data source into an array only if the local process ID is 0.
  /// Otherwise, do nothing.
  XdmfArray* DoRead( XdmfHeavyData* ds, XdmfArray* array )
  {
    if ( mCommRank == 0 ) {
      // defer to the actual heavy data implementation of DoRead.
      return ds->DoRead( array );
    } else {
      // return an empty array.
      return NULL;
    }
  }
}; // end of class CommunicationCallback
</code>
 
One of the useful features of the above code is that it relies only on the abstract portion of the XdmfHeavyData interface.  Therefore, it is general enough to be used when writing to any heavy dataset (either HDF or MySQL).  In the future, if other heavy data output formats are added, the same strategy could apply to them as well.
 
Now, we write a main program that generates the data, writes to an HDF dataset, and reads from the same file to verify the round trip.
 
<code>
char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI";
int main( int argc, char* argv[] ) {
  // Initialize MPI
  MPI_Init( &argc, &argv );
  // determine the local process ID
  int rank;
  MPI_Comm_rank( MPI_COMM_WORLD, &rank );
  // Create an HDF dataset to write out to.
  XdmfHDF* H5 = new XdmfHDF();
  // Attach an instance of the CommunicationCallback class defined above
  // to customize opening, writing, and closing the dataset.
  CommunicationCallback* cb = new CommunicationCallback;
  H5->setOpenCallback( cb );
  H5->setWriteCallback( cb );
  H5->setCloseCallback(cb );
  // Allocate an array of size 25 and fill it with values depending
  // on the local process ID (process 0 has 0-24, process 1 has
  // 25-49,...)
  XdmfArray* MyData = new XdmfArray();
  MyData->SetNumberType( XDMF_FLOAT32_TYPE );
  MyData->SetNumberOfElements( 25 );
  MyData->Generate( rank * 25, rank*25 + 24 );
   
  // Set the dataset's type to the array's type
  H5->CopyType( MyData );
  // Set up the dimensions of the dataset.
  XdmfInt64 dims[1];
  dims[0] = 100;
  H5->SetShape( 1, dims );
  // Set up the hyper slab for the dataset.  Each process will select a
  // distinct portion of the dataset (process 0 has 0-24, process 1
  // 25-49,...)
  XdmfInt64 start[1], stride[1], count[1];
  start[0] = rank * 25;
  stride[0] = 1;
  count[0] = 25;
  // select the slab in the dataset corresponding to the local process.
  H5->SelectHyperSlab( start, stride, count );
  // Open the dataset.
  H5->Open( kDatasetName, "w" );
  // Write the array.
  H5->Write( MyData );
  // Close the dataset.
  H5->Close();
  // writing is complete, now we have a quick test to ensure the data
  // survives a round trip.
  bool failure = false;
  // Create a new dataset to read from
  XdmfHDF* H5In = new XdmfHDF();
  // Use the same callback.
  H5In->setReadCallback( cb );
  H5In->setOpenCallback( cb );
  H5In->setCloseCallback( cb );
  // Open the dataset for reading.
  H5In->Open( kDatasetName, "r" );
  // Read the data into an array.
  XdmfArray* result = H5In->Read();
  // if the read was successful, make sure the values for all processes
  // are in the file.
  if ( result ) {
    for ( size_t i = 0; i < 100; ++i ) {
      float value = result->GetValueAsFloat32( i );
      std::cout << i << " " << value << std::endl;
      failure = ( value != i );
    }
  }
   
   
  MPI_Finalize();
MPI_Barrier(comm);
   
   
  delete H5;
//the dsmManager must be deleted or else there will be a segfault
  delete cb;
exampleWriter->deleteManager();
  delete MyData;
  delete H5In;
  delete result;
   
   
  if ( failure ) {
MPI_Finalize();
    return -1;
  } else {
    return 0;
  }
};
</code>


Running the program in an MPI environment with 4 processes produces a single HDF5 file TestFile.h5 with a single dataset XdmfHDFMPI containing the numbers 0 to 99.  The full source of this example is available in the Xdmf source tree.
A full example program can be found in the Xdmf source at: Xdmf/core/dsm/tests/Cxx/DSMLoopTest.cpp

Latest revision as of 13:17, 13 March 2017

Note: as of version 1.8.13 hdf5 must be patched as follows to allow for this functionality.

In the file /hdf5/src/CMakeLists.txt

change

if (NOT HDF5_INSTALL_NO_DEVELOPMENT)
  install (
      FILES
          ${H5_PUBLIC_HEADERS}
      DESTINATION
          ${HDF5_INSTALL_INCLUDE_DIR}
      COMPONENT
          headers
  )
endif (NOT HDF5_INSTALL_NO_DEVELOPMENT)

to

if (NOT HDF5_INSTALL_NO_DEVELOPMENT)
  install (
      FILES
          ${H5_PUBLIC_HEADERS}
          ${H5_PRIVATE_HEADERS}
      DESTINATION
          ${HDF5_INSTALL_INCLUDE_DIR}
      COMPONENT
          headers
  )
endif (NOT HDF5_INSTALL_NO_DEVELOPMENT)

Distributed Shared Memory

By leveraging the h5fd and hdf5 libraries, Xdmf provides an interface by which a user can set up a dsm server that may be interacted with in a manner similar to hdf5.

Multiple Datasets and filenames are supported as of version 3.3.0.

Initializing DSM

After starting MPI, the DSM can be started by creating an instance of XdmfHDF5WriterDSM or XdmfHDF5ControllerDSM.

int size, id, dsmSize;
dsmSize = 64;//The total size of the DSM being created
std::string newPath = "dsm";

unsigned int numServersCores = 2;

MPI_Comm comm = MPI_COMM_WORLD;

MPI_Init(&argc, &argv);

MPI_Comm_rank(comm, &id);
MPI_Comm_size(comm, &size);

unsigned int dataspaceAllocated = dsmSize/numServersCores;

// Splitting MPICommWorld so that a comm that contains the non-Server cores exists.

MPI_Comm workerComm;

MPI_Group workers, dsmgroup;

MPI_Comm_group(comm, &dsmgroup);
int * ServerIds = (int *)calloc((numServersCores), sizeof(int));
unsigned int index = 0;
for(int i=size-numServersCores ; i <= size-1 ; ++i)
{
  ServerIds[index++] = i;
}

MPI_Group_excl(dsmgroup, index, ServerIds, &workers);
int testval = MPI_Comm_create(comm, workers, &workerComm);
cfree(ServerIds);

// The last two cores in the Comm are regulated to manage the DSM.
shared_ptr<XdmfHDF5WriterDSM> exampleWriter = 
  XdmfHDF5WriterDSM::New(newPath, comm, dataspaceAllocated, size-numServersCores, size-1);

This creates a DSM buffer and manager which must be passed to any new DSM objects in order for those objects to function.

After the user is finished with DSM the manager must be disposed of. Not doing this will result in an error.

if (id == 0)
{
  exampleWriter->stopDSM();
}

MPI_Barrier(comm);

//the dsmManager must be deleted or else there will be a segfault
exampleWriter->deleteManager();

MPI_Finalize();

A full example program can be found in the Xdmf source at: Xdmf/core/dsm/tests/Cxx/DSMLoopTest.cpp