|
sink
I wrote about C-based out stream sinks earlier this month. This is a verbatim copy of what appeared in my log. I put a copy here only to clarify license and copyright.
purpose
The purpose of cy_event and cy_heap is to support a simple kind of priority queue for events delivered under a virtual clock. 12apr09
¶
tastier tomatoes
cy_sink ¶ I won't say much about the C stream api at all. Read the out demo for explanations. This code is a clone of yo in the out thorn demo (cf «). typedef struct cy_sink_ cy_sink; /* output sink stream (forward declaration) */
/* write hook: write n bytes starting at b to sink s; the fdsink
 * implementation returns -1 on error, otherwise the bytes accepted */
typedef int /* like ::write() */
(*cy_sink_write_fn)(cy_sink* s, const void* b, uint32_t n);
/* flush hook: the fdsink implementation returns 0 on success, nonzero on failure */
typedef int (*cy_sink_flush_fn)(cy_sink* s);
/* putc hook: cy_sink_c() calls this only when the buffer has no room for c */
typedef void (*cy_sink_putc_fn)(cy_sink* s, int c); /* fallback */
/* Base "class" of every sink: three function-pointer hooks plus a byte
 * buffer described by the half-open range [s_0, s_x) with cursor s_p.
 * cy_sink_ctor() sets all three pointers to 0 when no buffer is supplied. */
struct cy_sink_ { /* 'sink' output stream */
cy_sink_write_fn s_write; /* hook used by sink_write_do() and cy_sink_s() */
cy_sink_flush_fn s_flush; /* hook used by sink_flush_do() */
cy_sink_putc_fn s_putc; /* hook used by cy_sink_c() when s_p has reached s_x */
uint8_t* s_0; /* origin (start of buffer) */
uint8_t* s_p; /* pointer (cursor in buffer) */
uint8_t* s_x; /* extent (one past last buf byte) */
int s_err; /* zero or error (the fdsink code stores errno values here) */
uint16_t s_tab; /* current indent depth (cy_sink_n() passes it to cy_sink_tabs()) */
uint16_t s_pad; /* reserved (alignment) */
};
/* Initialize sink s: install the three hooks (w, f, c) and adopt buffer v.
 * The sink keeps pointers into v's memory; nothing is copied. When v has
 * a null base or zero length the sink starts with no buffer at all, so
 * every cy_sink_c() call goes straight to the putc hook.
 * Error state, indent depth and the reserved field all start at zero. */
static inline void
cy_sink_ctor(cy_sink* s, cy_sink_write_fn w, cy_sink_flush_fn f,
             cy_sink_putc_fn c, cy_iovec_t v) {
  uint8_t* p = (uint8_t*) v.iov_base;
  s->s_write = w;
  s->s_flush = f;
  s->s_putc = c;
  if (p && v.iov_len) { /* initial buffer exists? */
    s->s_0 = s->s_p = p;
    s->s_x = p + v.iov_len; /* one past last buf byte */
  }
  else {
    s->s_0 = s->s_p = s->s_x = 0; /* empty buffer */
  }
  s->s_err = 0;
  s->s_tab = 0;
  s->s_pad = 0; /* reserved, but never leave it indeterminate */
}
/* writev() writes each iovec to sink s using sink_write_do() */
/* The first negative result from a write stops the loop and is returned as-is. */
int /* -1 on error; otherwise sum of bytes written */
cy_sink_writev(cy_sink* s, const cy_iovec_t* v, int cnt);
/* Hand n bytes at b to whatever write hook is installed in s and
 * return that hook's result unchanged. */
static inline int
sink_write_do(cy_sink* s, const void* b, uint32_t n) {
  cy_sink_write_fn write_hook = s->s_write;
  return write_hook(s, b, n);
}
/* Invoke the flush hook installed in s and return its result unchanged. */
static inline int
sink_flush_do(cy_sink* s) {
  cy_sink_flush_fn flush_hook = s->s_flush;
  return flush_hook(s);
}
/* Write one byte. The common case stores c at the cursor; the putc hook
 * is consulted only when the cursor has reached the end of the buffer. */
