Þ   briarpig  » thorn  » demos  » thread


demos are explained here; a menu at top column right indexes actual topic demos. Here we demo thread.

problem

     When Wil needs multiple threads for a purpose, he prefers to subclass a C++ wrapper abstracting thread details, instead of using the pthread library api directly.

     "Why do you want to do that?" Zé asked. "Doesn't it work just as well to use a pthread api in C and be done?"

     "Yes, it does," Wil granted. "There's no particular reason to do it the way I show on this page. In fact, you see the pthread api in C below — it implements the C++ classes."

     "Oh, that's sly," Zé admired. "You're saying if I don't like your approach, I should do it however I like."

     "Exactly," Wil smiled. "Furthermore, if you don't like any piece I put in the C++ api I use, change it to anything you like more. Go nuts. See what happens when you change the code. In fact, please experiment even if you like it."

     "Sure," Zé nodded. "Of course, I already know everything you're going to say about threads. But you want me to keep playing dumb, right? To motivate the material?"

     "Yes, please do," Wil invited. "But not too dumb, okay? Just ask plausible leading questions."

     "Aw, I was going to go straight to pure ignorance," Zé said. "A question, 'What's a thread?' comes directly to mind."

     Wil grimaced. "Yes, well, I wasn't going to answer that one," he said. "Why don't you take a shot yourself?"

     "I found this tutorial about threads online," Zé noted, "and it takes a cursory stab at defining what a thread is."

     "Thanks," Wil raised his eyebrows. "Maybe readers will read that and other online specs for the pthread api first. I'll just assume docs for pthread usage are easy to find."

main «

     "Sure, but let me note some context," Zé begged. "Every process has at least one thread: the one calling main() we can call the main thread by convention."

     "That's right," Wil nodded. "And it's the only thread if no one else starts another in that process. You can think of the main thread as being the stack, and a flow of control represented by registers saved and restored by context switches. A thread is an execution context."

     "Launching another thread creates a new stack and execution context," Zé elaborated. "But every thread in a process shares access to the address space, so threads can share globals and other common data structures."

     "Yes, if they're careful," Wil held up a finger. "Any state accessed by more than one thread must observe one of several disciplines: 1) immutable, or 2) mutable and thread-safe, or 3) mutable and safe to garble because non-essential."

     "What's a good example of the last one?" Zé asked. "What is safe to let multiple threads alter without protection?"

     "Statistics that aren't very important," Wil suggested. "I often use global counts without any mutex protection when perfect safety is not required, when no code depends on the count being perfect, because it's just for display."

     "Your mutex demo wraps pthread_mutex_t in a C++ class named yx," Zé noted. "So you expect folks to use that C++ wrapper when mutex is needed?"

     "Yes," Wil agreed. "And I wrap pthread_condition_t in a C++ class named yxk. Both are used on this page, so reading the mutex demo is required. Sorry."

     "Are you apologizing for those obscenely short names?" Zé asked. "Because I've been hoping you would."

     "Never," Wil shook his head. "But you can rename yx to ymutex, and yxk to ycondition if you like."

     "I'll cope," Zé sighed. "I'm not dim, you know. I can remember yx means mutex. And what was the RAII guard class that locks it again? Was it yxg for mutex guard?"

     "Bingo," Wil encouraged. "But let's get back to discussing main() since the demo launches new threads there. Real apps and servers might start threads far from main(), but unit tests often get right to business in main()."

     "What else is there to say?" Zé puzzled. "Let me put my noob hat back on. Oh yeah. What happens if main() returns if other threads are running?"

     "Your process ends then," Wil smiled, "whether or not other threads are done. So you'll probably crash. Which looks bad in unit tests. So don't do that. You can't let main() return, even if you can't think of anything useful to do with the original flow of control. But most folks use the main thread as the event thread. Or maybe as the socket i/o thread."

     "Sometimes those are the same thing," Zé observed.

     "Yes, sure, but I'm not going to show a server in this demo," Wil said. "This demo is oriented towards writing a unit test to prove thread-safe code is actually thread-safe, by stressing the daylights out of it with many threads."

     "Yeah, show me a real page cache," Zé suggested.

     "Not today," Wil dismissed. "My example code with several threads here is really trivial. The only interesting part is making main() wait until threads are done."

     "Using a barrier?" Zé asked. "Does your Mac laptop even have pthread_barrier_t supported?"

     "Yes, with barriers," Wil said. "And no, my Mac doesn't have pthread_barrier_t in the pthread api. So this demo also shows a hand-rolled replacement using a yx mutex and yxk condition variable."

     "Righteous," Zé approved. "But go faster—I'm bored."

job «

     "Did you invent another zany name for your thread class?" Zé asked. "Tell me it's not yt for thread."

     "Of course not," Wil laughed. "Class yt is already used in the toy language for tag. The letter t already has too much demand. Spelling it out in fall as ythread would work, but I didn't: the thread base class is named yjob."

     "Oh my dear god," Zé covered his face. "Are you flashing back to punched card batch jobs in the dark ages?"

     "A little," Wil considered. "I had a couple reasons to pick yjob as the class name instead of ythread."

     "I sure hope so," Zé emoted.

     "First," Wil ticked off one finger, "it's not really a thread. It's just some state associated with a thread. It's just a description of what a thread should do: the job assigned to it."

     "Not bad," Zé said. "My urge to hurl is subsiding."

     "Second," Wil continued, "the letter j is used nowhere else in thorn classes. It's a vacuum begging to be used."

     "Um," Zé considered. "Whatever."

     "Third," Wil concluded, "the names of thread classes appear rarely in code. The name hardly matters. A more unique name is slightly more useful than one just a single letter different from pthread. The name yjob is sufficiently weird."

     "You got that right," Zé granted. "I don't think you're going to get a lot of competition there. Okay, let's speed this up. Tell me what this job class is good for."

     "It's a namespace for collecting thread related stuff," Wil began. "And it provides an api to force conventions, just so there's some structure. Any state needed in a thread can be put into member variables of yjob subclasses."

     "For example," Zé supposed, "if you wanted a barrier associated with a thread, you could refer to it by some yjob member variable. That would let you establish a convention for stopping threads cleanly, and main() can rendezvous with all threads who wait on the barrier."

     "You're stealing my thunder again," Wil complained. "Now how can I make that fun without a plot twist?"

     "Doesn't matter," Zé shrugged. "Audience is already asleep, and they're not waking up again unless Valkyries come swooping through here in a Max Payne ripoff."

     "That can be arranged," Wil warned. "As you know perfectly well. Just keep it up and see what karma gets you."

