[saga-rg] Re: saga update... (iii)

Andre Merzky andre at merzky.net
Sat Mar 18 04:55:42 CST 2006


OK, let's do the async call, again with an example taken from
the NinfG tutorial (pi_client_multi.c, attached with changed
indentation).  The SAGA version, again in C++, is below:

-----------------------------------------------------------
#include <saga.hpp>

// Name of the remote function; passed to every saga::rpc handle in main().
const std::string func_name   = "pi/pi_trial";
// URL of the RPC client configuration; passed to every saga::rpc handle in main().
const std::string config_file = "file:///client.conf";

// Entry point: TIMES HOSTNAME [HOSTNAME...]
//
// Splits TIMES evenly over the given hosts, issues one asynchronous RPC
// call (func_name, configured by config_file) per host, waits for all of
// them through a task container, and prints 4 * sum / (times * n), where
// sum is the total of the per-host counts.
//
// Returns 0 on success; exits with status 2 on a usage error or when a
// saga::exception is caught.
int main (int argc, char *argv[])
{
  if ( argc < 3 )
  {
    std::cout << "\n\t";
    std::cout << "USAGE: " << argv[0] << " TIMES HOSTNAME\n";
    std::cout << "USAGE: " << argv[0] << " TIMES HOSTNAME1 HOSTNAME2...\n\n";
    exit (2);
  }

  // initialize vars from CL arguments
  int  n     = argc - 2;            // number of hosts; >= 1 because argc >= 3
  long times = atol (argv[1]) / n;  // share of TIMES handed to each host

  // argv[2] .. argv[argc - 1] are the host names.  Building the vector from
  // the range avoids indexing into an empty vector.
  std::vector <std::string> hosts (argv + 2, argv + argc);

  try 
  {
    // One result slot per host, allocated up front: the address of each
    // element is handed to an async call, so the vector must already hold
    // n elements and must not reallocate while tasks are pending.
    std::vector <long>   count (hosts.size (), 0L);
    saga::task_container tc;

    for ( int i = 0; i < n; i++ )
    {
      // create rpc handle
      // NOTE(review): the handle is destroyed at the end of this iteration
      // while its task may still be running -- confirm that saga::task
      // keeps the call alive on its own.
      saga::rpc  handle (hosts[i], func_name, config_file);

      // perform async call
      saga::task t = handle.call <saga::async> (i, times, &count[i]);

      // add task to task container
      tc.add (t);
    }

    // wait tasks (for ever, for all)
    tc.wait (-1.0, saga::task::All);

    // Compute and display pi.  The counts are long, so the sum is as well.
    long sum = 0;
    for ( int i = 0; i < n; i++ )
    {
      sum += count[i];
    }

    std::cout << "PI = " 
              << 4.0 * (sum / ((double) times * n))
              << std::endl;
  }
  catch ( const saga::exception & e )
  {
    std::cerr << "Found saga error: " 
              << e.what () 
              << std::endl;
    exit (2);
  }

  return (0);
}
-----------------------------------------------------------

I realised that I forgot the config_file argument to the
handle creation in the last (synchronous) example - it
should look like above.  That actually _is_ a difference from
GridRPC: as the session in SAGA is not RPC specific, it
makes no sense to have an RPC specific config file for a
session.  Hence, the config file needs to be attached
to the handle creation.

The example code above relies on the task container, which
allows multiple asynchronous calls to be collected and
handled together.

Best regards, 

  Andre.


PS.: as a side note, we do have the above form of RPC
included in our saga reference implementation. However, 
it does not yet forward the calls to a NinfG or GridSolve
adaptor.  If anyone in this group has interest in connecting
the reference implementation to a real GridRPC backend,
please let us know :-)

-- 
"So much time, so little to do..."  -- Garfield
-------------- next part --------------
/* Sample program for Ninf-G: calculates PI on multiple servers. */
#include "grpc.h"

/* Name of the remote function; passed to grpc_function_handle_init() in main(). */
char * func_name   = "pi/pi_trial";
/* Client configuration file; passed to grpc_initialize() in main(). */
char * config_file = "client.conf";
/* NOTE(review): not referenced anywhere in this listing -- confirm whether it is still needed. */
int    port = 0;

/*
 * Entry point: TIMES HOSTNAME [HOSTNAME...]
 *
 * Splits TIMES evenly over the given hosts, creates one GridRPC function
 * handle per host for func_name, issues one asynchronous call per handle,
 * waits for all of them, and prints 4 * sum / (times * n), where sum is
 * the total of the per-host counts.
 *
 * Returns 0 on success; exits with status 2 on a usage error, an
 * allocation failure, or any GridRPC error.
 */
