9 IO Blocking and Multiplexing
Published:
Nonblocking IO
Making slow (blocking forever) syscalls nonblocking
- Call `open()` with `O_NONBLOCK`
- Call `fcntl()` (file control) to turn on the `O_NONBLOCK` file status flag on an already opened fd
  - Will be recorded in the file table entry
- Returns `-1` and sets `errno` to `EAGAIN` if the call would block
Wrapper code to set the flag: get control, and then set control
#include "apue.h"
#include <fcntl.h>
/*
 * set_fl - turn on file status flags for an open descriptor.
 *
 * fd:    descriptor whose file status flags are modified
 * flags: file status flags to turn on
 *
 * Fetches the current flags with F_GETFL, ORs in `flags`, and stores the
 * result with F_SETFL. A failure of either fcntl() call is reported
 * through err_sys().
 */
void set_fl(int fd, int flags)
{
    /* get: second argument selects the command, third is unused here */
    int current = fcntl(fd, F_GETFL, 0);

    if (current < 0)
        err_sys("fcntl F_GETFL error");

    /* set: add the requested bits while keeping the ones already on */
    if (fcntl(fd, F_SETFL, current | flags) < 0)
        err_sys("fcntl F_SETFL error");
}
Clearing (turning off) a flag works the same way, but with `val &= ~flags` instead of `val |= flags`
Example: reading a file into a large buffer (500,000 bytes) and writing it to `stdout` in nonblocking mode
char buf[500000];

/*
 * Nonblocking write demo.
 *
 * Reads up to sizeof(buf) bytes from stdin in one read(), switches stdout
 * to nonblocking mode, then keeps calling write() until every byte has been
 * written. Each attempt's return value and errno are logged to stderr, so
 * failed attempts on a slow stdout are visible.
 */
int main(void)
{
    int remaining, written;
    char *cursor;

    remaining = read(STDIN_FILENO, buf, sizeof(buf));
    fprintf(stderr, "read %d bytes\n", remaining);

    set_fl(STDOUT_FILENO, O_NONBLOCK); /* output must not block from here on */

    for (cursor = buf; remaining > 0; ) {
        errno = 0; /* reset so the log shows this attempt's errno only */
        written = write(STDOUT_FILENO, cursor, remaining);
        fprintf(stderr, "nwrite = %d, errno = %d\n", written, errno);

        if (written <= 0)
            continue; /* nothing went out this time: retry immediately */

        /* partial (or full) write: advance past the bytes that went out */
        cursor += written;
        remaining -= written;
    }

    clr_fl(STDOUT_FILENO, O_NONBLOCK); /* restore blocking mode */
}
Expected behavior: a single write that doesn’t fail (for a fast `stdout`)
For a slow device (e.g. a FIFO/pipe, with the output `cat`ed to another terminal), `write()` will fail many times, since the pipe is slow. It will sometimes be able to write a partial number of bytes, and keeps trying
- If `O_NONBLOCK` is not set, `write()` will just block until the write is complete.
- However, now `write()` will return immediately, reporting a failure
select()
To avoid `write()` errors or blocks. E.g. `nc` performs two loops at the same time: for reading and writing from/to sockets.
- Two blocking IOs at the same time: `select()`

`select()` tells you which IOs are available for R/W

`fd_set`: a bit vector representing file descriptors (like an array of integers with 1024 bits)
void FD_ZERO(fd_set *fdset); // initialize fdset
void FD_SET(int fd, fd_set *fdset); // adds fd to fdset
void FD_CLR(int fd, fd_set *fdset); // removes fd from fdset
int FD_ISSET(int fd, fd_set *fdset); // tests if fd is in fdset
#include <sys/select.h>
/*
 * select - report which of the given descriptors are ready.
 *
 * Returns the count of ready descriptors, 0 on timeout, -1 on error.
 * The fd_set arguments are modified in place to indicate which
 * descriptors are ready, so callers that reuse a set should pass a copy.
 */
int select(int maxfdp1, // max fd plus 1, or simply FD_SETSIZE
/* main params */
fd_set *readfds, // see if they're ready for reading
fd_set *writefds, // see if they're ready for writing
fd_set *exceptfds, // see if exceptional condition
struct timeval *tvptr); // timeout
Returns: count of ready descriptors, 0 on timeout, –1 on error

`select()` will turn bits on/off in each `fd_set` to indicate whether each individual fd is ready or not
- Exceptional conditions are NOT errors. Certain fds (e.g. TCP sockets) support “out of band/urgent” data
- You also need a copy of the original `fd_set`, because `select()` will modify it to indicate the active set
- Always gets interrupted on signals (`select()` will fail with `errno=EINTR`)

