Skip to content

Commit

Permalink
Merge pull request #294 from tim-griesbach/feature-collective-io
Browse files Browse the repository at this point in the history
Use collective writes for VTU files
  • Loading branch information
scottaiton authored Jan 3, 2024
2 parents af00694 + 4307acc commit adb290e
Showing 1 changed file with 154 additions and 22 deletions.
176 changes: 154 additions & 22 deletions src/patches/clawpatch/fclaw2d_clawpatch_output_vtk.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define PATCH_DIM 2
#endif

/** Round up the quotient x / y.
 *
 * This is a local macro used for computations regarding the VTK patch
 * buffering.
 * With integer operands the expansion yields the ceiling of x / y for
 * x >= 0 and y > 0; for negative x it does not round up in general.
 * The argument \b y appears twice in the expansion, so it must be free of
 * side effects.
 */
#define FCLAW_VTK_CEIL(x, y) (((x) + (y) - 1) / (y))

#if REFINE_DIM == 2 && PATCH_DIM == 2

#include <fclaw2d_clawpatch_output_vtk.h>
Expand Down Expand Up @@ -59,6 +66,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include <fclaw2d_options.h>
#include <fclaw2d_map.h>
#include <sc_io.h>

typedef struct fclaw2d_vtk_state
{
Expand Down Expand Up @@ -91,6 +99,23 @@ typedef struct fclaw2d_vtk_state
MPI_Offset mpibegin;
#endif
char *buf;

/* The following three struct elements are dedicated to managing the
* buffering of patches.
* We track the number of buffered patches in \b num_buffered_patches. If
* adding another patch would make the buffer hold more than
* \b patch_threshold patches, the buffered patches are written to disk and
* the buffer is flushed before the new patch is added.
* The variable \b num_buffered_patches relates to the number of buffered
* patches during one fclaw2d_vtk_write_field call.
* If \b patch_threshold is -1, there are no intermediate writes; instead all
* patches of one fclaw2d_vtk_write_field call are buffered and then written
* to the disk.
*/
sc_io_sink_t *sink; /**< the sink to manage the patches buffer */
int num_buffered_patches; /**< the number of patches buffered during the
current fclaw2d_vtk_write_field call */
int patch_threshold; /**< maximal number of patches that are buffered during
a call of fclaw2d_vtk_write_field */
}
fclaw2d_vtk_state_t;

Expand Down Expand Up @@ -194,28 +219,71 @@ fclaw2d_vtk_write_header (fclaw2d_domain_t * domain, fclaw2d_vtk_state_t * s)
return retval ? -1 : 0;
}