static inline void
cy_sink_c(cy_sink* s, int c) {
  if (s->s_p >= s->s_x) { /* no room left in buf */
    s->s_putc(s, c); /* hook flushes and then handles the byte */
    return;
  }
  *s->s_p = (uint8_t) c;
  s->s_p += 1;
}
/* Out-of-line byte writers: each passes its arguments, in order, to the
 * inline cy_sink_c(). */
void cy_sink_1c(cy_sink* s, int c); /* non-inline c() above*/
void cy_sink_2c(cy_sink* s, int a, int b); /* c(a); c(b); */
void cy_sink_3c(cy_sink* s, int a, int b, int c); /* c(a); c(b); c(c); */
/* Emit a bare '\n' with no indentation after it. */
static inline void
cy_sink_line(cy_sink* s) {
  const int newline = '\n';
  cy_sink_c(s, newline);
}
void cy_sink_n(cy_sink* s); /* newline then INDENT to depth */
void cy_sink_tabs(cy_sink* s, unsigned count); /* indent count levels: 2 blanks each, at most 1024 blanks */
/* Indent one level deeper; nothing is written. */
static inline void
cy_sink_t(cy_sink* s) {
  s->s_tab = (uint16_t) (s->s_tab + 1);
}
/* Indent two levels deeper; nothing is written. */
static inline void
cy_sink_tt(cy_sink* s) {
  s->s_tab = (uint16_t) (s->s_tab + 2);
}
/* Outdent one level; a depth of zero stays at zero. */
static inline void
cy_sink_u(cy_sink* s) {
  if (0 == s->s_tab)
    return;
  s->s_tab -= 1;
}
/* Outdent two levels, saturating at zero. A depth of 1 now drops to 0,
 * matching how cy_sink_u() saturates; previously it was left unchanged. */
static inline void
cy_sink_uu(cy_sink* s) {
  s->s_tab = (s->s_tab >= 2) ? (uint16_t) (s->s_tab - 2) : 0;
}
/* Indent one level deeper, then start a new line at the new depth. */
static inline void
cy_sink_tn(cy_sink* s) {
  cy_sink_t(s);
  cy_sink_n(s);
}
/* Indent two levels deeper, then start a new line at the new depth. */
static inline void
cy_sink_ttn(cy_sink* s) {
  cy_sink_tt(s);
  cy_sink_n(s);
}
/* Outdent one level, then start a new line at the new depth. */
static inline void
cy_sink_un(cy_sink* s) {
  cy_sink_u(s);
  cy_sink_n(s);
}
/* Outdent via cy_sink_uu(), then start a new line at the new depth. */
static inline void
cy_sink_uun(cy_sink* s) {
  cy_sink_uu(s);
  cy_sink_n(s);
}
/* Write the NUL-terminated string cstr (terminator excluded) through the
 * sink's write hook. The hook's result is deliberately discarded. */
static inline void
cy_sink_s(cy_sink* s, const char* cstr) {
  const uint32_t len = (uint32_t) strlen(cstr);
  (void) sink_write_do(s, cstr, len);
}
/* Newline indent, then write string cstr. */
static inline void
cy_sink_ns(cy_sink* s, const char* cstr) {
  cy_sink_n(s);
  cy_sink_s(s, cstr);
}
/* Write string cstr, then newline indent. */
static inline void
cy_sink_sn(cy_sink* s, const char* cstr) {
  cy_sink_s(s, cstr);
  cy_sink_n(s);
}
/* Newline indent, write string cstr, then tab. The deeper indent only
 * affects lines started after this call. */
