Parallel IO with MPI: Difference between revisions
No edit summary |
No edit summary |
||
Line 37: | Line 37: | ||
#include <XdmfArray.h> | #include <XdmfArray.h> | ||
#include <XdmfHDF.h> | #include <XdmfHDF.h> | ||
/// Simple memory buffer implementation that keeps track of it's stream pointer. | /// Simple memory buffer implementation that keeps track of it's stream pointer. | ||
class Buffer { | class Buffer { | ||
Line 45: | Line 45: | ||
char* m_put; | char* m_put; | ||
char* m_tell; | char* m_tell; | ||
public: | public: | ||
Buffer( std::size_t size ) : | Buffer( std::size_t size ) : | ||
Line 53: | Line 53: | ||
m_tell( m_data ) | m_tell( m_data ) | ||
{} | {} | ||
~Buffer() { | ~Buffer() { | ||
delete [] m_data; | delete [] m_data; | ||
} | } | ||
/// copy a contiguous block into the buffer | /// copy a contiguous block into the buffer | ||
void put( const void* data, std::size_t size ) { | void put( const void* data, std::size_t size ) { | ||
Line 63: | Line 63: | ||
m_put += size; | m_put += size; | ||
} | } | ||
/// put a single value into the buffer | /// put a single value into the buffer | ||
template< typename T > | template< typename T > | ||
Line 70: | Line 70: | ||
put( &t, size ); | put( &t, size ); | ||
} | } | ||
/// copy a contiguous block of data from the buffer to an already | /// copy a contiguous block of data from the buffer to an already | ||
/// allocated location | /// allocated location | ||
Line 77: | Line 77: | ||
m_tell += size; | m_tell += size; | ||
} | } | ||
/// Copy a single value into the buffer. | /// Copy a single value into the buffer. | ||
template< typename T > | template< typename T > | ||
Line 87: | Line 87: | ||
return tmp; | return tmp; | ||
} | } | ||
std::size_t size() const { | std::size_t size() const { | ||
return m_size; | return m_size; | ||
} | } | ||
char* pointer() { | char* pointer() { | ||
return m_data; | return m_data; | ||
} | } | ||
void reset() { | void reset() { | ||
m_put = m_data; | m_put = m_data; | ||
Line 126: | Line 126: | ||
int mCommSize; | int mCommSize; | ||
public: | public: | ||
/// Constructor initializes the Number of processors and local process | /// Constructor initializes the Number of processors and local process | ||
/// ID. | /// ID. | ||
Line 133: | Line 133: | ||
MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank ); | MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank ); | ||
} | } | ||
/// Reimplemented from XdmfOpenCallback::DoOpen. Only rank 0 is going | /// Reimplemented from XdmfOpenCallback::DoOpen. Only rank 0 is going | ||
/// to read or write, so only rank 0 will open the file. | /// to read or write, so only rank 0 will open the file. | ||
Line 149: | Line 149: | ||
} | } | ||
} | } | ||
/// Reimplemented from XdmfCloseCallback::DoClose. Again, only rank 0 | /// Reimplemented from XdmfCloseCallback::DoClose. Again, only rank 0 | ||
/// needs to do anything. | /// needs to do anything. | ||
Line 162: | Line 162: | ||
} | } | ||
} | } | ||
/// Reimplemented from XdmfWriteCallback::DoWrite. If the local | /// Reimplemented from XdmfWriteCallback::DoWrite. If the local | ||
/// process ID is 0, then we expect to receive data from everyone | /// process ID is 0, then we expect to receive data from everyone | ||
Line 205: | Line 205: | ||
// Local process ID is 0, so we write local data then receive and | // Local process ID is 0, so we write local data then receive and | ||
// write remote data. | // write remote data. | ||
// first, it's easy to write the local data | // first, it's easy to write the local data | ||
ds->DoWrite( array ); | ds->DoWrite( array ); | ||
int processes_received = 1; // local data for process 0 is written | int processes_received = 1; // local data for process 0 is written | ||
// loop until the data for all processes has been received. | // loop until the data for all processes has been received. | ||
while ( processes_received < mCommSize ) { | while ( processes_received < mCommSize ) { | ||
Line 224: | Line 224: | ||
0 ); | 0 ); | ||
processes_received++; | processes_received++; | ||
// unpack the buffer | // unpack the buffer | ||
buf.reset(); // reset the stream pointer to the beginning | buf.reset(); // reset the stream pointer to the beginning | ||
Line 251: | Line 251: | ||
return XDMF_SUCCESS; | return XDMF_SUCCESS; | ||
} | } | ||
/// Reimplemented from XdmfReadCallback::DoRead. Read the data from a | /// Reimplemented from XdmfReadCallback::DoRead. Read the data from a | ||
/// heavy data source into an array only if the local process ID is 0. | /// heavy data source into an array only if the local process ID is 0. | ||
Line 274: | Line 274: | ||
<code> | <code> | ||
char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI"; | char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI"; | ||
int main( int argc, char* argv[] ) { | int main( int argc, char* argv[] ) { | ||
// Initialize MPI | // Initialize MPI | ||
MPI_Init( &argc, &argv ); | MPI_Init( &argc, &argv ); | ||
// determine the local process ID | // determine the local process ID | ||
int rank; | int rank; | ||
MPI_Comm_rank( MPI_COMM_WORLD, &rank ); | MPI_Comm_rank( MPI_COMM_WORLD, &rank ); | ||
// Create an HDF dataset to write out to. | // Create an HDF dataset to write out to. | ||
XdmfHDF* H5 = new XdmfHDF(); | XdmfHDF* H5 = new XdmfHDF(); | ||
Line 291: | Line 291: | ||
H5->setWriteCallback( cb ); | H5->setWriteCallback( cb ); | ||
H5->setCloseCallback(cb ); | H5->setCloseCallback(cb ); | ||
// Allocate an array of size 25 and fill it with values depending | // Allocate an array of size 25 and fill it with values depending | ||
// on the local process ID (process 0 has 0-24, process 1 has | // on the local process ID (process 0 has 0-24, process 1 has | ||
Line 299: | Line 299: | ||
MyData->SetNumberOfElements( 25 ); | MyData->SetNumberOfElements( 25 ); | ||
MyData->Generate( rank * 25, rank*25 + 24 ); | MyData->Generate( rank * 25, rank*25 + 24 ); | ||
// Set the dataset's type to the array's type | // Set the dataset's type to the array's type | ||
H5->CopyType( MyData ); | H5->CopyType( MyData ); | ||
Line 321: | Line 321: | ||
// Close the dataset. | // Close the dataset. | ||
H5->Close(); | H5->Close(); | ||
// writing is complete, now we have a quick test to ensure the data | // writing is complete, now we have a quick test to ensure the data | ||
// survives a round trip. | // survives a round trip. | ||
bool failure = false; | bool failure = false; | ||
// Create a new dataset to read from | // Create a new dataset to read from | ||
XdmfHDF* H5In = new XdmfHDF(); | XdmfHDF* H5In = new XdmfHDF(); | ||
Line 336: | Line 336: | ||
// Read the data into an array. | // Read the data into an array. | ||
XdmfArray* result = H5In->Read(); | XdmfArray* result = H5In->Read(); | ||
// if the read was successful, make sure the values for all processes | // if the read was successful, make sure the values for all processes | ||
// are in the file. | // are in the file. | ||
Line 346: | Line 346: | ||
} | } | ||
} | } | ||
MPI_Finalize(); | MPI_Finalize(); | ||
delete H5; | delete H5; | ||
delete cb; | delete cb; | ||
Line 354: | Line 354: | ||
delete H5In; | delete H5In; | ||
delete result; | delete result; | ||
if ( failure ) { | if ( failure ) { | ||
return -1; | return -1; |
Revision as of 10:17, 26 June 2009
(code and description provided by Will Dicharry)
Controlling the I/O process for XdmfHeavyData
The XDMF API provides a customization point to allow the heavy data sets to be written in a certain manner, depending on the application environment and independent of the heavy dataset format. New I/O strategies can be implemented by subclassing the callback classes defined in XdmfHeavyData.h and attaching them to XdmfHeavyData classes. This method has the advantage of applying a common strategy to any XdmfHeavyData subclass.
The system works in the following way:
XdmfHeavyData contains the non-virtual methods Open, Close, Read, and Write and the virtual methods DoOpen, DoClose, DoRead, and DoWrite. XdmfHeavyData subclasses should override these to open, close, read, and write a particular heavy data format independent of the application environment.
In addition there are 4 callback base classes XdmfOpenCallback, XdmfCloseCallback, XdmfReadCallback, and XdmfWriteCallback. The callbacks should be implemented to replace the XdmfHeavyData's own Do methods. To decorate the existing I/O code, call dataset->Do* within the callback's Do* method. This allows implementors to define both pre and post callbacks, or even disable the output of a dataset entirely.
Example
Suppose we have an MPI environment with 4 processes. Each process contains part of an array with 100 values numbered 0 to 99; process 0 with items 0-24, process 1 with items 25-49, etc. We would like to write this data to a single HDF file containing a single array of 100 values, stored in increasing numerical order. To do this, we will communicate all of the data to process 0 and let process 0 handle the actual output of the array.
We start by including the necessary headers and defining a very simple stream buffer utility class that simplifies encoding and decoding data between processes:
#include <mpi.h>
#include <XdmfArray.h>
#include <XdmfHDF.h>
/// Simple memory buffer implementation that keeps track of it's stream pointer.
class Buffer {
private:
std::size_t m_size;
char* m_data;
char* m_put;
char* m_tell;
public:
Buffer( std::size_t size ) :
m_size( size ),
m_data( new char[size] ),
m_put( m_data ),
m_tell( m_data )
{}
~Buffer() {
delete [] m_data;
}
/// copy a contiguous block into the buffer
void put( const void* data, std::size_t size ) {
memcpy( m_put, data, size );
m_put += size;
}
/// put a single value into the buffer
template< typename T >
void put( const T& t ) {
std::size_t size = sizeof( T );
put( &t, size );
}
/// copy a contiguous block of data from the buffer to an already
/// allocated location
void tell( void* out, std::size_t size ) {
memcpy( out, m_tell, size );
m_tell += size;
}
/// Copy a single value into the buffer.
template< typename T >
T tell() {
std::size_t tsize = sizeof( T );
T tmp;
tell( &tmp, tsize );
return tmp;
}
std::size_t size() const {
return m_size;
}
char* pointer() {
return m_data;
}
void reset() {
m_put = m_data;
m_tell = m_data;
}
};
We haven't yet called on the Xdmf API to do anything, the Buffer class is just a utility that we will use later.
Now, we write our callback class that will customize the process of opening, reading, writing, and closing the heavy dataset. Since our communication strategy must implement a new function for each step, we will write one class that inherits XdmfOpenCallback, XdmfCloseCallback, XdmfWriteCallback, and XdmfReadCallback. Each one of the base classes has a single virtual method that takes a pointer to a XdmfHeavyData and the same arguments that XdmfHeavyData::Open, XdmfHeavyData::Close, XdmfHeavyData::Write, or XdmfHeavyData::Write would take and wraps the synchronization code around the actual XdmfHeavyData methods. Note that we call the virtual DoOpen, DoClose, DoRead, and DoWrite methods rather than the non-virtual Open, Close, Read, and Write methods.
/// Callback implements parallel IO by communicating to process 0
class CommunicationCallback :
public XdmfOpenCallback,
public XdmfWriteCallback,
public XdmfCloseCallback,
public XdmfReadCallback
{
private:
int mCommRank;
int mCommSize;
public:
/// Constructor initializes the Number of processors and local process
/// ID.
CommunicationCallback() {
MPI_Comm_size( MPI_COMM_WORLD, &mCommSize );
MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank );
}
/// Reimplemented from XdmfOpenCallback::DoOpen. Only rank 0 is going
/// to read or write, so only rank 0 will open the file.
XdmfInt32 DoOpen(
XdmfHeavyData* ds,
XdmfConstString name,
XdmfConstString access )
{
if ( mCommRank == 0 ) {
// Call the actual DoOpen method from XdmfHeavyData
return ds->DoOpen( name, access );
} else {
// Not rank 0, nothing to do.
return XDMF_SUCCESS;
}
}
/// Reimplemented from XdmfCloseCallback::DoClose. Again, only rank 0
/// needs to do anything.
XdmfInt32 DoClose( XdmfHeavyData* ds )
{
if ( mCommRank == 0 ) {
// Call the heavy dataset's native close method.
return ds->DoClose();
} else {
// not rank 0, nothing to do.
return XDMF_SUCCESS;
}
}
/// Reimplemented from XdmfWriteCallback::DoWrite. If the local
/// process ID is 0, then we expect to receive data from everyone
/// else. Otherwise, we send data. Rank 0 does all of the writing.
XdmfInt32 DoWrite( XdmfHeavyData* ds, XdmfArray* array )
{
// This implementation assumes process 0 has the same data
// size as everyone else
// set up a stream buffer that is large enough to hold the data to
// be sent or received. We require enough space for the rank,
// start, stride, and count for the array plus enough space to hold
// the actual array data.
XdmfInt64 start[1], stride[1], count[1];
XdmfInt32 slab_rank = ds->GetHyperSlab( start, stride, count );
std::size_t slab_info_size =
sizeof( XdmfInt32 ) // slab rank
+ slab_rank * sizeof( XdmfInt64 ) * 3; // start, stride, and count
Buffer buf( slab_info_size + array->GetCoreLength() );
// If the local process ID is nonzero, pack the buffer and send to
// process 0.
if ( mCommRank != 0 ) {
// copy rank and slab information into the buffer.
buf.put( slab_rank );
for ( int i = 0; i < slab_rank; ++i ) {
buf.put( start[i] );
buf.put( stride[i] );
buf.put( count[i] );
}
// copy the actual data into the buffer.
buf.put( array->GetDataPointer(), array->GetCoreLength() );
// send to rank 0 in the global communicator.
MPI_Send(
buf.pointer(),
buf.size(),
MPI_BYTE,
0,
0,
MPI_COMM_WORLD );
} else {
// Local process ID is 0, so we write local data then receive and
// write remote data.
// first, it's easy to write the local data
ds->DoWrite( array );
int processes_received = 1; // local data for process 0 is written
// loop until the data for all processes has been received.
while ( processes_received < mCommSize ) {
// Fill the buffer with incoming data. We are assuming here
// that all processes contain the same amount of data.
MPI_Recv(
buf.pointer(),
buf.size(),
MPI_BYTE,
MPI_ANY_SOURCE,
0,
MPI_COMM_WORLD,
0 );
processes_received++;
// unpack the buffer
buf.reset(); // reset the stream pointer to the beginning
// we mean rank in the linear algebra sense here
slab_rank = buf.tell< XdmfInt32 >();
// start, stride, and count are next.
for( int i = 0; i < slab_rank; ++i ) {
start[i] = buf.tell< XdmfInt64 >();
stride[i] = buf.tell< XdmfInt64 >();
count[i] = buf.tell< XdmfInt64 >();
}
// select the correct hyper slab in the heavy data set.
ds->SelectHyperSlab( start, stride, count );
// allocate an array to hold the off-core data.
XdmfArray* recv = new XdmfArray;
recv->CopyShape( array );
// place the received data into the array.
buf.tell( recv->GetDataPointer(), recv->GetCoreLength() );
// write the off core data to the dataset by calling the heavy
// data's DoWrite method.
ds->DoWrite( recv );
delete recv;
}
}
return XDMF_SUCCESS;
}
/// Reimplemented from XdmfReadCallback::DoRead. Read the data from a
/// heavy data source into an array only if the local process ID is 0.
/// Otherwise, do nothing.
XdmfArray* DoRead( XdmfHeavyData* ds, XdmfArray* array )
{
if ( mCommRank == 0 ) {
// defer to the actual heavy data implementation of DoRead.
return ds->DoRead( array );
} else {
// return an empty array.
return NULL;
}
}
}; // end of class CommunicationCallback
One of the useful features of the above code is that it relies only on the abstract portion of the XdmfHeavyData interface. Therefore, it is general enough to be used when writing to any heavy dataset (either HDF or MySQL). In the future, if other heavy data output formats are added, the same strategy could apply to them as well.
Now, we write a main program that generates the data, writes to an HDF dataset, and reads from the same file to verify the round trip.
char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI";
int main( int argc, char* argv[] ) {
// Initialize MPI
MPI_Init( &argc, &argv );
// determine the local process ID
int rank;
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
// Create an HDF dataset to write out to.
XdmfHDF* H5 = new XdmfHDF();
// Attach an instance of the CommunicationCallback class defined above
// to customize opening, writing, and closing the dataset.
CommunicationCallback* cb = new CommunicationCallback;
H5->setOpenCallback( cb );
H5->setWriteCallback( cb );
H5->setCloseCallback(cb );
// Allocate an array of size 25 and fill it with values depending
// on the local process ID (process 0 has 0-24, process 1 has
// 25-49,...)
XdmfArray* MyData = new XdmfArray();
MyData->SetNumberType( XDMF_FLOAT32_TYPE );
MyData->SetNumberOfElements( 25 );
MyData->Generate( rank * 25, rank*25 + 24 );
// Set the dataset's type to the array's type
H5->CopyType( MyData );
// Set up the dimensions of the dataset.
XdmfInt64 dims[1];
dims[0] = 100;
H5->SetShape( 1, dims );
// Set up the hyper slab for the dataset. Each process will select a
// distinct portion of the dataset (process 0 has 0-24, process 1
// 25-49,...)
XdmfInt64 start[1], stride[1], count[1];
start[0] = rank * 25;
stride[0] = 1;
count[0] = 25;
// select the slab in the dataset corresponding to the local process.
H5->SelectHyperSlab( start, stride, count );
// Open the dataset.
H5->Open( kDatasetName, "w" );
// Write the array.
H5->Write( MyData );
// Close the dataset.
H5->Close();
// writing is complete, now we have a quick test to ensure the data
// survives a round trip.
bool failure = false;
// Create a new dataset to read from
XdmfHDF* H5In = new XdmfHDF();
// Use the same callback.
H5In->setReadCallback( cb );
H5In->setOpenCallback( cb );
H5In->setCloseCallback( cb );
// Open the dataset for reading.
H5In->Open( kDatasetName, "r" );
// Read the data into an array.
XdmfArray* result = H5In->Read();
// if the read was successful, make sure the values for all processes
// are in the file.
if ( result ) {
for ( size_t i = 0; i < 100; ++i ) {
float value = result->GetValueAsFloat32( i );
std::cout << i << " " << value << std::endl;
failure = ( value != i );
}
}
MPI_Finalize();
delete H5;
delete cb;
delete MyData;
delete H5In;
delete result;
if ( failure ) {
return -1;
} else {
return 0;
}
};
Running the program in an MPI environment with 4 processes produces a single HDF5 file TestFile.h5 with a single dataset XdmfHDFMPI containing the numbers 0 to 99. The full source of this example is available in the Xdmf source tree.