/**
* @brief Write the buffer to file
/** This function adds to a buffer and writes to file if a threshold is exceeded.
*
* This function assumes that the buffer consists of patch data of the size
* \b psize_field. This means it must hold
* \b s->sink->buffer_bytes % \b psize_field == 0.
*
* @param s the vtk state
* @param psize_field the size of the buffer
* \param [in,out] s The VTK state.
* \param [in] psize_field The number of bytes that are to be added
* to the buffer.
* The threshold is not a parameter but is read from \b s->patch_threshold:
* If the number of patches, which are already
* stored in the buffer + 1 is greater than
* \b s->patch_threshold, the buffer is flushed to disk.
* Then the new patch is added to the buffer.
* -1 means that there is no threshold and
* the data is just added to \b s->sink.
*/
static void
write_buffer (fclaw2d_vtk_state_t * s, int64_t psize_field)
add_to_buffer (fclaw2d_vtk_state_t * s, int64_t psize_field)
{
#ifndef P4EST_ENABLE_MPIIO
size_t retvalz;
FCLAW_ASSERT (s != NULL);

retvalz = fwrite (s->buf, psize_field, 1, s->file);
SC_CHECK_ABORT (retvalz == 1, "VTK file write failed");
#else
int mpiret;
#ifdef P4EST_ENABLE_MPIIO
MPI_Status mpistatus;
#else
size_t retvalz;
#endif
FCLAW_ASSERT (s->patch_threshold == -1 || s->patch_threshold > 0);
FCLAW_ASSERT (((int) s->sink->buffer_bytes) % psize_field == 0);

mpiret = MPI_File_write (s->mpifile, s->buf, psize_field, MPI_BYTE,
&mpistatus);
SC_CHECK_MPI (mpiret);
if (s->patch_threshold != -1
&& (s->num_buffered_patches + 1 > s->patch_threshold))
{
FCLAW_ASSERT ((int) s->sink->buffer_bytes >= psize_field);
FCLAW_ASSERT (((int) s->sink->buffer_bytes) / psize_field ==
s->num_buffered_patches);
/* buffer threshold exceeded */
/* write the current buffer to disk */
#ifdef P4EST_ENABLE_MPIIO
if (s->num_buffered_patches > 0)
{
mpiret =
MPI_File_write_all (s->mpifile, s->sink->buffer->array,
(int) s->sink->buffer_bytes, MPI_BYTE,
&mpistatus);
SC_CHECK_MPI (mpiret);
}
#else
retvalz =
fwrite (s->sink->buffer->array, s->sink->buffer_bytes, 1,
s->file);
SC_CHECK_ABORT (retvalz == 1, "VTK file write failed");
#endif
/* reset the sink */
mpiret = sc_io_sink_complete (s->sink, NULL, NULL);
SC_CHECK_ABORT (mpiret == 0, "VTK buffer completion failed");
/* reset the buffer */
sc_array_reset (s->sink->buffer);
s->sink->buffer_bytes = 0;
s->num_buffered_patches = 0;
}

mpiret = sc_io_sink_write (s->sink, s->buf, (size_t) psize_field);
SC_CHECK_ABORT (mpiret == 0, "VTK buffer write failed");
++s->num_buffered_patches;
}

static void
Expand All @@ -226,7 +294,7 @@ write_position_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
fclaw2d_vtk_state_t *s = (fclaw2d_vtk_state_t *) g->user;

s->coordinate_cb (g->glob, patch, blockno, patchno, s->buf);
write_buffer (s, s->psize_position);
add_to_buffer (s, s->psize_position);
}

static void
Expand Down Expand Up @@ -322,7 +390,7 @@ write_connectivity_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
}
#endif
}
write_buffer (s, s->psize_connectivity);
add_to_buffer (s, s->psize_connectivity);
}

static void
Expand Down Expand Up @@ -354,7 +422,7 @@ write_offsets_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
*idata++ = k;
}
}
write_buffer (s, s->psize_offsets);
add_to_buffer (s, s->psize_offsets);
}

static void
Expand All @@ -374,7 +442,7 @@ write_types_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
*cdata++ = 12;
#endif
}
write_buffer (s, s->psize_types);
add_to_buffer (s, s->psize_types);
}

static void
Expand All @@ -390,7 +458,7 @@ write_mpirank_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
{
*idata++ = domain->mpirank;
}
write_buffer (s, s->psize_mpirank);
add_to_buffer (s, s->psize_mpirank);
}

static void
Expand All @@ -406,7 +474,7 @@ write_blockno_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
{
*idata++ = blockno;
}
write_buffer (s, s->psize_blockno);
add_to_buffer (s, s->psize_blockno);
}

static void
Expand Down Expand Up @@ -437,7 +505,7 @@ write_patchno_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
*idata++ = gpno;
}
}
write_buffer (s, s->psize_patchno);
add_to_buffer (s, s->psize_patchno);
}

static void
Expand All @@ -449,7 +517,7 @@ write_meqn_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch,
fclaw2d_vtk_state_t *s = (fclaw2d_vtk_state_t *) g->user;

s->value_cb (g->glob, patch, blockno, patchno, s->buf);
write_buffer (s, s->psize_meqn);
add_to_buffer (s, s->psize_meqn);
}