static inline void
cy_sink_nst(cy_sink* s, const char* cstr) {
  cy_sink_n(s);
  cy_sink_s(s, cstr);
  cy_sink_t(s);
}
/* Tab, newline indent at the new depth, then write string cstr. */
static inline void
cy_sink_tns(cy_sink* s, const char* cstr) {
  cy_sink_t(s);
  cy_sink_n(s);
  cy_sink_s(s, cstr);
}
/* Write string cstr, then tab (no newline is written). */
static inline void
cy_sink_st(cy_sink* s, const char* cstr) {
  cy_sink_s(s, cstr);
  cy_sink_t(s);
}
/* Untab, then write string cstr (no newline is written). */
static inline void
cy_sink_us(cy_sink* s, const char* cstr) {
  cy_sink_u(s);
  cy_sink_s(s, cstr);
}
/* Untab, newline indent at the new depth, then write string cstr. */
static inline void
cy_sink_uns(cy_sink* s, const char* cstr) {
  cy_sink_u(s);
  cy_sink_n(s);
  cy_sink_s(s, cstr);
}
/* Write the closing tag for element name tag; tag is measured with
 * strlen() in the implementation, so it must be a NUL-terminated string. */
void /* basically sink_2c('<', '/'); sink_s(tag); sink_c('>'); */
cy_sink_end(cy_sink* s, const char* tag); /* '<' '/' tag '>' */
/* Newline indent (depth unchanged), then '<' '/' tag '>'. */
static inline void
cy_sink_nend(cy_sink* s, const char* tag) {
  cy_sink_n(s);
  cy_sink_end(s, tag);
}
/* Outdent one level, then write '<' '/' tag '>' on the current line. */
static inline void
cy_sink_uend(cy_sink* s, const char* tag) {
  cy_sink_u(s);
  cy_sink_end(s, tag);
}
/* Outdent one level, start a new line at that depth, then write
 * '<' '/' tag '>'. */
static inline void
cy_sink_unend(cy_sink* s, const char* tag) {
  cy_sink_un(s);
  cy_sink_end(s, tag);
}
/* Untab twice (via cy_sink_uun), LF indent, then '<' '/' tag '>'. */
static inline void
cy_sink_uunend(cy_sink* s, const char* tag) {
  cy_sink_uun(s);
  cy_sink_end(s, tag);
}
/* Tab, then write byte c. */
static inline void
cy_sink_tc(cy_sink* s, int c) {
  cy_sink_t(s);
  cy_sink_1c(s, c);
}
/* Untab, then write byte c. */
static inline void
cy_sink_uc(cy_sink* s, int c) {
  cy_sink_u(s);
  cy_sink_1c(s, c);
}
/* Write byte c, then tab. */
static inline void
cy_sink_ct(cy_sink* s, int c) {
  cy_sink_1c(s, c);
  cy_sink_t(s);
}
/* Write byte c, then untab. */
static inline void
cy_sink_cu(cy_sink* s, int c) {
  cy_sink_1c(s, c);
  cy_sink_u(s);
}
/* Write byte c, then newline indent. */
static inline void
cy_sink_cn(cy_sink* s, int c) {
  cy_sink_1c(s, c);
  cy_sink_n(s);
}
/* Write byte c, tab, then newline indent at the new depth. */
static inline void
cy_sink_ctn(cy_sink* s, int c) {
  cy_sink_1c(s, c);
  cy_sink_t(s);
  cy_sink_n(s);
}
/* Write byte c, tab twice, then newline indent at the new depth. */
static inline void
cy_sink_cttn(cy_sink* s, int c) {
  cy_sink_1c(s, c);
  cy_sink_tt(s);
  cy_sink_n(s);
}
/* printf-style writers. The implementations format into a fixed stack
 * buffer, so at most 2047 characters of formatted text are written per
 * call. Suffix and prefix letters follow the rest of the API:
 * n = newline indent, t = tab. */
void /* f() is basically the same as printf() */
cy_sink_f(cy_sink* s, const char* fmt, ...); /* f() */
void cy_sink_fn(cy_sink* s, const char* fmt, ...); /* f(), then newline indent */
void cy_sink_ft(cy_sink* s, const char* fmt, ...); /* f(), then tab */
void cy_sink_ftn(cy_sink* s, const char* fmt, ...); /* f(), tab, then newline indent */
void cy_sink_nf(cy_sink* s, const char* fmt, ...); /* newline indent, then f() */
void cy_sink_nft(cy_sink* s, const char* fmt, ...); /* newline indent, f(), then tab */
/* ----- fdsink ----- */
typedef struct cy_fdsink_ cy_fdsink; /* file descriptor sink */
/* The three hooks that cy_fdsink_ctor() installs in the base cy_sink.
 * Each receives the base pointer and casts it back to cy_fdsink. */
