ACE Tutorial 014
ACE_Stream Tutorial, Of Sorts


Now we come to main (). In the previous task-chain tutorial every thread pool had to have the same number of threads. This time around, we leverage the construction method of ACE_Stream and ACE_Module to customize the thread-pool size in each ACE_Task of the stream.

Remember EndTask from the previous page? We create one here and push it into the stream to take care of cleaning up the messages. Technically, we could have replaced the default Tail task created by the ACE framework but it seems to make more sense to just push our "tail" onto the stream like the other tasks. The caveat to this method is that you must be sure you don't push () any other Modules behind the EndTask!

Once the stream of modules containing tasks is all setup then we can put () some data into the stream for processing. The clever use of Task::close () makes shutting downt the stream easier than ever. No messing with hangup messages at the application level, just close () when you're done! What could be simpler?



// $Id$

// stream.cxx
//
// Tutorial regarding a way to use ACE_Stream.
//
// written by bob mcwhirter (bob@netwrench.com)
//
//

#include "Task.h"
#include "EndTask.h"
// This is our specialized ACE_Task.

#include <ace/Module.h>
#include <ace/Stream.h>
#include <ace/streams.h>
// These are the neccessary ACE headers.


typedef ACE_Module<ACE_MT_SYNCH> Module;
typedef ACE_Stream<ACE_MT_SYNCH> Stream;
// Just to avoid a lot of typing, typedefs
// are generally a good idea.

int main (int argc, char *argv[])
{
  int numberOfMessages = argc > 1 ? ACE_OS::atoi (argv[1]) : 3;
  // unless otherwise specified, just send three messages
  // down the stream.

  Stream theStream;
  // the ACE_Stream itself.

  // Now, we instantiate 4 different Tasks.  These do not
  // need to be all the same class, but they do need to
  // all derrive from the same flavor of ACE_Task.
  //
  // Also, we instantiate a fifth end-cap Task to clean
  // up Message_Blocks as they reach the end.

  Task *taskOne;
  Task *taskTwo;
  Task *taskThree;
  Task *taskFour;
  Task *taskEnd;

  // Out Task's take two arguments: a name, and the number
  // of threads to dedicate to the task.

  taskOne = new Task ("Task No. 1", 1);
  taskTwo = new Task ("Task No. 2", 3);
  taskThree = new Task ("Task No. 3", 7);
  taskFour = new Task ("Task No. 4", 1);

  // Our EndTask only takes 1 argument, as it actually
  // doesn't spawn any threads for processing.

  taskEnd = new EndTask ("End Task");

  Module *moduleOne;
  Module *moduleTwo;
  Module *moduleThree;
  Module *moduleFour;
  Module *moduleEnd;

  // ACE_Stream accepts ACE_Modules, which are simply a pair of
  // ACE_Tasks.  One is dedicated for writing, while the other
  // is dedicated to reading.  Think of the writing side as
  // downstream, and the reading side as upstream.
  //
  // We're only working with a unidirection Stream today,
  // so we'll only actually install a Task into the write
  // side of the module, effectively downstream.

  moduleOne = new Module ("Module No. 1", taskOne);
  moduleTwo = new Module ("Module No. 2", taskTwo);
  moduleThree = new Module ("Module No. 3", taskThree);
  moduleFour = new Module ("Module No. 4", taskFour);
  moduleEnd = new Module ("Module End", taskEnd);

  // Now we push the Modules onto the Stream.
  // Pushing adds the module to the head, or
  // otherwise prepends it to whatever modules
  // are already installed.

  // So, you need to push () the modules on -backwards-
  // from our viewpoint.

  if (theStream.push (moduleEnd) == -1) {
           ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push (moduleFour) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  // As we push a Module onto the Stream, it gets opened.
  // When a Module open ()s, it opens the Tasks that it contains.
  //
  // Since we cannot provide an argument to this embedded
  // call to open (), we supplied specified the number of
  // threads in the constructor of our Tasks.

  if (theStream.push (moduleThree) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push (moduleTwo) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push (moduleOne) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  // Now that the Modules are open, the Tasks threads should
  // be launching and entering their svc () loop, so we send
  // some messages down the Stream.

  int sent = 1;

  ACE_Message_Block *message;

  while (sent <= numberOfMessages) {

    // First, create ourselves a Message_Block.  See Tutorials 10-13
    // for more information about Message_Blocks and Message_Queues.
    // Note the use of the lock_adapter () to ensure proper
    // serialization.
    ACE_NEW_RETURN (message,
                    ACE_Message_Block (128,
                                       ACE_Message_Block::MB_DATA,
                                       0,
                                       0,
                                       0,
                                       Task::lock_adapter ()),
                    -1);

    // Now, we grab the write-pointer from the Block,
    // and sprintf () our text into it.

    ACE_OS::sprintf (message->wr_ptr (), "Message No. %d", sent);

    // All we have to do now is drop the Message_Block
    // into the Stream.

    // It is always a good idea to duplicate () a Message_Block
    // when you put it into any Message_Queue, as then
    // you can always be allowed to release () your copy
    // without worry.

    if (theStream.put (message->duplicate (), 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1);
    }

    message->release ();
    ++sent;
  }

  // Now that we've sent our Message_Blocks, close down
  // the Stream.
  //
  // The Stream will automagically delete the Modules and
  // the contained Tasks.  We don't have to do that.
  //
  // This call will block (due to the way we've written our
  // Task class) until all Message_Blocks have cleared the
  // entire Stream, and all associated threads have exited.

  theStream.close ();

  return 0;
}


[Tutorial Index]