Parameters:
- `maxfdp1`: max fd plus 1

`poll()` has a better API
Example program: implements a `select()` server with the following features:
- Single-threaded, but serves multiple clients that can connect and disconnect at any time
- Logs messages received from clients to `stdout`
- The server shuts down when “quit” is entered on `stdin`

It has to do 3 things:
- Read from `stdin`
- Read from network connection(s)
- Handle the listening socket to accept new connections
logger.c
/*
 * No-op signal handler: receives the signal number and does nothing
 * with it.
 */
static void sig_handler(int sig)
{
    (void)sig; /* intentionally unused */
}
/*
 * Single-threaded select() logging server.
 *
 * Usage: <prog> <port>
 *
 * One select() loop watches three kinds of descriptors:
 *   - the listening socket: accept() a new client and start watching it;
 *   - stdin: a line starting with "quit\n" exits the process, anything else
 *     prints "unknown command"; EOF on stdin stops watching stdin;
 *   - client sockets: received bytes are copied to stdout, and the
 *     connection is closed and unwatched when the client reaches EOF.
 *
 * Never returns normally; the process ends through exit() or die().
 */
int main(int argc, char **argv)
{
    int i;
    int sock;
    char buf[100];
    socklen_t size;
    struct sockaddr_in client_name;
    fd_set active_fd_set, read_fd_set;

    /* argv[1] is the port; without it atoi() would be handed NULL */
    if (argc < 2) {
        fprintf(stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "logger");
        exit(1);
    }

    // boiler plate: socket(), bind(), listen()
    // 1st thing we do, so sock (server) likely be #3
    sock = init_server_socket(atoi(argv[1]));

    // catch SIGINT with the no-op handler
    struct sigaction act;
    act.sa_handler = sig_handler;
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;
    if (sigaction(SIGINT, &act, NULL) < 0)
        die("sigaction failed");

    // start with empty fd_set
    FD_ZERO(&active_fd_set);
    FD_SET(sock, &active_fd_set);         // add servsock
    FD_SET(STDIN_FILENO, &active_fd_set); // add stdin

    while (1) { // inf loop
        // select() overwrites the set it is given, so hand it a COPY
        read_fd_set = active_fd_set;
        if (select(FD_SETSIZE, &read_fd_set, NULL, NULL, NULL) < 0) {
            if (errno == EINTR) {
                fprintf(stderr, "%s\n", "select interrupted");
                continue;
            }
            die("select failed");
        }

        // select() returned: read_fd_set now holds only the ready fds
        for (i = 0; i < FD_SETSIZE; ++i) { // iterate all possible fds
            if (!FD_ISSET(i, &read_fd_set))
                continue;

            if (i == sock) {
                // listening socket is readable: a connection is pending
                size = sizeof(client_name);
                int conn = accept(sock,
                                  (struct sockaddr *) &client_name, &size);
                if (conn < 0) {
                    die("accept failed");
                } else if (conn >= FD_SETSIZE) {
                    // an fd_set cannot represent this descriptor; calling
                    // FD_SET on it would write outside the set
                    fprintf(stderr, "too many connections (fd:%d)\n", conn);
                    close(conn);
                } else {
                    fprintf(stderr, "new connection from %s (fd:%d)\n",
                            inet_ntoa(client_name.sin_addr), conn);
                    FD_SET(conn, &active_fd_set); // watch the new client
                }
            }
            else if (i == STDIN_FILENO) {
                int r = read(i, buf, sizeof(buf));
                if (r < 0) {
                    die("read failed");
                } else if (r == 0) {
                    // EOF: stdin would stay "ready" forever and make the
                    // loop spin, so stop watching it
                    fprintf(stderr, "stdin closed\n");
                    FD_CLR(i, &active_fd_set);
                } else if (r >= 5 && strncmp(buf, "quit\n", 5) == 0) {
                    // r >= 5 keeps the comparison inside the bytes just
                    // read; buf is shared with the client branch and may
                    // hold leftover data past r
                    exit(0);
                } else {
                    fprintf(stderr, "unknown command\n");
                }
            }
            else { // clients for later connections
                int r = read(i, buf, sizeof(buf));
                if (r < 0) {
                    die("read failed");
                } else if (r == 0) {
                    fprintf(stderr, "connection closed (fd:%d)\n", i);
                    close(i);
                    FD_CLR(i, &active_fd_set);
                } else {
                    // the result of write() is not checked
                    write(STDOUT_FILENO, buf, r);
                }
            }
        }
    }
}