int cy_fdsink_write(cy_sink* s, const void* b, uint32_t n); /* -1 on error, else bytes accepted */
int cy_fdsink_flush(cy_sink* s); /* 0 on success, nonzero on failure */
void cy_fdsink_putc(cy_sink* s, int c); /* flush, then store c */
/* Value stored in f_tag by the ctor; write and flush assert on it. */
#define cy_fdsink_TAG 0x6664736b /*'fdsk'*/
/* Sink that drains to a file descriptor. f_sink is the first member
 * because the hooks cast the cy_sink* they receive to cy_fdsink*. */
struct cy_fdsink_ { /* file descriptor 'sink' output stream */
cy_sink f_sink; /* base class */
cy_iovec_t f_iov; /* entire local buffer */
uint32_t f_tag; /* must be TAG */
int f_fd; /* file descriptor (where to write); flush skips writing when negative */
uint32_t f_out; /* bytes written to descriptor (32-bit running total) */
};
/* Construct a descriptor-backed sink over buffer v that drains to fd.
 * Installs the cy_fdsink_* hooks in the base sink, stamps the type tag
 * and zeroes the byte counter.
 * A null-based v is recorded with length 0. The base sink refuses such
 * a buffer, and cy_fdsink_write() consults f_iov.iov_len to decide
 * whether data fits, so the two views must agree. */
static inline void
cy_fdsink_ctor(cy_fdsink* s, cy_iovec_t v, int fd) {
  if (!v.iov_base) {
    v.iov_len = 0; /* no memory behind it: treat as "no buffer" */
  }
  cy_sink_ctor(&s->f_sink, cy_fdsink_write, cy_fdsink_flush,
               cy_fdsink_putc, v);
  s->f_iov = v;
  s->f_tag = cy_fdsink_TAG;
  s->f_fd = fd; /* may be replaced later if a new descriptor is obtained */
  s->f_out = 0;
}
stream code ¶ Finally, here's what you put in the implementation: static int /* internal/private write to descriptor */
_fdsink_write(cy_fdsink* s, const void* buf, unsigned len) {
  /* Push len bytes at buf to s->f_fd, looping over short writes.
   * Returns -1 on a hard error (errno is copied to s_err). Otherwise it
   * returns the number of bytes written, which is less than len only
   * when the descriptor kept answering EAGAIN/EINTR and we gave up;
   * s_err then holds that errno.
   * Only consecutive retryable failures count toward the give-up limit.
   * A write that makes progress resets the count, so a large buffer
   * drained in many small chunks is not mistaken for a stuck descriptor. */
  const uint8_t* src = (const uint8_t*) buf;
  int stalls = 0; /* consecutive EAGAIN/EINTR results */
  unsigned outSize = 0;
  while (len) {
    ssize_t did = ::write(s->f_fd, src, len);
    if (did < 0) {
      int err = errno; /* save before anything can clobber it */
      if (EAGAIN == err || EINTR == err) {
        if (++stalls >= 20) { /* give up: no progress */
          cy_printf("write() tries=%d errno=%d", stalls, err);
          s->f_sink.s_err = err;
          return (int) outSize;
        }
        continue; /* try again */
      }
      cy_printf("write(m_fd=%d, len=%d)=%d errno=%d",
                (int) s->f_fd, (int) len, (int) did, err);
      s->f_sink.s_err = err;
      return -1;
    }
    stalls = 0; /* progress was made */
    if ((size_t) did > len) {
      cy_printf("write(n=%d) actual=%d??", (int) len, (int) did);
      did = (ssize_t) len; /* do not subtract more than len */
    }
    src += did;
    len -= (unsigned) did;
    outSize += (unsigned) did;
    s->f_out += (uint32_t) did;
  }
  return (int) outSize;
}
/* Write hook for cy_fdsink: buffer sz bytes from buf when they fit,
 * otherwise flush what is buffered and/or write straight to the
 * descriptor.
 *
 * Returns -1 on error (s_err is set), otherwise the number of bytes
 * accepted (buffered plus written directly).
 *
 * Capacity is taken from the buffer the base sink actually holds
 * (s_x - s_0), not from f_iov.iov_len. The two differ when the sink was
 * constructed with a null base and a nonzero length, and trusting
 * iov_len there would memcpy into a null buffer. */