int main (int argc, char *argv[])
{
  grpc_function_handle_t * handles;
  grpc_sessionid_t       * ids    = NULL;
  grpc_error_t             result = GRPC_NO_ERROR;
  char                  ** hosts  = NULL;
  long                   * count = NULL;
  long                     times, sum;
  int                      n;
  int                      i, j;

  if ( argc < 3 )
  {
    fprintf (stderr, "USAGE: %s TIMES HOSTNAME\n",               argv[0]);
    fprintf (stderr, "USAGE: %s TIMES HOSTNAME1 HOSTNAME2...\n", argv[0]);
    exit (2);
  }

  /* number of hosts; >= 1 because argc >= 3 */
  n     = argc - 2;
  hosts = (char **) malloc (sizeof (char *) * n);
  
  if ( hosts == NULL ) 
  {
    fprintf (stderr, "malloc: Can't allocate the storage for hostname.\n");
    exit (2);
  }

  /* copy the host names argv[2] .. argv[argc - 1] */
  for ( i = 2, j = 0; j < n; i++, j++ )
  {
    hosts[j] = strdup (argv[i]);
    
    /* check the entry just duplicated, not only the first one */
    if ( hosts[j] == NULL )
    {
      fprintf (stderr, "strdup: Can't allocate the storage for hostname.\n");
      exit (2);
    }
  }

  /* share of TIMES handed to each host */
  times = atol (argv[1]) / n;

  handles = (grpc_function_handle_t *) malloc 
    (sizeof (grpc_function_handle_t  ) * n);
  
  if ( handles == NULL ) 
  {
    fprintf (stderr, "malloc: Can't allocate the storage for handles.\n");
    exit (2);
  }

  /* one result slot per host; its address is passed to grpc_call_async() */
  count = (long *) malloc (sizeof (long) * n);
  
  if ( count == NULL ) 
  {
    fprintf (stderr, "malloc: Can't allocate the storage.\n");
    exit (2);
  }

  /* one session id per asynchronous call */
  ids = (grpc_sessionid_t *) malloc (sizeof (grpc_sessionid_t) * n);
  
  if ( ids == NULL )
  {
    fprintf (stderr, "malloc: Can't allocate the storage.\n");
    exit (2);
  }

  /* Initialize */
  result = grpc_initialize (config_file);
  
  if ( result != GRPC_NO_ERROR )
  {
    fprintf (stderr, "grpc_initialize() error. (%s)\n",
             grpc_error_string (result));
    exit (2);
  }

  /* Initialize Function handles */
  for ( i = 0; i < n; i++ )
  {
    result = grpc_function_handle_init (&handles[i], hosts[i], func_name);
    
    if ( result != GRPC_NO_ERROR )
    {
      fprintf (stderr, "grpc_function_handle_init() error. (%s)\n",
               grpc_error_string (result));
      exit (2);
    }      
  }

  /* Asynchronous call */
  for ( i = 0; i < n; i++ )
  {
    result = grpc_call_async (&handles[i], &ids[i], i, times, &count[i]);
    
    if ( result != GRPC_NO_ERROR )
    {
      fprintf (stderr, "grpc_call_async() error. (%s)\n",
               grpc_error_string (result));
      exit (2);
    }
  }

  /* wait for session */
  result = grpc_wait_all ();

  if ( result != GRPC_NO_ERROR )
  {
    fprintf (stderr, "grpc_wait_all() error. (%s)\n",
             grpc_error_string (result));
    exit (2);
  }

  /* Destruct Function handles */
  for ( i = 0; i < n; i++ )
  {
    result = grpc_function_handle_destruct (&handles[i]);
    
    if ( result != GRPC_NO_ERROR ) 
    {
      fprintf (stderr, "grpc_function_handle_destruct() error. (%s)\n",
               grpc_error_string (result));
      exit (2);
    }
  }

  /* Compute and display pi. */
  for ( i = 0, sum = 0; i < n; i++ )
  {
    sum += count[i];
  }

  printf ("PI = %f\n", 4.0 * (sum / ((double) times * n)));

  /* Finalize */
  result = grpc_finalize ();

  if ( result != GRPC_NO_ERROR )
  {
    fprintf (stderr, "grpc_finalize() error. (%s)\n",
             grpc_error_string (result));
    exit (2);
  }

  /* release the duplicated host names, then the arrays themselves */
  for ( i = 0; i < n; i++ )
  {
    free (hosts[i]);
  }

  free (hosts);
  free (handles);
  free (count);
  free (ids);

  return 0;
}



More information about the saga-rg mailing list