/*
 * Copyright (C) 2008 GEOLAB Ltd. ( http://www.geo-lab.ru )
 * 
 * Contents: a program to test shared disk I/O performance on clusters
 *
 * Compilation: 
 *
 *   mpicc -D_LARGEFILE64_SOURCE -D_FILE_OFFSET_BITS=64 file_cluster_benchmark.c
 *
 * Usage:
 *
 *   mpirun -np <number-of-nodes> ./file_cluster_benchmark parameters
 *
 * Parameters (space separated, in the following order):
 *     
 *   1. Size of the file to read/write on each node, gigabytes
 *   2. Size of the block every read/write operates with, bytes
 *   3. Stride, blocks: the sort-like reading test reads every stride-th
 *      block (must be at least 2)
 *   4. Directory where to create/write/read files
 *
 * Description:
 *
 *    The program is started with the MPI mechanism on a number of nodes.
 *    For each node it creates a file in the given directory and writes it
 *    by blocks of the given size until file's size exceeds the given value.
 *    The disk write rates are measured, and minimum/maximum/average rates
 *    over all nodes are printed out. After that, the program reads these
 *    files successively, and minimum/maximum/average read rates are printed
 *    out. Finally, the program starts reading the files on each node in
 *    the so-called "sorting" mode: read block #1, then #101, etc., up
 *    to the end of file, then read blocks #2, #102, and so on. The read
 *    rates for this mode are also printed out. Be prepared that the last
 *    test may take a while (up to 50-100 times slower than in successive
 *    reading).
 *
 * Example:
 *
 *   Test disk I/O for the cluster file system, mounted as /data, on 64 
 *   nodes with 16 GB file for each of them using block size 4096 bytes 
 *   and accessing every 100th block in the 'sorting' test:
 *
 *   mpirun -np 64 ./file_cluster_benchmark 16 4096 100 /data
 *
 *   On the SKIF-MSU supercomputer, use "-as one_per_node" parameter
 *   for mpirun to ensure each node runs only one MPI process.
 *
 *   mpirun -np 64 -as one_per_node ...
 *     
 * Author: KuE ( ekurin@geo-lab.ru )
 *
 * Date: 2008-12-28
 *
 */

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <mpi.h>

#define PROGRAM "file_cluster_benchmark"
#define BUF_SIZE 1024
#define ONE_KB ((off_t)1024)
#define ONE_MB ((off_t)1048576)
#define ONE_GB ((off_t)1073741824)
#define MAX_MEM_BLOCKS 1024

/*
 * Write a one-line summary of the expected command-line parameters
 * to stderr.
 */
void print_usage ()
{
  static const char usage_fmt[] =
    "\nUsage: %s <file-size,GB> <block-size,B> <stride> <work-dir>\n\n";

  fprintf(stderr, usage_fmt, PROGRAM);
}

/*
 * Allocate ONE_GB-sized chunks of memory (at most MAX_MEM_BLOCKS of them)
 * until malloc fails, write to every byte of each chunk, then release
 * everything again.
 *
 * NOTE(review): main calls this right before each reading test, presumably
 * to push previously written file data out of the node's memory cache so
 * that the reads really hit the disk -- confirm with the author.
 */
void mem_clean ()
{
  char * chunks[MAX_MEM_BLOCKS];
  int count = 0; /* number of chunks allocated so far */

  memset(chunks, 0, sizeof(chunks));

  while ( count < MAX_MEM_BLOCKS )
  {
    char * chunk = (char *)malloc(ONE_GB);
    char * cursor = NULL;
    char * chunk_end = NULL;

    chunks[count] = chunk;
    if ( chunk == NULL )
      break; /* no more memory available */

    /* touch every byte of the chunk */
    chunk_end = chunk + ONE_GB;
    for (cursor = chunk; cursor != chunk_end; cursor++)
      *cursor = (char)count;

    count++;
  }

  /* release all the chunks that were obtained */
  while ( count > 0 )
  {
    count--;
    free(chunks[count]);
  }
}

