[saga-rg] SAGA: streams and monitoring

Andre Merzky andre at merzky.net
Thu Feb 23 03:54:51 CST 2006


Hi guys, 

I tried to finalize monitoring in SAGA lately, and in
particular the combination with streams is challenging, as
there is a monitoring mechanism already present in streams
(watch).

Anyway, here is a code example for the current callback
mechanism in the strawman API.  Could you please give some
feedback on whether that seems sensible to you, or whether
it would exclude important use cases etc.?

Thanks, Andre.



 Example for async stream server (C++):


    // my callback container class
    class my_cb : public saga::callback
    {
      private:
        // we keep a stream server and a single client stream
        saga::stream_server ss;
        saga::stream        s;
      
      public:
        // constructor initialises these (note that the
        // client stream should not be connected at this
        // point)
        my_cb (saga::stream_server ss_, 
               saga::stream        s_)
        {
          ss = ss_; 
          s   = s_; 
        }

        // the callback gets called on any change of the
        // metric it is registered with
        bool callback (saga::metric m)
        {
          // the stream server got an event triggered, and
          // should be able to create a client socket now.
          s = ss.wait ();

          if ( s.status == saga::stream::Open )
          {
            // have a client stream, we are done
            // don't call this cb again!
            return (true);
          }

          // no valid client stream obtained: keep this
          // callback alive, and get called again on the
          // next metric event.
          return (false);
        }
     };

     int main ()
     {
       // create a stream server, and a not yet connected
       // stream
       saga::stream_server ss;
       saga::stream        s;

       // give both to our callback class, and register that
       // callback with the 'client_connect' metric of the
       // server.  That causes the callback to be invoked on
       // every change of that metric, i.e. on every event
       // that changes that metric, i.e. on every client
       // connect attempt.
       my_cb cb (ss, s);
       ss.add_callback ("client_connect", cb);

       // now we serve incoming clients forever
       while ( true )
       {
         // check if a new client is connected 
         // the stream status would then be Open
         if ( s.status == saga::stream::Open )
         {
           // a client got connected!
           // handle open socket
           s.write ("You say hello, I say good bye!\r\n", 32); 

           // and close stream
           s.close ();

           // the stream is not Open anymore.  We re-add the
           // callback, and hence wait for the next client
           // to connect.
           ss.add_callback ("client_connect", cb);
         }
         else
         {
           // no client yet, idle, or do something useful
           sleep (1);
         }
       }
 
       // we should never get here
       return (-1); 
     }

Note that the example above looks long, but is only ~25
lines (nonempty/no-comment).  

Also note that the callback gets called immediately after
being added to the metric if a client connected to the ss
before the cb got added - no client connections pass
unhandled.  That is similar to a stream server returning
from waitForClient immediately if a client connected
earlier.
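
To make the registration semantics concrete, here is a
minimal sketch with a mock metric.  It is not the strawman
API: mock_metric, fire and pending are hypothetical names,
and handing each event to only the first registered callback
is a simplification.  It only shows the two rules above:
events that arrived before add_callback are delivered on
registration, and a callback returning true is removed.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for a SAGA metric (assumption, not
// the strawman API).
class mock_metric
{
  public:
    typedef std::function <bool ()> cb_t;

    // an event occurred: remember it, then try to deliver it
    void fire ()
    {
      ++pending_;
      dispatch ();
    }

    // register a callback; events which arrived earlier are
    // delivered right away
    void add_callback (cb_t cb)
    {
      cbs_.push_back (cb);
      dispatch ();
    }

    int pending () const { return pending_; }

  private:
    void dispatch ()
    {
      while ( pending_ > 0 && ! cbs_.empty () )
      {
        --pending_;
        cb_t cb = cbs_.front ();

        // true means: done, don't call this cb again
        if ( cb () )
        {
          cbs_.erase (cbs_.begin ());
        }
      }
    }

    std::vector <cb_t> cbs_;
    int                pending_ = 0;
};

// the client connects before the callback is registered
int demo_late_registration ()
{
  mock_metric m;
  int handled = 0;

  m.fire ();
  m.add_callback ([&handled] () { ++handled; return true; });

  return handled;
}

// a callback returning true is invoked only once
int demo_one_shot ()
{
  mock_metric m;
  int handled = 0;

  m.add_callback ([&handled] () { ++handled; return true; });
  m.fire ();
  m.fire ();   // stays pending until a callback is re-added

  return handled;
}
```

In this sketch the second event in demo_one_shot is not lost:
it stays pending, and would be delivered as soon as a
callback is added again, which is what the main loop in the
example does after closing the stream.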

Also note that locking should be applied - there are race
conditions if 2 clients connect at the same time.  Well, it's
just an example...
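
One possible way to add that locking is to guard the single
client slot with a mutex, so that only one connect attempt
can claim it at a time.  The sketch below uses plain C++
threads rather than SAGA; client_slot, try_claim, release and
count_claims are hypothetical names.  In the example above,
the same lock would have to protect the 's = ss.wait ()'
assignment in the callback and the status check in main.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical guard around the single client stream slot
// (assumption, not part of the strawman API).
class client_slot
{
  public:
    // try to claim the slot for client 'id'; returns true
    // if this caller got it, false if it is already taken
    bool try_claim (int id)
    {
      std::lock_guard <std::mutex> lock (mtx_);

      if ( busy_ )
      {
        return false;
      }

      busy_  = true;
      owner_ = id;
      return true;
    }

    // free the slot again, e.g. after s.close ()
    void release ()
    {
      std::lock_guard <std::mutex> lock (mtx_);
      busy_  = false;
      owner_ = -1;
    }

    int owner ()
    {
      std::lock_guard <std::mutex> lock (mtx_);
      return owner_;
    }

  private:
    std::mutex mtx_;
    bool       busy_  = false;
    int        owner_ = -1;
};

// let n clients try to connect concurrently, and count how
// many of them claimed the slot
int count_claims (int n)
{
  client_slot               slot;
  std::vector <int>         won (n, 0);
  std::vector <std::thread> threads;

  for ( int i = 0; i < n; ++i )
  {
    threads.emplace_back ([&slot, &won, i] ()
    {
      won[i] = slot.try_claim (i) ? 1 : 0;
    });
  }

  for ( std::thread & t : threads )
  {
    t.join ();
  }

  int total = 0;
  for ( int w : won )
  {
    total += w;
  }
  return total;
}
```

However many threads call try_claim at once, exactly one of
them gets the slot until release is called; the others would
have to wait for the next round, much like a second client
waits until the callback is re-added.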

Cheers, Andre.





 class my_cb : public saga::callback
 {
     saga::stream_server ss;
     saga::stream        s;
   
   public:
     my_cb (saga::stream_server ss_, saga::stream s_) {
       ss = ss_; s = s_; 
     }

     bool callback (saga::metric m) {
       s = ss.wait ();
       if ( s.status == saga::stream::Open ) {
         return (true);
       }
       return (false);
     }
  };

  int main ()
  {
    saga::stream_server ss;
    saga::stream        s;
    my_cb               cb (ss, s);
    ss.add_callback ("client_connect", cb);

    while ( true ) {
      if ( s.status == saga::stream::Open ) {
        s.write ("You say hello, I say good bye!\r\n", 32); 
        s.close ();
        ss.add_callback ("client_connect", cb);
      } 
      else {
        sleep (1);
      }
    }
    return (-1); 
  }
-- 
"So much time, so little to do..."  -- Garfield