int /* -1 on error; otherwise sum of bytes written */
cy_fdsink_write(cy_sink* s, const void* buf, uint32_t sz) {
  unsigned outSize = 0;
  int actual = 0;
  cy_fdsink* f = (cy_fdsink*) s;
  assert(cy_fdsink_TAG == f->f_tag);
  const uint8_t* src = (const uint8_t*) buf;
  if (!sz) { return 0; } /* write nothing? */
  if (!src) { s->s_err = EINVAL; return -1; }
  if (cy_unlikely(s->s_p < s->s_0 || s->s_p > s->s_x)) {
    s->s_err = EINVAL;
    assert(s->s_p >= s->s_0 && s->s_p <= s->s_x); /* die */
    return -1; /* NDEBUG builds: refuse rather than use a bogus 'room' */
  }
  const unsigned cap = (unsigned) (s->s_x - s->s_0); /* whole buffer */
  const unsigned room = (unsigned) (s->s_x - s->s_p); /* space left */

  if (sz <= room) { /* fits in the space left: just buffer it */
    memcpy(s->s_p, src, sz);
    s->s_p += sz;
    return (int) sz;
  }

  /* sz > room from here on. Decide up front whether the request could
   * ever fit in an empty buffer; the decision uses the full request
   * size even when part of it tops off the current buffer first. */
  const int direct = (sz > cap);

  if (s->s_p != s->s_0) { /* SOME already buffered bytes are present */
    if (room) { /* top off the buffer before flushing it */
      memcpy(s->s_p, src, room);
      s->s_p += room;
      src += room;
      outSize += room;
    }
    actual = _fdsink_write(f, s->s_0, (unsigned) (s->s_p - s->s_0));
    if (actual < 0) return -1;
    s->s_p = s->s_0; /* buffer is empty again */
  }

  const unsigned more = sz - outSize; /* bytes not yet consumed */
  if (direct) { /* cannot fit in buf: bypass it */
    actual = _fdsink_write(f, src, more);
    if (actual < 0) return -1;
    outSize += (unsigned) actual;
  }
  else { /* buffer is flushed: remainder now fits */
    memcpy(s->s_p, src, more);
    s->s_p += more;
    outSize += more;
    assert(s->s_p <= s->s_x); /* not beyond buf end */
  }
  return (int) outSize;
}
/* Flush hook for cy_fdsink: write buffered bytes to the descriptor.
 * Returns 0 when nothing needed writing, when the descriptor is
 * negative (the buffer is left untouched), or when the write succeeded.
 * Returns EINVAL for a cursor before the buffer origin, or the negative
 * result of the descriptor write. The cursor is rewound before the
 * write is attempted. */
int cy_fdsink_flush(cy_sink* s) {
  cy_fdsink* f = (cy_fdsink*) s;
  assert(cy_fdsink_TAG == f->f_tag);
  if (cy_unlikely(s->s_p < s->s_0)) {
    assert(s->s_p >= s->s_0); /* die */
    s->s_err = EINVAL;
    return EINVAL;
  }
  unsigned pending = s->s_p - s->s_0;
  if (0 == pending || f->f_fd < 0)
    return 0; /* nothing buffered, or nowhere to send it yet */
  s->s_p = s->s_0; /* empty again: origin */
  int wrote = _fdsink_write(f, s->s_0, pending);
  return (wrote < 0) ? wrote : 0;
}
/* Putc hook for cy_fdsink, reached from cy_sink_c() when the buffer has
 * no room: flush, then store c.
 * If there is still no room after a successful flush (the sink has no
 * buffer, or the descriptor is negative so nothing was drained), the
 * byte is no longer dropped silently. It is written straight to the
 * descriptor when one exists; otherwise s_err is set to EBADF. */