static void
Expand All @@ -460,9 +528,12 @@ fclaw2d_vtk_write_field (fclaw2d_global_t * glob, fclaw2d_vtk_state_t * s,
fclaw2d_domain_t *domain = glob->domain;

int64_t bcount;
sc_array_t buffer;
#ifndef P4EST_ENABLE_MPIIO
size_t retvalz;
#else
int i;
int max_num_writes, local_num_writes;
int mpiret;
MPI_Offset mpipos;
#ifdef P4EST_ENABLE_DEBUG
Expand All @@ -472,6 +543,12 @@ fclaw2d_vtk_write_field (fclaw2d_global_t * glob, fclaw2d_vtk_state_t * s,
#endif

s->buf = P4EST_ALLOC (char, psize_field);
/* The buffer array will be resized when required. */
sc_array_init_size (&buffer, 1, psize_field);
s->sink = sc_io_sink_new (SC_IO_TYPE_BUFFER, SC_IO_MODE_APPEND,
SC_IO_ENCODE_NONE, &buffer);
FCLAW_ASSERT (s->num_buffered_patches == 0);
s->num_buffered_patches = 0;
#ifdef P4EST_ENABLE_MPIIO
mpipos = s->mpibegin + offset_field;
if (domain->mpirank > 0)
Expand All @@ -495,12 +572,59 @@ fclaw2d_vtk_write_field (fclaw2d_global_t * glob, fclaw2d_vtk_state_t * s,
retvalz = fwrite (&bcount, s->ndsize, 1, s->file);
SC_CHECK_ABORT (retvalz == 1, "VTK file write failed");
#else
mpiret = MPI_File_write (s->mpifile, &bcount, 1, MPI_LONG, &mpistatus);
mpiret =
MPI_File_write (s->mpifile, &bcount, 1, MPI_LONG, &mpistatus);
SC_CHECK_MPI (mpiret);
#endif
}
fclaw2d_global_iterate_patches (glob, cb, s);

#ifdef P4EST_ENABLE_MPIIO
if (s->num_buffered_patches > 0 || s->patch_threshold == -1)
{
/* write the remaining buffered bytes */
mpiret =
MPI_File_write_all (s->mpifile, buffer.array,
(int) s->sink->buffer_bytes, MPI_BYTE,
&mpistatus);
SC_CHECK_MPI (mpiret);
}

if (s->patch_threshold != -1)
{
/* Ensure that the collective function MPI_File_write_all is called
* equally often on each rank.
*/
max_num_writes =
FCLAW_VTK_CEIL (glob->domain->local_max_patches,
s->patch_threshold);
local_num_writes =
FCLAW_VTK_CEIL (glob->domain->local_num_patches,
s->patch_threshold);
FCLAW_ASSERT (max_num_writes - local_num_writes >= 0);

for (i = 0; i < max_num_writes - local_num_writes; ++i)
{
/* This rank has fewer patches than the rank that holds the maximal
* number of patches. We compensate for this with empty write calls to
* avoid a deadlock.
*/
mpiret =
MPI_File_write_all (s->mpifile, buffer.array, 0, MPI_BYTE,
&mpistatus);
SC_CHECK_MPI (mpiret);
}
}
#else
retvalz = fwrite (buffer.array, s->sink->buffer_bytes, 1, s->file);
SC_CHECK_ABORT (retvalz == 1, "VTK file write failed");
#endif

/* free buffers */
sc_array_reset (&buffer);
sc_io_sink_destroy (s->sink);
P4EST_FREE (s->buf);
s->num_buffered_patches = 0;

#ifdef P4EST_ENABLE_MPIIO
#ifdef P4EST_ENABLE_DEBUG
Expand Down Expand Up @@ -662,6 +786,14 @@ fclaw2d_vtk_write_file (fclaw2d_global_t * glob, const char *basename,
s->offset_end = s->ndsize +
s->offset_meqn + s->psize_meqn * domain->global_num_patches;

s->buf = NULL;
s->sink = NULL;
/* The threshold may be adjusted; see the documentation of
* fclaw2d_vtk_state_t for further information.
*/
s->patch_threshold = -1;
s->num_buffered_patches = 0;

/* write header meta data and check for error */
retval = 0;
if (domain->mpirank == 0)
Expand Down

0 comments on commit adb290e

Please sign in to comment.