Þ   briarpig  » code  » sink


sink

     I wrote about C-based out stream sinks earlier this month. This is a verbatim copy of what appeared in my log. I put a copy here only to clarify license and copyright.

purpose

     The purpose of cy_event and cy_heap is to support a simple kind of priority queue for events delivered under a virtual clock.

12apr09 tastier tomatoes

cy_sink

     I won't say much about the C stream api at all. Read the out demo for explanations. This code is a clone of yo in the out thorn demo (cf «).

typedef struct cy_sink_ cy_sink; /* output sink stream */ typedef int /* like ::write() */ (*cy_sink_write_fn)(cy_sink* s, const void* b, uint32_t n); typedef int (*cy_sink_flush_fn)(cy_sink* s); typedef void (*cy_sink_putc_fn)(cy_sink* s, int c); /* fallback */ struct cy_sink_ { /* 'sink' output stream */ cy_sink_write_fn s_write; cy_sink_flush_fn s_flush; cy_sink_putc_fn s_putc; uint8_t* s_0; /* origin (start of buffer) */ uint8_t* s_p; /* pointer (cursor in buffer) */ uint8_t* s_x; /* extent (one past last buf byte) */ int s_err; /* zero or error */ uint16_t s_tab; /* current indent depth */ uint16_t s_pad; /* reserved (alignment) */ };

static inline void cy_sink_ctor(cy_sink* s, cy_sink_write_fn w, cy_sink_flush_fn f, cy_sink_putc_fn c, cy_iovec_t v) { uint8_t* p = (uint8_t*) v.iov_base; s->s_write = w; s->s_flush = f; s->s_putc = c; if (p && v.iov_len) { /* initial buffer exists? */ s->s_0 = s->s_p = p; s->s_x = p + v.iov_len; /* one past last buf byte */ } else { s->s_0 = s->s_p = s->s_x = 0; /* empty buffer */ } s->s_err = 0; s->s_tab = 0; } /* writev() writes each iovec to sink s using sink_write_do() */ int /* -1 on error; otherwise sum of bytes written */ cy_sink_writev(cy_sink* s, const cy_iovec_t* v, int cnt);

static inline int /* virtual dispatch */ sink_write_do(cy_sink* s, const void* b, uint32_t n) { return (*s->s_write)(s, b, n); }

static inline int /* virtual dispatch */ sink_flush_do(cy_sink* s) { return (*s->s_flush)(s); }

static inline void /* sometimes virtual dispatch if needed */ cy_sink_c(cy_sink* s, int c) { /* write one byte */ /* note: virtual putc() only called when buffer is full: */ if (s->s_p < s->s_x) { /* room for a byte in buf? */ *s->s_p++ = (uint8_t) c; } else { (*s->s_putc)(s, c); /* flush and then handle byte */ } } void cy_sink_1c(cy_sink* s, int c); /* non-inline c() above*/ void cy_sink_2c(cy_sink* s, int a, int b); /* c(a); c(b); */ void cy_sink_3c(cy_sink* s, int a, int b, int c); static inline void cy_sink_line(cy_sink* s) { cy_sink_c(s, '\n'); } /* newline only */ void cy_sink_n(cy_sink* s); /* newline then INDENT to depth */ void cy_sink_tabs(cy_sink* s, unsigned count); /* count tabs */

static inline void /* increase tab depth by one */ cy_sink_t(cy_sink* s) { ++s->s_tab; } static inline void /* increase tab depth by two */ cy_sink_tt(cy_sink* s) { s->s_tab += 2; } static inline void /* untab depth by one */ cy_sink_u(cy_sink* s) { if (s->s_tab) --s->s_tab; } static inline void /* untab depth by two */ cy_sink_uu(cy_sink* s) { if (s->s_tab >= 2) s->s_tab -= 2; } static inline void /* tab, then newline indent */ cy_sink_tn(cy_sink* s) { cy_sink_t(s); cy_sink_n(s); } static inline void /* tab twice, then newline indent */ cy_sink_ttn(cy_sink* s) { cy_sink_tt(s); cy_sink_n(s); } static inline void /* untab, then newline indent */ cy_sink_un(cy_sink* s) { cy_sink_u(s); cy_sink_n(s); } static inline void /* untab twice, then newline indent */ cy_sink_uun(cy_sink* s) { cy_sink_uu(s); cy_sink_n(s); }

