
C++ STL modification + thread scheduling

Posted: Sun Jan 29, 2006 1:03 pm
by Candy
Uhm... to start, I'm not sure I can explain this clearly and I'm not at all sure whether I'm placing this thread correctly.


I'm modifying the C++ STL to support generic typed streams and generic modifying filters between them. In short, this modifies the default basic_istream/ostream/iostream in a way that they now include a new base class that supports a small portion of their original function, and a new subclass of them is created (as well as from streambuf) that supports encapsulating a gen_istream to a basic_istream. This means that all basic_*stream types can be used as gen_*stream types and that all gen_*stream types that conform to a character-based stream can be converted back into a basic_*stream.

I modified it like this because the basic_*stream classes all assume you're using characters. I wanted normal typed streams that would intermix closely with the default streams, so I separated out the char-specific part (which was pretty much in fact). I didn't do this on the streambuf-level since that would still require me to make either freely-typed streambufs that couldn't be used as stream and that had to implement two-way communication (which would incur loads of overhead) or to split up streambuf which would make a lot of existing code incompatible - as well as making it very awkward to use and requiring buffering.
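To make the layering a bit more concrete, here is a minimal sketch of a typed stream base plus a character adapter. It is not the actual library code: the post makes basic_istream itself derive from the new base, while this sketch wraps a std::istream in an adapter so the standard library stays untouched. The names char_stream_adapter, count_elements and demo_count, and the bool-returning get(), are assumptions made for illustration.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical minimal typed-stream base; the real gen_istream has more to it.
template <typename T>
class gen_istream {
public:
    virtual ~gen_istream() {}
    // Reads one element into `out`; returns false when the stream is exhausted.
    virtual bool get(T &out) = 0;
};

// Adapter (assumed name): presents any std::istream as a gen_istream<char>.
class char_stream_adapter : public gen_istream<char> {
public:
    explicit char_stream_adapter(std::istream &in) : in_(in) {}
    bool get(char &out) override {
        return static_cast<bool>(in_.get(out));
    }
private:
    std::istream &in_;
};

// Generic code that only needs "a stream of T", not a character stream.
template <typename T>
std::size_t count_elements(gen_istream<T> &s) {
    std::size_t n = 0;
    T tmp;
    while (s.get(tmp)) ++n;
    return n;
}

// Small demo: count the characters of a string through the adapter.
std::size_t demo_count(const std::string &text) {
    std::istringstream in(text);
    char_stream_adapter adapter(in);
    return count_elements<char>(adapter);
}
```

Anything written against gen_istream<T> (like count_elements here) works the same for a character stream and for, say, a stream of wav samples.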


Now, I've been messing with it to start supporting multiple reads from a single stream at once, which is where my current problem comes from. I want to keep this generic for the future, so it must support multithreading for things that can be done in parallel. A small example:

Mass-encoding files to OGG files and writing them back

The type flow goes from disk_file -> wavfile -> wav sound object -> ogg object -> ogg file -> saved diskfile. There are three filters involved, a file reader (ifstream), a file writer (ofstream) and a conversion filter. Code example:

Code: Select all

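// 'if' is a C++ keyword, so the two file streams are called in and out here
gen_istream<char> *in = new ifstream("wavfile");
gen_ostream<char> *out = new ofstream("oggfile");
gen_istream<wav_sample> *i = new inputfilter<char, wav_sample>(get_filter("file", "wav"), in);
gen_istream<ogg_unit> *ogg = new inputfilter<wav_sample, ogg_unit>(get_filter("wav", "ogg", "encode-hq"), i);
gen_istream<char> *afterconv = new inputfilter<ogg_unit, char>(get_filter("ogg", "file"), ogg);
// dereference, so that operator>> works on the streams and not on the pointers
*afterconv >> *out;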
This constructs a series of filters that, in combination, convert a file from the wav file type to the ogg file type. I'm still missing a few details: I put in somewhere that you could pass parameters to each filter, but I think I've lost that somewhere along the way. The real problem now is how this chain is executed.

The easiest way to see this is as a series of functions, each of which calls the next and only needs its first N outputs for output. There are multiple ways to implement this. The simplest is to read everything from the input, then run each sample through the first filter, then through the second, and so on, all in series. This is slow and not parallel at all. The other extreme is to put a FIFO between each pair of filters (for each stream) and to run threads that fill the FIFOs. This is very scalable, but relatively slow on machines with few CPUs, because of all the thread switching.
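The FIFO-per-filter extreme could look roughly like the sketch below. It uses std::thread, std::mutex and std::condition_variable from later C++ standards instead of the tcall/tkill primitives from this thread, and blocking_fifo and run_pipeline are made-up names, so treat it as an illustration of the idea and not as the library's design. The two stages (add one, then double) stand in for real filters.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded FIFO: producers block when it is full,
// consumers block when it is empty.
template <typename T>
class blocking_fifo {
public:
    explicit blocking_fifo(std::size_t capacity)
        : capacity_(capacity), closed_(false) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return q_.size() < capacity_; });
        q_.push_back(std::move(value));
        not_empty_.notify_one();
    }

    // Returns false once the FIFO has been closed and drained.
    bool pop(T &out) {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !q_.empty() || closed_; });
        if (q_.empty()) return false;
        out = std::move(q_.front());
        q_.pop_front();
        not_full_.notify_one();
        return true;
    }

    // Signals that no more elements will be pushed.
    void close() {
        std::lock_guard<std::mutex> lock(m_);
        closed_ = true;
        not_empty_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::deque<T> q_;
    std::size_t capacity_;
    bool closed_;
};

// Two stages (x -> x + 1 -> x * 2), each on its own thread, joined by FIFOs.
std::vector<int> run_pipeline(const std::vector<int> &input) {
    blocking_fifo<int> a(4), b(4);
    std::vector<int> result;
    std::thread stage1([&] { int v; while (a.pop(v)) b.push(v + 1); b.close(); });
    std::thread stage2([&] { int v; while (b.pop(v)) result.push_back(v * 2); });
    for (int v : input) a.push(v);
    a.close();
    stage1.join();
    stage2.join();
    return result;
}
```

The FIFO capacity (4 here, picked arbitrarily) is the knob that trades memory against how often a stage has to block and hand the CPU to its neighbour.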

I'm trying to find a middle way that also works when this method is heavily used, without slowing down processing too much on machines with few CPUs and without being too slow on N-CPU machines. Right now, I'm leaning toward the N-FIFO solution and letting the thread scheduler/switcher handle this problem (which would place this thread in OSdev). I'm not sure whether that's the best way, so I'm wondering what your ideas on this problem are?

PS: when I'm fairly done with this C++ library modification I'll release both the library and the modification into public domain so that the idea might be actually used to lift the software industry to another level. Stand on my shoulders, not on my toes.

Re:C++ STL modification + thread scheduling

Posted: Sun Jan 29, 2006 2:40 pm
by Candy
Since this probably won't fit in the previous message, here is the code I'm considering for the input stream and the output stream:

Code: Select all

template <typename I, typename O>
class inputfilter : public gen_istream<O> {
    public:
   inputfilter(filter<I, O> *f, gen_istream<I> *s) : f(f), s(s) {
      // let the fifo fill itself in the background
      thr = tcall(((*)(gen_istream<I> *, fifo<I>::iterator &, streamsize))s->getn, 
         s, input.begin(), -1);
      // this approach is faster on multi-cpu computers and equally fast on single-cpu computers
   }

   virtual ~inputfilter() {
      tkill(thr);
      // iterate through fifo ourselves and remove remaining entries
      // input is a member object, not a pointer; typename is needed for the dependent type
      for (typename fifo<I>::iterator i = input.begin(); i != input.end(); ++i) {
         delete *i;
      }
      delete f;
      delete s;
   }

   virtual O *get() {
      I *obj = *input.begin();
      input.pop_front();
      O *out = f->process(obj);
      delete obj;
      return out;
   }

   // to allow reading to an iterator or temporary storage
   // used internally to fill a FIFO and to keep it full
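   // (member templates cannot be virtual in C++, so this one isn't)
   template <class T>
   streamsize getn(T &it, streamsize count) {
      streamsize i;
      for (i = 0; i < count; i++) {
         *it = get();
         ++it;
      }
      return i;   // number of elements read
   }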

    private:
   fifo<I> input;
   tid_t thr;
   filter<I, O> *f;
   gen_istream<I> *s;
};

template <typename I, typename O>
class outputfilter : public gen_ostream<I> {
    public:
   outputfilter(filter<I, O> *f, gen_ostream<O> *s) : f(f), s(s) {
      thr = tcall(((*)(gen_ostream<O> *, fifo<O>::iterator &, streamsize))s->putn,
         s, output.begin(), -1);
   }

   ~outputfilter() {
      tkill(thr);
      for (typename fifo<O>::iterator i = output.begin(); i != output.end(); ++i) {
         delete *i;
      }
      delete f;
      delete s;
   }

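   virtual void put(I *i) {
      O *obj = f->process(i);
      output.push_back(obj);
   }

   // (member templates cannot be virtual in C++, so this one isn't)
   template <class T>
   streamsize putn(T &it, streamsize count) {
      streamsize i;
      for (i = 0; i < count; i++) {
         put(*it);
         ++it;
      }
      return i;   // number of elements written
   }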
    private:
   fifo<O> output;
   tid_t thr;
   filter<I, O> *f;
   gen_ostream<O> *s;
};
The idea is that creating an input/outputfilter starts a thread on the layer below it that continually pulls/pushes data. As soon as data is needed, it only has to go through the last layer of filtering (changing) that's available before it is used directly. You could tcall a read to a fifo to make it "instantly available" if you'd like that. I think this works with small overhead on a single-CPU system: you tcall threads, they are started after this thread blocks, they preload data and if necessary signal the other thread, so you lose at most 2N-2 thread switches, where N is the number of filters. On an N-CPU computer all work is done in parallel, possibly with some threads not being used actively.

I think this method also profits from gang scheduling and from computers with M < N CPUs. If you only have M CPUs, N-M threads are not active. As soon as one of the M running threads blocks (due to lower load), one of the others can be started on that CPU. That way, the CPU availability is still quite good...


I think that in this case it's better to create too many threads and to let the scheduler use gang scheduling and similar techniques to solve it.

Any ideas or comment on the scheduling bit? Or on any other bit?

Re:C++ STL modification + thread scheduling

Posted: Sun Jan 29, 2006 2:54 pm
by Candy
Candy wrote: This constructs a series of filters that, in combination, convert a file from the wav file type to the ogg file type. I'm still missing a few details: I put in somewhere that you could pass parameters to each filter, but I think I've lost that somewhere along the way. The real problem now is how this chain is executed.
I've found the bit that did parameter handling again:

Code: Select all

// 'if' is a C++ keyword, so the two file streams are called in and out here
gen_istream<char> *in = new ifstream("wavfile");
gen_ostream<char> *out = new ofstream("oggfile");
filter<char, wav_sample> *cw = get_filter("file", "wav");
filter<wav_sample, ogg_unit> *wo = get_filter("wav", "ogg", "encode-hq");
filter<ogg_unit, char> *oc = get_filter("ogg", "char");

// parameters for the wav -> ogg step only
map<string, string> *oggparams = new map<string, string>;
oggparams->insert(make_pair("type", "vbr"));
oggparams->insert(make_pair("q", "7"));

gen_istream<char> *afterconv = oc->process(wo->process(cw->process(in), oggparams));
// dereference, so that operator>> works on the streams and not on the pointers
*afterconv >> *out;
This sets type=vbr and q=7 for the wav->ogg converter.
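On the receiving side, a filter has to turn that string map into typed settings. A small sketch of how that could be done follows. The struct ogg_settings, the helper param_or, and the fallback values ("vbr" and 5) are invented for this example and are not part of the design described above.

```cpp
#include <map>
#include <string>

// Hypothetical settings struct for the encoder; field names are illustrative.
struct ogg_settings {
    std::string type;
    int quality;
};

// Looks up a key and falls back to a default when the key is absent.
std::string param_or(const std::map<std::string, std::string> &params,
                     const std::string &key, const std::string &fallback) {
    std::map<std::string, std::string>::const_iterator it = params.find(key);
    return it == params.end() ? fallback : it->second;
}

// Converts the untyped string parameters into typed settings.
// The defaults ("vbr", 5) are arbitrary assumptions for this sketch.
ogg_settings parse_ogg_params(const std::map<std::string, std::string> &params) {
    ogg_settings s;
    s.type = param_or(params, "type", "vbr");
    s.quality = std::stoi(param_or(params, "q", "5"));
    return s;
}
```

Note that std::stoi throws if the value isn't a number, so a real filter would probably want to validate the parameters before the conversion starts.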

There's one part of the idea I haven't detailed yet. You can also chain multiple filters together to get a single filter that does the same work. At the moment this works without fifo'ing, but I might add that:

Code: Select all

filter<A, B> *abfilter = get_filter("A", "B");
filter<B, C> *bcfilter = get_filter("B", "C");

filter<A, C> *acfilter = multifilter<A,B,C>(abfilter, bcfilter);
It might compile without <A,B,C>, didn't try that yet. Probably does, makes the code less confusing. This way, you can ask the OS to make a filter from type A to type B without indicating what's between it, and it'll just do it for you.
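On whether the <A,B,C> can be dropped: before C++17 the template arguments of a class are not deduced from its constructor arguments, but those of a function template are, so a small helper function does the trick. The sketch below assumes a simplified filter interface with a by-value process() (the code earlier in the thread passes heap-allocated pointers), and make_multifilter, int_to_string, string_length and demo_digits are names made up for the example.

```cpp
#include <cstddef>
#include <string>

// Hypothetical filter interface, simplified to by-value processing.
template <typename I, typename O>
class filter {
public:
    virtual ~filter() {}
    virtual O process(const I &in) = 0;
};

// Chains an A->B filter and a B->C filter into a single A->C filter.
// It does not take ownership of the two inner filters.
template <typename A, typename B, typename C>
class multifilter : public filter<A, C> {
public:
    multifilter(filter<A, B> *first, filter<B, C> *second)
        : first_(first), second_(second) {}
    C process(const A &in) override {
        return second_->process(first_->process(in));
    }
private:
    filter<A, B> *first_;
    filter<B, C> *second_;
};

// Helper function: A, B and C are deduced from the argument types.
template <typename A, typename B, typename C>
multifilter<A, B, C> make_multifilter(filter<A, B> *ab, filter<B, C> *bc) {
    return multifilter<A, B, C>(ab, bc);
}

// Two toy filters for the demo.
class int_to_string : public filter<int, std::string> {
public:
    std::string process(const int &in) override { return std::to_string(in); }
};

class string_length : public filter<std::string, std::size_t> {
public:
    std::size_t process(const std::string &in) override { return in.size(); }
};

// Counts the characters in the decimal representation of a number
// by running it through both filters in sequence.
std::size_t demo_digits(int value) {
    int_to_string to_text;
    string_length length;
    filter<int, std::string> *ab = &to_text;
    filter<std::string, std::size_t> *bc = &length;
    return make_multifilter(ab, bc).process(value);
}
```

The intermediate type B never shows up at the call site, which is the point made above: the caller asks for A to C and doesn't need to care what sits in between.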

My strong advice on the parameters is to include no third parameter (third == "") for simple conversion functions that do a reversible conversion from type A to type B. If there is a factor of loss or some form of transformation, make the third non-"".

There is also a fourth parameter in get_filter that I'm hoping not to use often. It's called "creator" and it allows you to select a given author for a given filter. Use is of course strongly discouraged.

The filters themselves will be hosted centrally on a server that hands them out (or sells them, if commercial venues enter the picture). You'll get an overview of several ways to get what you want and you'll be able to choose components. No more vendor-lock-in or even application-lock-in for specific document formats, any app should be able to process any file format corresponding to its data types. That way, the discussion on file formats is based on the file formats, not on the software coming with it.

Please, shoot holes in my idea :)