void cy_fdsink_putc(cy_sink* s, int c) {
  if (0 != cy_fdsink_flush(s))
    return; /* flush failed and has already set s_err */
  if (s->s_p < s->s_x) {
    *s->s_p++ = (uint8_t) c;
    return;
  }
  cy_fdsink* f = (cy_fdsink*) s;
  if (f->f_fd >= 0) {
    const uint8_t byte = (uint8_t) c;
    (void) _fdsink_write(f, &byte, 1); /* sets s_err on failure */
  }
  else {
    s->s_err = EBADF; /* nowhere to put the byte */
  }
}
/* ----- sink ----- */
/* writev() writes each iovec to sink s using sink_write_do().
 * iov points at cnt entries, written in order. The first negative
 * result stops the loop and is returned unchanged; otherwise the
 * per-entry results are summed.
 * The parameter is const-qualified so this definition matches the
 * prototype. Without it, C rejects the pair as conflicting types and
 * C++ treats this as an unrelated overload, leaving the declared
 * function undefined. */
int /* -1 on error; otherwise sum of bytes written */
cy_sink_writev(cy_sink* s, const cy_iovec_t* iov, int cnt) {
  int sum = 0;
  for (int i = 0; i < cnt; ++i) {
    const cy_iovec_t* v = iov + i;
    int actual = sink_write_do(s, v->iov_base, v->iov_len);
    if (actual < 0)
      return actual; /* stop on negative */
    sum += actual;
  }
  return sum;
}
/* Out-of-line wrapper around the inline cy_sink_c(): write byte c. */
void
cy_sink_1c(cy_sink* s, int c)
{
  cy_sink_c(s, c);
}
/* Write byte a, then byte b, each through the inline cy_sink_c(). */
void
cy_sink_2c(cy_sink* s, int a, int b)
{
  const int seq[2] = { a, b };
  for (int i = 0; i < 2; ++i)
    cy_sink_c(s, seq[i]);
}
/* Write bytes a, b, c in that order, each through the inline cy_sink_c(). */
void
cy_sink_3c(cy_sink* s, int a, int b, int c)
{
  const int seq[3] = { a, b, c };
  for (int i = 0; i < 3; ++i)
    cy_sink_c(s, seq[i]);
}
/* Start a new line: write '\n', then indent to the sink's current
 * depth via cy_sink_tabs(). */
void
cy_sink_n(cy_sink* s)
{
  const unsigned depth = s->s_tab;
  cy_sink_c(s, '\n');
  cy_sink_tabs(s, depth);
}
/* Write a closing tag: '<' '/' followed by the bytes of tag, then '>'.
 * tag must be a NUL-terminated string; it is measured with strlen(). */
void
cy_sink_end(cy_sink* s, const char* tag)
{
  cy_sink_2c(s, '<', '/');
  sink_write_do(s, tag, strlen(tag));
  cy_sink_1c(s, '>');
}
/* Run of blanks that cy_sink_tabs() writes from, up to cy_blanks_len
 * bytes at a time. The literal is built from ten-blank pieces so its
 * width can be checked by eye, and the length is derived from the array
 * so the two can never disagree. A mismatch would make cy_sink_tabs()
 * read past the end of the string. */
static const char cy_blanks[] =
  "          " "          " "          " "          " /* 40 */
  "          " "          " "          " "  ";         /* 72 */
/* 123456789_123456789_123456789_123456789_123456789_123456789_123456789_12 */
#define cy_blanks_len ((unsigned) (sizeof(cy_blanks) - 1)) /*length of cy_blanks above*/
/* Indent by tab levels: write 2 blanks per level, capped at 1024 blanks
 * in total, in chunks of at most cy_blanks_len bytes taken from
 * cy_blanks.
 * The cap is applied before doubling so a huge tab cannot wrap around
 * to a small indent. Writing stops at the first failed write instead of
 * repeating a write that is already failing. */