int main (int argc, char ** argv)
{
  int ret = 0;
  int fsize_gb = 0; /* file size, gigabytes */
  int bsize = 0; /* block size, bytes */
  int stride = 0; /* stride, blocks*/
  char wd[BUF_SIZE]; /* working directory */

  char buf[BUF_SIZE];
  struct stat statbuf;
  int st = 0, n = 0, i = 0, fd = -1, j = 0, from = 0;
  char * block = NULL;
  time_t t_start, t_stop, t_elapsed;
  off_t bytes = 0;
  double rate = 0.0, rate_min = 0.0, rate_max = 0.0, rate_avg = 0.0;
  double * rates = NULL;
  char node_name[MPI_MAX_PROCESSOR_NAME];
  char * node_names = NULL;

  if ( argc == 5 )
  {
    /* 
     * get/check/init program parameters
     */
    fsize_gb = atoi(argv[1]);
    bsize = atoi(argv[2]);
    stride = atoi(argv[3]);
    strncpy(wd, argv[4], BUF_SIZE - 1);

    if ( fsize_gb < 1 )
    {
      fprintf(stderr, "%s: error: file size is less than 1 GB\n", PROGRAM);
      return 1;
    }

    if ( bsize < 1 )
    {
      fprintf(stderr, "%s: error: block size is less than 1 byte\n", PROGRAM);
      return 1;
    }

    n = ONE_GB * (off_t)fsize_gb / bsize;
    block = (char *)malloc(bsize);
    if ( block == NULL )
    {
      fprintf(stderr, "%s: error: malloc failed for block: %s\n", 
          PROGRAM, strerror(errno));
      return 1;
    }

    if ( stride < 2 )
    {
      fprintf(stderr, "%s: error: stride is less than 2\n", PROGRAM);
      return 1;
    }

    st = stat (wd, &statbuf);
    if ( st != 0 )
    {
      fprintf(stderr, "%s: error: accessing %s: %s\n", 
          PROGRAM, wd, strerror(errno));
      return 1;
    }
    else if ( ! S_ISDIR(statbuf.st_mode) )
    {
      fprintf(stderr, "%s: error: %s is not a directory\n", PROGRAM, wd);
      return 1;
    }
    if ( access (wd, W_OK) != 0 )
    {
      fprintf(stderr, "%s: error: cannot write to directory %s\n", 
          PROGRAM, wd);
      return 1;
    }

    /*
     * start MPI
     */
    if ( MPI_Init (&argc, &argv) == MPI_SUCCESS )
    {
      int comm_size = 1;
      int rank = 0;
      MPI_Comm_size (MPI_COMM_WORLD, &comm_size);
      MPI_Comm_rank (MPI_COMM_WORLD, &rank);
      node_names = (char *)calloc(comm_size * MPI_MAX_PROCESSOR_NAME,
          sizeof(char) );
      memset(node_name, 0, MPI_MAX_PROCESSOR_NAME);
      MPI_Get_processor_name(node_name, &i);
      MPI_Gather(node_name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 
          node_names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 0, 
          MPI_COMM_WORLD);
      rates = (double *)calloc(comm_size, sizeof(double));
      /*
       * form file name according to the rank
       */
      strncpy(buf, wd, BUF_SIZE - 1);
      if ( buf[strlen(buf) - 1] != '/' ) strncat(buf, "/", BUF_SIZE - 1);
      snprintf (buf + strlen(buf), BUF_SIZE - 1, "%-d", rank);
      /*
       * open file for writing
       */
      fd = open (buf, O_CREAT | O_WRONLY | O_LARGEFILE | O_TRUNC,
         S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
      if ( fd != -1 )
      {
        /*
         * write file
         */
        MPI_Barrier(MPI_COMM_WORLD);
        t_start = time (NULL);
        for (i = 0, bytes = 0; i < n; i++, bytes += bsize)
        {
          if ( write (fd, block, bsize) < bsize ) 
          {
            fprintf (stderr, "%s: error: writing %s failed: %s\n", 
              PROGRAM, buf, strerror(errno));
            ret = 1;
            break;
          }
        }
        close(fd); fd = -1;
        t_stop = time (NULL);
        /*
         * gather from all nodes and printout write performance figures
         */
        t_elapsed = t_stop - t_start;
        rate = (double)bytes / t_elapsed / (double)ONE_MB;
        MPI_Reduce (&rate, &rate_min, 1, MPI_DOUBLE, MPI_MIN, 0, 
            MPI_COMM_WORLD);
        MPI_Reduce (&rate, &rate_max, 1, MPI_DOUBLE, MPI_MAX, 0, 
            MPI_COMM_WORLD);
        MPI_Reduce (&rate, &rate_avg, 1, MPI_DOUBLE, MPI_SUM, 0, 
            MPI_COMM_WORLD);
        rate_avg /= (double)comm_size;
        MPI_Gather(&rate, 1, MPI_DOUBLE, rates, 1, MPI_DOUBLE, 0, 
            MPI_COMM_WORLD);
        if ( rank == 0 )
        {
          printf("\nWriting %d GB by %d nodes:\n\n", fsize_gb, comm_size);
          printf("\tmin rate = %-.1lf MB/sec\n", rate_min);
          printf("\tmax rate = %-.1lf MB/sec\n", rate_max);
          printf("\tavg rate = %-.1lf MB/sec\n\n", rate_avg);
          for (i = 0; i < comm_size; i++)
          {
            printf("\tnode = %-d\thost = %-s\trate = %-.1lf MB/sec\n", 
                i, node_names + i * MPI_MAX_PROCESSOR_NAME, rates[i]);
          }
          printf("\n");
        }
        if ( ret == 0 ) /* no previous error */
        {
          /*
           * re-open file for reading
           */
          fd = open (buf, O_RDONLY | O_LARGEFILE);
          if ( fd != -1 )
          {        
            /*
             * read file successively
             */
            mem_clean ();
            MPI_Barrier(MPI_COMM_WORLD);
            t_start = time (NULL);
            for (i = 0, bytes = 0; i < n; i++, bytes += bsize)
            {
              if ( read (fd, block, bsize) < bsize ) 
              {
                fprintf (stderr, "%s: error: reading %s failed: %s\n", 
                  PROGRAM, buf, strerror(errno));
                ret = 1;
                break;
              }
            }
            t_stop = time (NULL);
            /*
             * gather from all nodes and printout read performance figures
             */
            t_elapsed = t_stop - t_start;
            rate = (double)bytes / t_elapsed / (double)ONE_MB;
            MPI_Reduce (&rate, &rate_min, 1, MPI_DOUBLE, MPI_MIN, 0, 
                MPI_COMM_WORLD);
            MPI_Reduce (&rate, &rate_max, 1, MPI_DOUBLE, MPI_MAX, 0, 
                MPI_COMM_WORLD);
            MPI_Reduce (&rate, &rate_avg, 1, MPI_DOUBLE, MPI_SUM, 0, 
                MPI_COMM_WORLD);
            rate_avg /= (double)comm_size;
            MPI_Gather(&rate, 1, MPI_DOUBLE, rates, 1, MPI_DOUBLE, 0, 
                MPI_COMM_WORLD);
            if ( rank == 0 )
            {
              printf("\nSuccessive reading %d GB by %d nodes:\n\n", 
                  fsize_gb, comm_size);
              printf("\tmin rate = %-.1lf MB/sec\n", rate_min);
              printf("\tmax rate = %-.1lf MB/sec\n", rate_max);
              printf("\tavg rate = %-.1lf MB/sec\n\n", rate_avg);
              for (i = 0; i < comm_size; i++)
              {
                printf("\tnode = %-d\thost = %-s\trate = %-.1lf MB/sec\n", 
                    i, node_names + i * MPI_MAX_PROCESSOR_NAME, rates[i]);
              }
              printf("\n");
            }
            /*
             * read file like in sorting
             */
            mem_clean ();
            MPI_Barrier(MPI_COMM_WORLD);
            t_start = time (NULL);
            for (i = 0, from = 0, j = 0, bytes = 0; i < n; 
                i++, j++, bytes += bsize)
            {
              off_t pos = 0;
              int num = from + j * stride;
              if ( num >= n )
              {
                from++;
                j = 0;
                num = from;
              }
              pos = (off_t)num * bsize;
              if ( lseek(fd, pos, SEEK_SET) == (off_t)-1 ) 
              {
                fprintf(stderr, "%s: error: lseek on %s failed: %s\n",
                    PROGRAM, buf, strerror(errno));
                ret = 1;
                break;
              }
              if ( read (fd, block, bsize) < bsize ) 
              {
                fprintf (stderr, "%s: error: reading %s failed: %s\n", 
                  PROGRAM, buf, strerror(errno));
                ret = 1;
                break;
              }
            }
            close(fd); fd = -1;
            t_stop = time (NULL);
            /*
             * gather from all nodes and printout read performance figures
             */
            t_elapsed = t_stop - t_start;
            rate = (double)bytes / t_elapsed / (double)ONE_MB;
            MPI_Reduce (&rate, &rate_min, 1, MPI_DOUBLE, MPI_MIN, 0, 
                MPI_COMM_WORLD);
            MPI_Reduce (&rate, &rate_max, 1, MPI_DOUBLE, MPI_MAX, 0, 
                MPI_COMM_WORLD);
            MPI_Reduce (&rate, &rate_avg, 1, MPI_DOUBLE, MPI_SUM, 0, 
                MPI_COMM_WORLD);
            rate_avg /= (double)comm_size;
            MPI_Gather(&rate, 1, MPI_DOUBLE, rates, 1, MPI_DOUBLE, 0, 
                MPI_COMM_WORLD);
            if ( rank == 0 )
            {
              printf("\nSort-like reading %d GB by %d nodes:\n\n", 
                  fsize_gb, comm_size);
              printf("\tmin rate = %-.1lf MB/sec\n", rate_min);
              printf("\tmax rate = %-.1lf MB/sec\n", rate_max);
              printf("\tavg rate = %-.1lf MB/sec\n\n", rate_avg);
              for (i = 0; i < comm_size; i++)
              {
                printf("\tnode = %-d\thost = %-s\trate = %-.1lf MB/sec\n", 
                    i, node_names + i * MPI_MAX_PROCESSOR_NAME, rates[i]);
              }
              printf("\n");
            }
          }
          else
          {
            fprintf (stderr, "%s: error: opening %s for reading failed: %s\n", 
                PROGRAM, buf, strerror(errno));
            ret = 1;
          }
        }
      }
      else
      {
        fprintf (stderr, "%s: error: opening %s for writing failed: %s\n", 
            PROGRAM, buf, strerror(errno));
        ret = 1;
      }
      /*
       * stop MPI environment
       */
      MPI_Finalize ();
    }
    else
    {
      fprintf (stderr, "%s: error: MPI_Init failed\n", PROGRAM);
      ret = 1;
    }
  }
  else
  {
    fprintf (stderr, "%s: error: too few parameters\n", PROGRAM);
    print_usage ();
    ret = 1;
  }
  if ( block != NULL ) free(block);
  if ( rates != NULL ) free(rates);
  if ( node_names != NULL ) free(node_names);
  return ret;
}
/* Hosted by uCoz */