static inline void /* virtual dispatch */ cy_sink_s(cy_sink* s, const char* cstr) { /* write C string */ (void) (*s->s_write)(s, cstr, strlen(cstr)); } static inline void /* tab, then newline indent */ cy_sink_ns(cy_sink* s, const char* cstr) { cy_sink_n(s); cy_sink_s(s, cstr); } static inline void /* tab, then newline indent */ cy_sink_sn(cy_sink* s, const char* cstr) { cy_sink_s(s, cstr); cy_sink_n(s); } static inline void /* tab, then newline indent */ cy_sink_nst(cy_sink* s, const char* cstr) { cy_sink_n(s); cy_sink_s(s, cstr); cy_sink_t(s); } static inline void /* tab, then newline indent */ cy_sink_tns(cy_sink* s, const char* cstr) { cy_sink_t(s); cy_sink_n(s); cy_sink_s(s, cstr); } static inline void /* tab, then newline indent */ cy_sink_st(cy_sink* s, const char* cstr) { cy_sink_s(s, cstr); cy_sink_t(s); } static inline void /* tab, then newline indent */ cy_sink_us(cy_sink* s, const char* cstr) { cy_sink_u(s); cy_sink_s(s, cstr); } static inline void /* tab, then newline indent */ cy_sink_uns(cy_sink* s, const char* cstr) { cy_sink_u(s); cy_sink_n(s); cy_sink_s(s, cstr); } void /* basically sink_2c('<', '/'); sink_s(tag); sink_c('>'); */ cy_sink_end(cy_sink* s, const char* tag); /* '<' '/' tag '>' */

static inline void /* untab, then '<' '/' tag '>' */ cy_sink_nend(cy_sink* s, const char* tag) { cy_sink_n(s); cy_sink_end(s, tag); } static inline void /* untab, then '<' '/' tag '>' */ cy_sink_uend(cy_sink* s, const char* tag) { cy_sink_u(s); cy_sink_end(s, tag); } static inline void /* untab, LF indent, then '<' '/' tag '>' */ cy_sink_unend(cy_sink* s, const char* tag) { cy_sink_un(s); cy_sink_end(s, tag); } static inline void /* untab, LF indent, then '<' '/' tag '>' */ cy_sink_uunend(cy_sink* s, const char* tag) { cy_sink_uun(s); cy_sink_end(s, tag); }

static inline void cy_sink_tc(cy_sink* s, int c) { cy_sink_t(s); cy_sink_1c(s, c); } static inline void cy_sink_uc(cy_sink* s, int c) { cy_sink_u(s); cy_sink_1c(s, c); } static inline void cy_sink_ct(cy_sink* s, int c) { cy_sink_1c(s, c); cy_sink_t(s); } static inline void cy_sink_cu(cy_sink* s, int c) { cy_sink_1c(s, c); cy_sink_u(s); } static inline void cy_sink_cn(cy_sink* s, int c) { cy_sink_1c(s, c); cy_sink_n(s); }

static inline void cy_sink_ctn(cy_sink* s, int c) { cy_sink_1c(s, c); cy_sink_t(s); cy_sink_n(s); } static inline void cy_sink_cttn(cy_sink* s, int c) { cy_sink_1c(s, c); cy_sink_tt(s); cy_sink_n(s); } void /* f() is basically the same as printf() */ cy_sink_f(cy_sink* s, const char* fmt, ...); /* f() */ void cy_sink_fn(cy_sink* s, const char* fmt, ...); void cy_sink_ft(cy_sink* s, const char* fmt, ...); void cy_sink_ftn(cy_sink* s, const char* fmt, ...); void cy_sink_nf(cy_sink* s, const char* fmt, ...); void cy_sink_nft(cy_sink* s, const char* fmt, ...);

/* ----- fdsink ----- */ typedef struct cy_fdsink_ cy_fdsink; /* file descriptor sink */ int cy_fdsink_write(cy_sink* s, const void* b, uint32_t n); int cy_fdsink_flush(cy_sink* s); void cy_fdsink_putc(cy_sink* s, int c); #define cy_fdsink_TAG 0x6664736b /*'fdsk'*/ struct cy_fdsink_ { /* file descriptor 'sink' output stream */ cy_sink f_sink; /* base class */ cy_iovec_t f_iov; /* entire local buffer */ uint32_t f_tag; /* must be TAG */ int f_fd; /* file descriptor (where to write) */ uint32_t f_out; /* bytes written to descriptor */ };

static inline void cy_fdsink_ctor(cy_fdsink* s, cy_iovec_t v, int fd) { cy_sink* base = &s->f_sink; cy_sink_ctor(base, cy_fdsink_write, cy_fdsink_flush, cy_fdsink_putc, v); s->f_iov = v; s->f_tag = cy_fdsink_TAG; s->f_fd = fd; /* change to later if you have new one */ s->f_out = 0; }

stream code

     Finally, here's what you put in the implementation:

static int /* internal/private write to descriptor */ _fdsink_write(cy_fdsink* s, const void* buf, unsigned len) { const uint8_t* src = (const uint8_t*) buf; int tries = 0; int err = 0; unsigned outSize = 0; while (len) { if (++tries > 20) { cy_printf("write() tries=%d errno=%d", tries, err); s->f_sink.s_err = err; return outSize; } int did = ::write(s->f_fd, src, len); if (did < 0) { err = errno; if (EAGAIN == err || EINTR == err) continue; /* try again */ cy_printf("write(m_fd=%d, len=%d)=%d errno=%d", (int) s->f_fd, (int) len, did, err); s->f_sink.s_err = err; return -1; } if (did > (int) len) { cy_printf("write(n=%d) actual=%d??", (int) len, did); did = len; /* do not subtract more then len */ } src += did; len -= did; outSize += did; s->f_out += did; } return outSize; }

int /* -1 on error; otherwise sum of bytes written */ cy_fdsink_write(cy_sink* s, const void* buf, uint32_t sz) { unsigned outSize = 0; int actual = 0; cy_fdsink* f = (cy_fdsink*) s; assert(cy_fdsink_TAG == f->f_tag); const uint8_t* src = (const uint8_t*) buf; if (!sz) { return 0; } /* write nothing? */ if (!src) { s->s_err = EINVAL; return -1; } if (cy_unlikely(s->s_p < s->s_0 || s->s_p > s->s_x)) { s->s_err = EINVAL; assert(s->s_p >= s->s_0 && s->s_p <= s->s_x); /* die */ } unsigned room = s->s_x - s->s_p; /* space left in buf */ if (s->s_p == s->s_0) { /* empty? NOTHING in local buf? */ if (sz > f->f_iov.iov_len) { /* cannot fit in buf? */ actual = _fdsink_write(f, src, sz); /* direct */ if (actual < 0) return -1; outSize += (unsigned) actual; } else { /* sz <= iov_len implies it fits in buffer */ memcpy(s->s_p, src, sz); /* safely put in buffer */ s->s_p += sz; outSize += sz; /* advance */ assert(s->s_p <= s->s_x); /* not beyond buf end */ } } else { /* SOME already buffered bytes are present */ if (sz > room) { /* more than fits in space left? */ unsigned more = sz - room; /* excess bytes */ if (room) { /* any more space left at all? */ memcpy(s->s_p, src, room); /* fill, then flush */ s->s_p += room; outSize += room; src += room; } actual = _fdsink_write(f, s->s_0, s->s_p - s->s_0); if (actual < 0) return -1; s->s_p = s->s_0; /* buffer is empty again */ if (sz > f->f_iov.iov_len) { /* cannot fit in buf? */ actual = _fdsink_write(f, src, more); /* direct */ if (actual < 0) return -1; outSize += (unsigned) actual; } else { /* buffer is flushed: remainder now fits */ memcpy(s->s_p, src, more); s->s_p += more; outSize += more; src += more; assert(s->s_p <= s->s_x); /* not beyond buf end */ } } else { /* room >= sz: input bytes fit in space left: */ memcpy(s->s_p, src, sz); s->s_p += sz; outSize += sz; } } return outSize; }

int cy_fdsink_flush(cy_sink* s) { cy_fdsink* f = (cy_fdsink*) s; assert(cy_fdsink_TAG == f->f_tag); if (cy_unlikely(s->s_p < s->s_0)) { assert(s->s_p >= s->s_0); /* die */ s->s_err = EINVAL; return EINVAL; } unsigned size = s->s_p - s->s_0; if (size) { /* anything in buffer? */ if (f->f_fd >= 0) { /* descriptor is valid */ s->s_p = s->s_0; /* empty again: origin */ int actual = _fdsink_write(f, s->s_0, size); if (actual < 0) { return actual; } } } return 0; }

void cy_fdsink_putc(cy_sink* s, int c) { if (0 == cy_fdsink_flush(s)) { if (s->s_p < s->s_x) { *s->s_p++ = (uint8_t) c; } } }

/* ----- sink ----- */ /* writev() writes each iovec to sink s using sink_write_do() */ int /* -1 on error; otherwise sum of bytes written */ cy_sink_writev(cy_sink* s, cy_iovec_t* iov, int cnt) { int sum = 0; for (int i = 0; i < cnt; ++i) { const cy_iovec_t* v = iov + i; int actual = sink_write_do(s, v->iov_base, v->iov_len); if (actual >= 0) sum += actual; else return actual; /* stop on negative */ } return sum; }