barriers «

     "Let's start with your barrier replacement," Zé suggested. "Did you make it a nested class inside yjob?"

     "Of course," Wil nodded. "Okay, let's start there."

//define yjob_USE_BARRIERS 1 /*use pthread_barrier_t in yjob «*/

     "If you define that compiler switch," Wil explained. "That means you think you can use standard pthread_barrier_t found in pthread.h."

     "And if it's not defined," Zé guessed, "we use your replacement. What's it called? Jbarrier?"

     "How'd you guess that?" Wil squinted at Zé.

     "You start nested classes with the same main letter as the containing class," Zé explained, "but in uppercase."

     "Think you're pretty clever," Wil muttered.

class yjob { // pthread job: basic C++ wrapper for pthreads « public: // nested types inside class yjob

class Jbarrier { // hand-rolled basic pthread_barrier_t public: yx b_mux; // barrier's mutex (yx.h: yx is a mutex) yxk b_cond; // condition (yx.h: yxk is condition) unsigned b_count; // target count of waiters for barrier unsigned b_waiters; // actual number of waiters public: ~Jbarrier(); // same as pthread_barrier_destroy()

Jbarrier(unsigned count) // cf pthread_barrier_wait() « : b_mux(), b_cond(b_mux), b_count(count), b_waiters(0) { } int bwait(); // same as pthread_barrier_wait() };

public: // public member variables #ifdef yjob_USE_BARRIERS // note some pthread libraries don't have pthread_barrier_t typedef pthread_barrier_t Jbarrier_t; // pthread.h defines #else /*yjob_USE_BARRIERS*/ // typedef Jopaque Jbarrier_t; // opaque okay if not needed typedef Jbarrier Jbarrier_t; // if you still NEED barriers #endif /*yjob_USE_BARRIERS*/ Jbarrier_t* j_barrier;

     "If I read this right," Zé studied the code, "You define Jbarrier_t as a typedef for either yjob::Jbarrier or pthread_barrier_t depending on the compiler switch."

     "Yes," Wil beamed. "And member j_barrier points to one. The pointer can be nil. And the pointer is public."

     "I hate the way you make members public like that," Zé pretended, imitating Dex's signature curled lip.

     "Want to see code for Jbarrier now?" Wil asked.