void cy_sink_tabs(cy_sink* s, unsigned tab) {
  unsigned sz = (tab > 512) ? 1024 : tab * 2; /* too big? clamp */
  while (sz) {
    unsigned quantum =
      (sz > cy_blanks_len) ? cy_blanks_len : sz;
    if (sink_write_do(s, cy_blanks, quantum) < 0)
      break; /* sink reported an error */
    sz -= quantum;
  }
}
/* Shared worker for the cy_sink_f*() family: format fmt/args into a
 * stack buffer and pass the result to the sink's write hook.
 * At most 2047 characters are written; longer output is truncated.
 * Nothing is written if vsnprintf() reports an error, because the
 * buffer contents are then not a valid string.
 * The length comes from vsnprintf()'s return value, so the buffer is
 * not rescanned and a NUL produced by the format (e.g. "%c" with 0) no
 * longer cuts the output short. */
static void
sink_vformat(cy_sink* s, const char* fmt, va_list args) {
  char temp[ 2048 + 2 ];
  int n = vsnprintf(temp, 2048, fmt, args);
  if (n < 0)
    return; /* formatting failed */
  if (n > 2047)
    n = 2047; /* output was truncated to fit temp */
  sink_write_do(s, temp, (uint32_t) n);
}

void /* f() is basically the same as printf() */
cy_sink_f(cy_sink* s, const char* fmt, ...) { /* f() */
  va_list args;
  va_start(args, fmt);
  sink_vformat(s, fmt, args);
  va_end(args);
}

/* f(), then newline indent. */
void cy_sink_fn(cy_sink* s, const char* fmt, ...) {
  va_list args;
  va_start(args, fmt);
  sink_vformat(s, fmt, args);
  va_end(args);
  cy_sink_n(s); /* newline indent */
}

/* f(), then tab. */
void cy_sink_ft(cy_sink* s, const char* fmt, ...) {
  va_list args;
  va_start(args, fmt);
  sink_vformat(s, fmt, args);
  va_end(args);
  cy_sink_t(s); /* tab */
}

/* f(), tab, then newline indent at the new depth. */
void cy_sink_ftn(cy_sink* s, const char* fmt, ...) {
  va_list args;
  va_start(args, fmt);
  sink_vformat(s, fmt, args);
  va_end(args);
  cy_sink_t(s); /* tab */
  cy_sink_n(s); /* newline indent */
}

/* Newline indent, then f(). */
void cy_sink_nf(cy_sink* s, const char* fmt, ...) {
  va_list args;
  cy_sink_n(s); /* newline indent */
  va_start(args, fmt);
  sink_vformat(s, fmt, args);
  va_end(args);
}

/* Newline indent, f(), then tab. */
void cy_sink_nft(cy_sink* s, const char* fmt, ...) {
  va_list args;
  cy_sink_n(s); /* newline indent */
  va_start(args, fmt);
  sink_vformat(s, fmt, args);
  va_end(args);
  cy_sink_t(s); /* tab */
}
menu
Here's a menu of pages on cy code.
license
See license and copyright for code here. For more context, see the cy page.
comments
Compared to a thorn demo, I explain cy code less: I care little whether folks use or grasp cy source. But since I aim to get ideas across, I reveal a point to code constructs so you see intentions. If you ask: What was this for? That's the only question I address: why a thing was done. If you want to know how code works or what loose ends remain, figure it out.
color coding
Library source code appears in amber (orange/brown): amber is_source(code* c);
Source .cpp code appears in red: void cy_logf(int, const char* f, ...) {
/* Format f and its arguments, then print the result plus '\n' to stdout.
 * The unnamed int parameter is not used by this body. */
char temp[ 2048 + 4 ];
va_list args;
va_start(args,f);
vsnprintf(temp, 2048, f, args); /* at most 2047 chars plus NUL are stored */
va_end(args);
temp[2048] = 0; /* belt-and-braces terminator */
printf("%s\n", temp);
}
Sample test code is purple: o << "# purple=test green=stdout" << cy_newl;
Printed output on stdout is green: # purple=test green=stdout
I know these aren't the best color cues. (Amber and green might appear the same hue to color blind folks. I have excellent color discrimination myself.) |