void cy_sink_1c(cy_sink* s, int c) { /* non-inline c() */ cy_sink_c(s, c); /* inline */ } void cy_sink_2c(cy_sink* s, int a, int b) { /* c(a); c(b); */ cy_sink_c(s, a); /* inline */ cy_sink_c(s, b); /* inline */ } void cy_sink_3c(cy_sink* s, int a, int b, int c) { cy_sink_c(s, a); /* inline */ cy_sink_c(s, b); /* inline */ cy_sink_c(s, c); /* inline */ } void cy_sink_n(cy_sink* s) { /* newline then INDENT to depth */ cy_sink_c(s, '\n'); /* newline */ cy_sink_tabs(s, s->s_tab); /* indent */ } void /* basically sink_2c('<', '/'); sink_s(tag); sink_c('>'); */ cy_sink_end(cy_sink* s, const char* tag) { /* '<' '/' tag '>' */ cy_sink_c(s, '<'); /* inline */ cy_sink_c(s, '/'); /* inline */ sink_write_do(s, tag, strlen(tag)); cy_sink_c(s, '>'); /* inline */ }

static const char* cy_blanks = /* ALL BLANKS on next line with comment ruler: */ " "; /* 123456789_123456789_123456789_123456789_123456789_123456789_123456789_12 */

#define cy_blanks_len 72 /*length of cy_blanks above*/ void cy_sink_tabs(cy_sink* s, unsigned tab) { unsigned sz = tab * 2; if (sz > 1024) /* too big? */ sz = 1024; while (sz) { unsigned quantum = (sz > cy_blanks_len)? cy_blanks_len: sz; sink_write_do(s, cy_blanks, quantum); sz -= quantum; } }

void /* f() is basically the same as printf() */ cy_sink_f(cy_sink* s, const char* fmt, ...) { /* f() */ char temp[ 2048 + 2 ]; va_list args; va_start(args,fmt); vsnprintf(temp, 2048, fmt, args); va_end(args); temp[2048] = 0; /* ensure end nul */ sink_write_do(s, temp, strlen(temp)); } void cy_sink_fn(cy_sink* s, const char* fmt, ...) { char temp[ 2048 + 2 ]; va_list args; va_start(args,fmt); vsnprintf(temp, 2048, fmt, args); va_end(args); temp[2048] = 0; /* ensure end nul */ sink_write_do(s, temp, strlen(temp)); cy_sink_n(s); /* newline indent */ }

void cy_sink_ft(cy_sink* s, const char* fmt, ...) { char temp[ 2048 + 2 ]; va_list args; va_start(args,fmt); vsnprintf(temp, 2048, fmt, args); va_end(args); temp[2048] = 0; /* ensure end nul */ sink_write_do(s, temp, strlen(temp)); cy_sink_t(s); /* tab */ } void cy_sink_ftn(cy_sink* s, const char* fmt, ...) { char temp[ 2048 + 2 ]; va_list args; va_start(args,fmt); vsnprintf(temp, 2048, fmt, args); va_end(args); temp[2048] = 0; /* ensure end nul */ sink_write_do(s, temp, strlen(temp)); cy_sink_t(s); /* tab */ cy_sink_n(s); /* newline indent */ }

void cy_sink_nf(cy_sink* s, const char* fmt, ...) { char temp[ 2048 + 2 ]; va_list args; va_start(args,fmt); vsnprintf(temp, 2048, fmt, args); va_end(args); temp[2048] = 0; /* ensure end nul */ cy_sink_n(s); /* newline indent */ sink_write_do(s, temp, strlen(temp)); } void cy_sink_nft(cy_sink* s, const char* fmt, ...) { char temp[ 2048 + 2 ]; va_list args; va_start(args,fmt); vsnprintf(temp, 2048, fmt, args); va_end(args); temp[2048] = 0; /* ensure end nul */ cy_sink_n(s); sink_write_do(s, temp, strlen(temp)); cy_sink_t(s); /* tab */ }

menu

     Here's a menu of pages on cy code.

  • vector - std::vector clone
  • bheap - binary min heap
  • label - async descriptors
  • misc - bunch of basic utils
  • pool - C-based vats and pools
  • deck - scatter/gather suite
  • sink - C-based out stream api
  • row - a new deck rewrite

license

     See license and copyright for code here. For more context, see the cy page.

comments

     Compared to a thorn demo, I explain cy code less: I care little whether folks use or grasp cy source. But since I aim to get ideas across, I reveal a point to code constructs so you see intentions.

     If you ask: What was this for? That's the only question I address: why a thing was done. If you what to know how code works or what loose ends remain, figure it out.

color coding

     Library source code appears appears in amber (orange/brown):

amber is_source(code* c);

     Source .cpp code appears in red:

void cy_logf(int, const char* f, ...) { char temp[ 2048 + 4 ]; va_list args; va_start(args,f); vsnprintf(temp, 2048, f, args); va_end(args); temp[2048] = 0; printf("%s\n", temp); }

     Sample test code is purple:

o << "# purple=test green=stdout" << cy_newl;

     Printed output on stdout is green:

# purple=test green=stdout

     I know these aren't the best color cues. (Amber and green might appear the same hue to color blind folks. I have excellent color discrimination myself.)