     "Destructor and bwait()?" Zé asked. "Go for it."

int yjob::Jbarrier::bwait() { // cf pthread_barrier_wait() « yxg guard(b_mux); // lock the mutex in this scope if (++b_waiters >= b_count) { // reached target count? b_waiters = 0; b_cond.kbroadcast(); return 1; // only one thread gets positive return } // wait for condition to signal via broadcast b_cond.kwait(); // wait forever until broadcast occurs return 0; } yjob::Jbarrier::~Jbarrier() { // cf pthread_barrier_destroy() // destructors for b_cond and b_mux members execute here }

     "Does bwait() act just like pthread_barrier_wait()?" Zé asked. "I guess I should read the docs."

     "Yes, please read pthread_barrier_wait() docs on your time," Wil pleaded. "This works in my tests."

     "I'd like to see the test code," Zé said. "But I suppose we should finish the yjob api first, huh?"

state «

     "Let's finish member variables," Wil suggested. "So far you've seen j_barrier point to a barrier instance."

     "How much more state does a thread need?" Zé asked.

     "Watch your terminology," Wil cautioned. "We're not going to look at the state of a thread — we're going to look a members in a job, which is state associated with a thread."

     "You're so bleeping pedantic," Zé accused.

     "Just don't get confused," Wil warned. "There's plenty to get confused about. Almost all yjob member variables are devoted to tracking how a thread got started, and how you plan to stop it later, or rendezvous with others."

     "Except for the j_tid copy of a thread id," Zé noted.

class yjob { // (continued) // ... public: Jbarrier_t* j_barrier; // if non-nil, wait on thread stop protected: pthread_t j_tid; // pthread_self() post thread creation int j_stopped; // nonzero only when stopped int _jerr(const char* what) const; // report errno error public: bool j_autodelete; volatile int j_running; int j_ready; int j_detached;

     "As you noted," Wil explained, "j_tid will hold a copy of the thread's id — after the thread is started."

     "Those all look like they track little fiddling details," Zé muttered. "Maybe you don't need all of them?"

     "Excellent questions," Wil encouraged. "I suggest you run tests — see ones on this page — and see what happens when you drop these members on a case by case basis."

     "Why is j_running the only one declared volatile," Zé wondered. "Is it special?"

     "I was trying to get a thread test to behave on my Mac," Wil scratched his ear. "The thread's main loop checks the value of j_running and keeps going as long as it's not zero."

     "What happened?" Zé prompted. "Mind if I ask?"

     "When j_running is set to zero from another thread, the change is not currently observed," Wil sighed.

     "Even after you made it volatile?" Zé marveled.

     "Yeah," Wil nodded. "So my guess is the compiler isn't emitting correct code for volatiles. Or something."

     "So what do you see in the debugger?" Zé asked.

     "The debugger gets confused and says the stack is corrupt," Wil grimaced. "I used to have trouble with gdb under threads on Linux a few years ago. But not lately. Maybe the Xcode debugger or gdb on the Mac needs work."

     "How do you deal with that problem?" Zé asked.

     "I stop threads another way," Wil shrugged. "You'll see in the subclass samples."

job api «

     "How do you instantiate yjob?" Zé asked.

public: yjob(bool autoDelete);

     "New one on the heap," Wil explained. "Use a constructor for a subclass, since you need to override jrun()."

     "What does autoDelete mean?" Zé puzzled.

     "When the thread stops running, it deletes the yjob instance when j_autodelete is true," Wil summarized.

public: // getters pthread_t jtid() { return j_tid; } « int jstopped() { return j_stopped; } «

     "I get that jtid() is a copy of pthread_self() once the thread gets underway," Zé started. "But what does stopped mean? Is it nonzero when the thread is running?"

     "Something like that," Wil nodded. "It's one of the more useless members in yjob, but it's another detail to check for consistency when debugging."

     "Then I'll ignore it," Zé waved.

int jdetached() const { return j_detached; } « Jbarrier_t* jbarrier() { return j_barrier; } «

     "What's a detached thread?" Zé asked.

     "A detached thread can't be joined using pthread_join() later," Wil explained. "I never join threads, and I don't know anyone who does."

     "How do you get a value back from a thread then?" Zé asked. "What if a thread generates values?"

     "I transmit data using events, or thread-safe queues," Wil shrugged. "The idea of joining threads implies you're willing to start a thread and end it for short term tasks."

     "Sounds expensive," Zé pondered. "Wouldn't starting a thread have significant cost?"

     "Like I said," Wil reminded. "I don't know anyone who joins threads — most uses are long lived threads, typically in thread pools. Blocking an unused thread is simpler than ending it."

     "Is a barrier required?" Zé asked. "Can nil be kept in j_barrier? What would that do?"

     "A barrier is not required, so nil is fine," Wil explained. "If you never really plan to stop your threads, what use would a barrier be, anyway? But in unit tests, I pass a barrier into my constructors, because I plan the test to end."

void jstop(); // end execution int jstart(size_t stack, bool detach=true);

     "Does jstop() really terminate a thread?" Zé asked.

     "No, it just asks a thread to stop by setting j_running to zero," Wil explained. "Then a thread should stop running the next time it notices and finds a good point to exit."

     "You already said the changed value of j_running is somehow missed, is that right?" Zé asked.

     "Yeah," Wil confirmed. "So don't put much mental energy into j_running and jstop(), unless you want to debug the problem. I sure don't."

     "Does jstart() really start a thread?" Zé asked.

     "Yes, it does," Wil agreed. "It uses pthread_create() to start the thread, by passing a pointer to a global C function — named ymu_job_start() in this version — which then calls virtual method jrun() once the thread is underway."

     "So jstart() starts a thread using a pthread api, then calls virtual jrun() to run subclass specific code?" Zé restated. "And the size of the stack is stack bytes?"

     "Exactly," Wil confirmed. "The default stack size is one megabyte if you use jgo() below. But that's pretty big. A lot of threads will consider a quarter meg much more space than needed. You should size carefully."

     "What happens if I run out of stack?" Zé asked.

     "Don't do that," Wil instructed. "I haven't tried that. I assume you'd crash since the stack might not be able to grow in an unbounded fashion in most pthread runtimes."

enum { e_1MB_less_1pg = (1020 * 1024) }; // default stacksize int jgo(bool detach=true) { // default start « return jstart((size_t) e_1MB_less_1pg, detach); } public: // virtual API virtual ~yjob(); virtual void* jrun(); }; // yjob C++ wrapper for pthread API

     "The only virtual methods are destructor and jrun()," Zé asked. "Seems a little sparse."

     "Look at it this way," Wil said. "To describe a new kind of thread subclass, all you need to override is one method: the one saying what the thread does when it's running."

     "What should a thread do when running?" Zé prompted.

     "Whatever it wants," Wil shrugged. "Typically they loop forever doing any work requested, until you explicitly ask them to stop. Tests might end on time limits."

     "I see those are not pure virtual," Zé noted. "Where's the default implementation of jrun()? Is it already useful?"

     "No it's not useful," Wil warned. "It just sleeps and complains that you should have overriden the method."

yjob.cpp «

     "Let's look at the non-inline code," Wil suggested. "Here's the default jrun() before you override it."

void* yjob::jrun() { « j_tid = ::pthread_self(); ylog(1, "yjob::jrun(): tid=%#lx OVERRIDE?", (long) j_tid); int count = 0; do { // subclass forgot to override: just log in a loop: ::usleep(1000000); if (++count > 15) { // another 15 seconds passed? count = 0; ylog(1, "yjob::jrun(): tid=%#lx LOOP", (long) j_tid); } } while(j_running); return 0; }

     "Now check out the destructor," Wil pointed. "Notice how I use j_stopped to try keeping the job alive while spin looping if the thread still seems to be going."

     "Spinning is a bad idea," Zé observed.

     "Yes, and destroying a thread's job state while it's still running is an even worse idea," Wil countered. "The purpose here is to generate a diagnostic."

yjob::~yjob() { « long long spin = 0; // spin counter long tid = (long) j_tid; long self = (long) pthread_self(); while ( !j_stopped ) { if ((++spin & 0xFFFF) == spin) { ylog(1, "%#lx: tid=%#lx spin=%lld", self, tid, spin); } } --g_yjob_threads; ylog(1, "%#lx: tid=%#lx ~yjob() bye (threads left=%d)", self, tid, g_yjob_threads); }

     "I see you decremented a count of threads," Zé studied. "But I never saw you define that counter, or increment it."

     "Time for the constructor," Wil agreed.

yjob::yjob(bool autoDelete) { « j_barrier = 0; j_stopped = 1; // default to stop until actually started j_running = 0; j_autodelete = autoDelete; j_ready = 0; j_detached = 1; ++g_yjob_threads; ++g_yjob_thread_total; }

     "Those thread counters are not thread-safe, are they?" Zé asked. "That bother you?"

     "No, they're not thread safe," Wil granted. "And I don't really care, since these counts only get used in log messages."

     "But you could use atomic integers from the atomic demo instead," Zé guessed. "If correctness was crucial."

     "Yes," Wil nodded. "I just wanted to show an example here of statistics that aren't thread-safe that won't ruin the world if they ever get garbled. Small point, I guess."

int yjob::_jerr(const char* what) const { « int e = errno; ylog(1, "%s() errno=%d", what, e); return -1; }

     "Above you see the method logging an error message when a pthread call goes bad," Wil pointed.

     "Show me starting a thread," Zé prompted. "I'm getting impatient. I don't care if it's hairy."

     "I'm glad you don't care," Wil smiled. "Because I'm not going to explain it much."

int yjob::jstart(size_t stacksize, bool detach) { « pthread_attr_t attr; int e = ::pthread_attr_init(&attr); if (e != 0) { return _jerr("pthread_attr_init"); } e = ::pthread_attr_setstacksize(&attr, stacksize); if (e !=0) { return _jerr("pthread_attr_setstacksize"); } if (detach) { e = ::pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (e != 0) { return _jerr("pthread_attr_setdetachstate"); } j_detached = 1; } else { // joinable (possible to call pthread_join()) j_autodelete = false; j_detached = 0; } j_stopped = 0; // jstop() has not been called e = ::pthread_create(&j_tid, &attr, ymu_job_start, this); if (e) { // error during create? ylog(1, "jstart() errno=%d", errno); j_stopped = 1; // act as though jstop() was called } j_ready = 1; // method jstart() is done touching thread return e; }

     "I have several questions," Zé admitted. "Like, why does it matter whether j_ready is zero or not? And is ymu_job_start a function pointer?"

     "The j_ready member can be used to help show when the launching thread is done touching the state in the job instance," Wil explained. "I use j_ready to spin loop in ymu_job_start below until j_ready becomes nonzero."

     "So it's like a one time rendezvous at thread launch time?" Zé guessed. "Is it safer?"

     "Well, in principle the thread could run, finish executing, then destroy the job instance in a race," Wil explained. "So it's a good idea to have the original thread done with the job before a new thread makes progress."

     "Uh, huh," Zé pondered. "Thread scheduling is funky."

     "The purpose of the ymu_job_start function pointer is to satisfy the pthread_create() api," Wil explained.

     "Oh, I see," Zé googled for pthread docs. "The new thread calls the function whose pointer is passed, and it gets only one argument — the job here is that argument?"

     "Yes," Wil nodded. "Any other args you want to pass to a thread can be members of a job subclass."

void* ymu_job_start(void* threadArg) { // thread starter « mu::yjob* job = (mu::yjob*)threadArg; while (!job->j_ready) // until jstart() sets to nonzero ::usleep(20000); job->j_running = 1; void* outArg = job->jrun(); mu::yjob::Jbarrier_t* barrier = job->jbarrier(); job->jstop(); // j_stopped = 1 so yjob::yjob() can finish if (job->jdetached()) { // wait here if barrier is provided if (job->j_autodelete) delete job; if (barrier) { #ifdef yjob_USE_BARRIERS ylog(1, "ymu_job_start() waits on barrier"); ::pthread_barrier_wait(barrier); // pthread barrier #else /*yjob_USE_BARRIERS*/ // when barrier is opaque, you might want to log: // mu::ylog(1, "ymu_job_start() BARRIERS unused!"); barrier->bwait(); // use hand-rolled yjob::Jbarrier #endif /*yjob_USE_BARRIERS*/ } } else { // joinable? // stay alive or else main thread can't pthread_join() } return outArg; }

     "Oh, let's see," Zé read the code. "The new thread spins until j_ready is set to nonzero, if necessary. Then it calls jrun(), which is presumably the useful lifetime of the thread."

     "Correct," Wil agreed. "So when jrun() returns, we call jstop() (see below) whether or not called before, to ensure the destructor thinks it's safe to finish running."

     "Then if it's detached, it maybe autodeletes, and maybe waits on a barrier?" Zé asked. "So you pull the barrier out first before destroying the job?"

     "Yes," Wil nodded. "If you want to rendezvous on thread termination, you can install a barrier so it waits when it stops running. Everyone using the same barrier can rendezvous. I moved the autodelete step before the barrier wait during testing. Can you guess why?"

     "Did the destructor never get called?" Zé guessed.

     "That's right," Wil nodded. "The log message in the job destructor never printed. Telling me the thread never got scheduled again after the barrier rendezvous."

     "Cool," Zé considered. "No, I meant to say creepy. I take it threads might never be scheduled unless someone forces other threads to wait?"

void yjob::jstop() { « // ymu_job_start() calls jstop() when jrun() returns j_stopped = 1; // permit yjob::~yjob() to end spin loop j_running = 0; // tell jrun() to stop looping }

     "Yes," Wil confirmed. "And that's why I started using barriers in unit tests, because I saw really bizarre behavior when running a couple dozen threads. Sometimes one thread would never, ever run at all, until all the other threads started stopping after a test ended."

     "Oops," Zé chirped. "I suppose that was bad?"

     "Yeah," Wil spread his hands. "Because I'd be tearing down my test state, and one of the threads would start running finally. So the test would crash."

     "I take it most of this demo is about barriers as a result," Zé guessed. "How to start while planning to stop?"

     "Yes, I guess so," Wil shrugged. "I suggest you start with threads shown column right. Once you can start and stop pools of threads without a problem, then make the threads do something more complex."

A submenu for demos appears below, letting you go to the page on a topic written as a demo (as the demos page defines it).

menu

     thorn: todo, names, fd, iovec, assert, log, run, hex, crc, buf, in, out, quote, escape, compare, file, deck, cow, arc, blob, tree, slice, rand, time, stat, hash, heap, node, primes, page, book, pile, stack, atomic, lock, mutex, thread « Þ, map, meter, list, iter, ctype

     (mu: toy, peg, imm, tag, box, symbol, token, number, bigint, class, method, reader, writer, eval, env, vm, gc, world, pcode, compiler, asm, lathe, lisp, smalltalk, design, weight, jar, card, harp, debug, profile)

     Some demos are stubs: todo is a demo guide. See toy for mu updates on language pages; names introduces naming schemes.


     Many demos are yet stubs; see todo for a demo guide. Also see names for an overview of naming schemes.

output «

     "I added the following wprintf() method to the fd demo, so I can use it in this demo," Wil said.

     "But why?" asked Zé. "What's wrong with using fprintf() to write to stdout? Incidentally, I hate pretending I don't already know the answer."

     "Thanks for being a sport," Wil laughed. "Any buffered i/o api isn't thread-safe unless there's a way to get mutex, so threads writing concurrently won't garble content and step on each other. There's more than one way to avoid that."

     "What's good about wprintf() below?" Zé prompted.

int yfdw::wprintf(const char* fmt, ...) const { « char buf[4096 + 4]; // temp buffer va_list args; va_start(args,fmt); vsnprintf(buf, 4096, fmt, args); // room at end for nul va_end(args); buf[4096] = 0; // whether or not vsnprintf() wrote a nul return ::write(w_fd, buf, ::strlen(buf)); }

     "A direct system call to write() using a descriptor is thread safe," Wil explained. "Assuming a context switch to the kernel, that is. Since all the work is done in a system call, there's no race in the local process. At least, I know folks who assume that."

     "And you're assuming it right now, aren't you?" Zé needled.

     "Sure, why not?" Wil shrugged. "Anyway, in sample code below I use wprintf() as my api to print in what's intended to be a thread-safe manner. Your mileage may vary."

job subclassing «

     "Okay, show me a subclass of yjob," Zé insisted. "Right now. My patience is over. Where's the beef?"

     "Good timing," Wil said. "I was about to show you the first subclass anyway. Here's an example of a minimal thread job, which just runs for a specified number of seconds, sleeping once a second and printing a message."

class ysleepingthread : public yjob { // example test thread « public: // public only to avoid getters and setters: unsigned j_seconds; // max seconds thread should run « unsigned j_sleeps; // number of times thread slept « i32* j_atomic; // atomic counter shared by threads « public: // runtime is limited by seconds only because tests using // thread pools for multi-threaded testing typically have // some end condition, and we use time to simulate that:

ysleepingthread(Jbarrier_t* b, unsigned secs, i32* at) « : yjob(true), j_seconds(secs), j_sleeps(0), j_atomic(at) { j_barrier = b; // set inherited member to arg b early } public: // virtual API virtual ~ysleepingthread(); virtual void* jrun(); // loop until seconds pass };

ysleepingthread::~ysleepingthread() { « }

     "What's the j_atomic integer pointer for?" Zé asked. "A gratuitous use of thread-safe integer incrementing from the atomic demo? Do all the threads share that one counter?"

     "That's right," Wil nodded. "The sample main() below passes the same integer address to all the threads running a separate instance of ysleepingthread. They all increment this one counter. It's just a simple demonstration that threads can cooperate with some shared mutable state if they like."

     "Let's see the jrun() method now," Zé urged. "That's where I'll see what the thread does, right?"

     "Exactly," Wil nodded. "Here it is. All it does is loop until the number of seconds passed to the constructor have passed. It sleeps a second and checks the time, then prints one or another message, depending on whether it aims to stop."

void* ysleepingthread::jrun() { « j_tid = ::pthread_self(); // yfdw used for unbuffered output to STDOUT_FILENO: yfdw wout(yfdw::we_stdout); // writes to stdout time_t startTime = 0; ::time(&startTime); // for future reference do { ::usleep(1000000); ++j_sleeps; yinc_vi32(j_atomic); time_t now = 0; ::time(&now); int elapsed = now - startTime; if (elapsed > (int) j_seconds) { // hit time limit? // unbuffered write needs no thread-safety here: wout.wprintf("bye: tid=%#lx sleeps=%u secs=%d\n", (long) j_tid, (unsigned) j_sleeps, elapsed); j_running = 0; // end while loop } else { wout.wprintf("run: tid=%#lx sleeps=%u secs=%d\n", (long) j_tid, (unsigned) j_sleeps, elapsed); } } while(j_running); return 0; }

     "Why do you use the time() method here?" Zé asked. "Don't you have a more efficient way to see elapsed time?"

     "I didn't want to write the time demo just to use time apis here," Wil explained. "This is harmless as sample code. It's not like performance matters here, you know."

     "Okay, now all we need is a main() to instantiate some ysleepingthread job instances," Zé anticipated. "Then you can start the threads for them?"

     "Coming right up," Wil agreed. "Below I create three threads, each running up to three seconds past the start time. I create a barrier instance on the stack and pass it to each of the job constructors. It's pretty minimal."

int main(int argc, char * const argv[]) { « long mid = (long) pthread_self(); // main's thread ID yfdw wout(yfdw::we_stdout); // writes to stdout wout.wprintf("%#lx: begin sleeping thread test...\n", mid); const unsigned N = 3; // number of threads in test ysleepingthread* threadVec[N]; const unsigned secs = 3; // number of seconds for test i32 count = 0; // for atomic counds in threads #ifdef yjob_USE_BARRIERS pthread_barrier_t egress; // barrier type in pthread.h pthread_barrier_init(&egress, NULL, N + 1 /*1 for me*/); #else /*yjob_USE_BARRIERS*/ yjob::Jbarrier egress(N + 1); // +1 for myself #endif /*yjob_USE_BARRIERS*/ unsigned i = 0; for ( ; i < N; ++i) { // create each thread wrapper first threadVec[i] = new ysleepingthread(&egress, secs, &count); } wout.wprintf("%#lx: starting %u threads\n", mid, N); for (i = 0; i < N; ++i) { // actually start each thread threadVec[i]->jgo(); // default yjob::jstart() } #ifdef yjob_USE_BARRIERS pthread_barrier_wait(&egress); #else /*yjob_USE_BARRIERS*/ egress.bwait(); // wait until all threads stop running #endif /*yjob_USE_BARRIERS*/ wout.wprintf("%#lx: ...end thread test (count=%d)\n", mid, (int) count); return 0; }

     "Why do you use yjob_USE_BARRIERS ifdefs?" Zé asked. "Couldn't you just define Jbarrier to use pthread_barrier_t when that compiler switch is defined?"

     "That would look a lot simpler, wouldn't it?" Wil nodded. "It would also slightly obfuscate what happens, and I really wanted to show detail directly when possible."

     "Whatever," Zé dismissed. "What appears on stdout?"

0xa000d000: begin sleeping thread test... 0xa000d000: starting 3 threads run: tid=0x1800a00 sleeps=1 secs=1 run: tid=0x1800e00 sleeps=1 secs=1 run: tid=0x1801200 sleeps=1 secs=1 run: tid=0x1800a00 sleeps=2 secs=2 run: tid=0x1800e00 sleeps=2 secs=2 run: tid=0x1801200 sleeps=2 secs=2 run: tid=0x1800e00 sleeps=3 secs=3 run: tid=0x1800a00 sleeps=3 secs=3 run: tid=0x1801200 sleeps=3 secs=3 bye: tid=0x1800a00 sleeps=4 secs=4 bye: tid=0x1800e00 sleeps=4 secs=4 ylog(1) 0x1800e00: tid=0x1800e00 ~yjob() bye (threads left=2) ylog(1) 0x1800a00: tid=0x1800a00 ~yjob() bye (threads left=1) bye: tid=0x1801200 sleeps=4 secs=4 ylog(1) 0x1801200: tid=0x1801200 ~yjob() bye (threads left=0) 0xa000d000: ...end thread test (count=12) ylog(1) pthread_cond_destroy() => 16='Resource busy', errno=0

     "There you go," Wil pointed. "About what you expected, right? Except for that last line, of course."

     "Yeah," Zé arched an eyebrow. "What's that about? One of the condition variable destructors complained?"

     "Uh, huh," Wil confirmed. "The destructor for egress runs when main() returns, and apparently the condition variable is still busy — that's what pthread_cond_destroy() returns."

     "You actually check error values from pthread calls?" Zé marveled. "Are you some kind of nutball? I'd hide that output if I were you. Why did you show that here?"

     "It's interesting and it's not doing me any harm," Wil shrugged. "So it's something to think about."

     "It's just irritating," Zé complained. "Got any more job subclasses to show me?

     "Yeah," Wil nodded. "Next I'll show you how to make a thread pool reading requests from a queue."

thread pools «

     "Clue me in," Zé sang in a perfunctory manner. "What's a thread pool? A pool of threads providing a service?"

     "More or less," Wil agreed. "A thread pool is just a set of threads that block until you give one something to do. In this case I'm using a blocking queue from the mutex demo."

     "Oh god," Zé rolled his eyes. "Are you going to use that yxlqt<> template class? Isn't that a horrible name?"

     Wil scratched his ear. "Well it was just throw-away sample code," he apologized. "Now that I'm actually using it, maybe I should have named it yblockingqueue<T> instead."

     "Those letters in the name mean mutex list queue template don't they?" Zé asked. "It could be worse. What does it do again?"

     "The most important thing happens when you try to pop the queue and it's empty," Wil explained. "Then you block until someone pushes something in the queue."

     "I get it," Zé said. "That way your pool of threads can block on trying to pop the request queue. I guess a thread wakes every time you push a request in the queue?"

     "Yep," Wil confirmed. "Sometimes folks call that sort of thread a worker thread, so that's what I named the next job subclass. It's a job for a worker thread that blocks on a request queue."

class yworkerthread : public yjob { // for use in thread pool « public: struct Jrequest { // request type yworkerthread::Jrequest « int r_val; Jrequest() : r_val(-1) { } // negative on empty construct Jrequest(int v) : r_val(v) { } Jrequest(unsigned v) : r_val((int) v) { } operator int() const { return r_val; } };

     "This nested Jrequest request class is the value pushed into the request queue?" Zé asked.

     "Exactly," Wil nodded. "You're batting a thousand."

protected: // yx.h: yxlqt<T> is mutexed (STL) list queue template yxlqt<Jrequest>* j_request_queue; // list of Jrequest « int j_sum; // (gratuitous) sum of requests « public: yworkerthread(yxlqt<Jrequest>* queue, bool autoDelete=true) : yjob(autoDelete), j_request_queue(queue), j_sum(0) { }

yworkerthread(yxlqt<Jrequest>* queue, Jbarrier_t* b) « : yjob(true), j_request_queue(queue), j_sum(0) { j_barrier = b; // set inherited member to arg b early } public: // virtual API virtual ~yworkerthread(); virtual void* jrun(); // loop on request queue };

yworkerthread::~yworkerthread() { « }

     "Looks like you pass both a request queue and a barrier to the constructor," Zé observed. "Is that the usual convention: pass your job subclass anything needed to operate, in the constructor?"

     "Sure," Wil shrugged. "Why make it more complex?"

     "Okay," Zé rubbed his hands. "Let's see the jrun() method called when the thread starts."

     "Alright," Wil said. "Here's what you'll see: the thread loops, blocking on the queue each time until a request can be popped. A sleep call is added just to make on thread too busy to service other requests for a moment."

void* yworkerthread::jrun() { « j_tid = ::pthread_self(); // yfdw used for unbuffered output to STDOUT_FILENO: yfdw wout(yfdw::we_stdout); // writes to stdout do { ::usleep(40000); Jrequest req = j_request_queue->qpop();

if (req.r_val < 0) { // interpret negative as stop? « wout.wprintf("run: tid=%#lx sum=%d val=%d NEG: stop\n", (long) j_tid, (int) j_sum, (int) req.r_val); return 0; } else { j_sum += req.r_val; wout.wprintf("run: tid=%#lx val=%d sum=%d\n", (long) j_tid, (int) req.r_val, (int) j_sum); } } while(j_running); return 0; }

     "It looks like a negative request value causes thread termination," Zé guessed. "Is that how a worker stops?"

     "In this case, yes," Wil confirmed. "Sometimes I make thread pools terminate by sending a stop request to the queue: at least N of them for N threads using the queue."

     "Is that what you do in main() here?" Zé asked.

     "Yes," Wil said. "Once again I start three threads, all workers this time using the same barrier and request queue. There's a couple commented out bits of code. In the output afterward I'll show what happens when you run this main() with the commented code left in place."

int main(int argc, char * const argv[]) { « long mid = (long) pthread_self(); // main's thread ID yfdw wout(yfdw::we_stdout); // writes to stdout wout.wprintf("%#lx: begin worker thread test...\n", mid); const unsigned N = 3; // number of threads in test yworkerthread* workerVec[N]; #ifdef yjob_USE_BARRIERS pthread_barrier_t egress; // barrier type in pthread.h pthread_barrier_init(&egress, NULL, N + 1 /*1 for me*/); #else /*yjob_USE_BARRIERS*/ yjob::Jbarrier egress(N + 1); // +1 for myself #endif /*yjob_USE_BARRIERS*/ // mutexed list queue template: queue of Jrequest requests: yxlqt<yworkerthread::Jrequest> queue; // for each worker unsigned i = 0; for ( ; i < N; ++i) { // create each thread wrapper first workerVec[i] = new yworkerthread(&queue, &egress); } wout.wprintf("%#lx: starting %u workers\n", mid, N); for (i = 0; i < N; ++i) { // actually start each thread workerVec[i]->jgo(); // default yjob::jstart() } const unsigned R = 9; // number of initial requests wout.wprintf("%#lx: sending %u work requests\n", mid, R); for (i = 1; i <= R; ++i) { // send half dozen requests yworkerthread::Jrequest req(i); queue.qpush(req); // push in queue, waking one worker

/*::usleep(60000); // cf stdout with sleep between*/ }

/* cf using jstop() wout.wprintf("%#lx: disabling workers\n", mid); for (i = 0; i < N; ++i) { // tell each worker to stop workerVec[i]->jstop(); // set j_running to false } */ wout.wprintf("%#lx: tickle workers one last time\n", mid); for (i = 0; i < 2*N; ++i) { // wake each worker last time yworkerthread::Jrequest req(-1); queue.qpush(req); // push in queue, waking one worker } #ifdef yjob_USE_BARRIERS pthread_barrier_wait(&egress); #else /*yjob_USE_BARRIERS*/ egress.bwait(); // wait until all threads stop running #endif /*yjob_USE_BARRIERS*/ wout.wprintf("%#lx: ...end thread test\n", mid); return 0; }

     "The stdout output from that appears below," Wil pointed. "Do you see the interesting part? Look at it carefully. When do the threads start running?"

0xa000d000: begin worker thread test... 0xa000d000: starting 3 workers 0xa000d000: sending 9 work requests 0xa000d000: disabling workers 0xa000d000: tickle workers one last time run: tid=0x1800a00 val=1 sum=1 run: tid=0x1800e00 val=2 sum=2 run: tid=0x1801200 val=3 sum=3 run: tid=0x1800a00 val=4 sum=5 run: tid=0x1800e00 val=5 sum=7 run: tid=0x1801200 val=6 sum=9 run: tid=0x1800a00 val=7 sum=12 run: tid=0x1800e00 val=8 sum=15 run: tid=0x1801200 val=9 sum=18 run: tid=0x1800a00 sum=12 val=-1 NEG: stop run: tid=0x1800e00 sum=15 val=-1 NEG: stop ylog(1) 0x1800a00: tid=0x1800a00 ~yjob() bye (threads left=2) run: tid=0x1801200 sum=18 val=-1 NEG: stop ylog(1) 0x1800e00: tid=0x1800e00 ~yjob() bye (threads left=1) ylog(1) 0x1801200: tid=0x1801200 ~yjob() bye (threads left=0) 0xa000d000: ...end thread test ylog(1) pthread_cond_destroy() => 16='Resource busy', errno=0

     "It looks like the workers don't run until after you push the final stop requests with negative values," Zé said.

     "Yes," Wil agreed. "I suspect if I added a print message before the barrier wait in main(), I'd see none of the threads run until main() blocks on the barrier."

     "I see what you mean about threads not being scheduled until they're forced to run," Zé was thoughtful.

     "So next I added that sleep after pushing each request," Wil explained. "That output is shown next below."

0xa000d000: begin worker thread test... (with-sleep-between) 0xa000d000: starting 3 workers 0xa000d000: sending 9 work requests run: tid=0x1800a00 val=1 sum=1 run: tid=0x1800e00 val=2 sum=2 run: tid=0x1801200 val=3 sum=3 run: tid=0x1800a00 val=4 sum=5 run: tid=0x1800e00 val=5 sum=7 run: tid=0x1801200 val=6 sum=9 run: tid=0x1800a00 val=7 sum=12 run: tid=0x1800e00 val=8 sum=15 run: tid=0x1801200 val=9 sum=18 0xa000d000: tickle workers one last time run: tid=0x1800a00 sum=12 val=-1 NEG: stop ylog(1) 0x1800a00: tid=0x1800a00 ~yjob() bye (threads left=2) run: tid=0x1800e00 sum=15 val=-1 NEG: stop ylog(1) 0x1800e00: tid=0x1800e00 ~yjob() bye (threads left=1) run: tid=0x1801200 sum=18 val=-1 NEG: stop ylog(1) 0x1801200: tid=0x1801200 ~yjob() bye (threads left=0) 0xa000d000: ...end thread test ylog(1) pthread_cond_destroy() => 16='Resource busy', errno=0

     "Uh, huh," Zé smiled. "Here the worker threads start running. They print messages before the message in main() saying it will next tickle the workers one last time."

     "I've got one more test run to show," Wil warned.

     "I was wondering why you said 'tickle' instead of just stating your plan to stop the threads," Zé remarked.

     "Originally I called jstop() on each thread before I pushed the last negative request," Wil explained. "This was before I added the sleep after pushing each request. The following strange output was the result. It's a mite disturbing."

0xa000d000: begin worker thread test... (using jstop()) 0xa000d000: starting 3 workers 0xa000d000: sending 9 work requests 0xa000d000: disabling workers 0xa000d000: tickle workers one last time run: tid=0x1801200 val=2 sum=2 run: tid=0x1800a00 val=1 sum=1 ylog(1) 0x1801200: tid=0x1801200 ~yjob() bye (threads left=1) ylog(1) 0x1800a00: tid=0x1800a00 ~yjob() bye (threads left=2) run: tid=0x1800e00 val=3 sum=3 run: tid=0x1800e00 val=4 sum=7 run: tid=0x1800e00 val=5 sum=12 run: tid=0x1800e00 val=6 sum=18 run: tid=0x1800e00 val=7 sum=25 run: tid=0x1800e00 val=8 sum=33 run: tid=0x1800e00 val=9 sum=42 run: tid=0x1800e00 sum=42 val=-1 NEG: stop ylog(1) 0x1800e00: tid=0x1800e00 ~yjob() bye (threads left=0) 0xa000d000: ...end thread test ylog(1) pthread_cond_destroy() => 16='Resource busy', errno=0

     "Whoa," Zé exclaimed. "Two of the threads stop right away after getting their first requests. Because jstop() set j_running to zero? Then why does the third thread keep going?"

     "Good question," Wil rubbed his chin. "I don't have the slightest idea. If two threads stopped early, all of them should have stopped. But one consumes all the requests first."

     "Maybe that last thread felt sorry for you," Zé teased.

     "If you look at the very first run's output," Wil directed Zé to the output, "you see it also prints a 'disabling workers' message. That run also used jstop() to stop the threads. But that time none of them stopped. Kinda creepy, huh?"

     "Yeah, weird," Zé agreed. "I see why you commented out the loop where you call jstop()."

     "The negative request values are good enough," Wil said.

     "Don't you want to know what happened?" Zé asked.

     "Yeah, but Xcode wasn't able to debug the threads," Wil shrugged. "It thought the stacks were wrong when I hit break points. And it was just sample code."

     "So you gave up, you slacker," Zé accused.

     "Guilty as charged," Wil admitted. "Check out those first two messages printed by ~yjob() above. Notice anything interesting about the order of left=1 and left=2?"

     "Cool!" Zé crooned. "It looks like the wprintf() calls resolved in the opposite order they started. Or something."

     "Or something," Wil agreed. "Demo's over. How's your karma? Feel like a visit from Valkyries? I could fill up the rest of this column with fiction. Do you feel lucky, punk?"

     "'Are you threatening me?'" Zé asked in his Cornholio voice, imitating a Beavis and Butthead character. His hands drifted out to his side and hovered over holsters for imaginary six guns.

     "You change gears too fast," Wil complained.

     "Duh," Zé rejoined. "Always ready for nonsense."

     "That's why you're a happy ne'er-do-well and I'm a work-a-holic," Wil observed. "Hmm, want to trade?"

     "Not a chance," Zé laughed. "Hey, I told Eli I'd take him on a shadow walk to show him Ged's city under Yggdrasil."

     Wil's face screwed up in puzzlement. "What?"

     "Never mind," Zé waved. "We're going to swap dream stories next time the gang meets. Do you still recall that one you mentioned? Was it Yoda?"

     "Oh yeah," Wil tried to recall. "I dreamed I was a clone trooper, and Yoda got on my case. Said I was always worried more about enemies, and I should look to friends for inspiration."

     "I can't picture you as a clone trooper," Zé admitted.

     "Yeah, it was kinda disturbing," Wil confided.

     "But taking advice from Yoda is fine," Zé teased.

     "Only when I'm sleeping," Wil countered.

     "Ask him to teach you to use the force next time," Zé suggested. "His Kung Fu's better than yours."

     "I'm too old to begin the training," Wil said.

license «

     All this code is available only under the BriarPig mu-babel license described fully on the rights page. You do not have permission to reprint this page in any way. Neither feeds nor repackaging is allowed. You can link this page if you want folks to read it.