9 IO Blocking and Multiplexing
Published:
Nonblocking IO
Making slow (blocking forever) syscalls nonblocking
- Call `open()` with `O_NONBLOCK`
- Call `fcntl()` (file control) to turn on the `O_NONBLOCK` file status flag on an already opened fd
  - Will be recorded in the file table entry
- Returns `-1` and sets `errno` to `EAGAIN` if the call would otherwise block
Wrapper code to set the flag: get control, and then set control
#include "apue.h"
#include <fcntl.h>
/*
 * set_fl - turn on file status flags for an open descriptor.
 *
 * fd:    descriptor whose file status flags are updated
 * flags: file status flag bits to turn on (e.g. O_NONBLOCK); bits that
 *        are already set are left untouched
 *
 * Works as a read-modify-write: fetch the current flags with F_GETFL,
 * OR in the requested bits, and store the result with F_SETFL.
 * A failing fcntl() call is reported through err_sys().
 */
void set_fl(int fd, int flags)
{
    /* F_GETFL ignores the third argument; 0 is passed as a placeholder */
    int current = fcntl(fd, F_GETFL, 0);

    if (current < 0)
        err_sys("fcntl F_GETFL error");

    /* write back the old flags plus the newly requested ones */
    if (fcntl(fd, F_SETFL, current | flags) < 0)
        err_sys("fcntl F_SETFL error");
}
Clearing (turning off) a flag works the same way, except that the flag bits are masked off instead of OR-ed in: `val &= ~flags`
Example: reading a file into a large buffer (500,000 bytes) and writing it to `stdout` in nonblocking mode
char buf[500000];

/*
 * Nonblocking write demo.
 *
 * Reads up to sizeof(buf) bytes from stdin in a single read(), switches
 * stdout to nonblocking mode, and then keeps calling write() until every
 * byte has been written.  Each attempt is logged to stderr together with
 * the errno it produced, so the pattern of failed and partial writes on a
 * slow output device is visible.
 *
 * The retry loop deliberately polls (busy-waits): that is the behavior the
 * demo is meant to show.  Only "try again" conditions are retried; any
 * other write error is fatal, so the loop cannot spin forever on a
 * permanent failure.
 */
int main(void)
{
    int ntowrite, nwrite;
    char *ptr;

    ntowrite = read(STDIN_FILENO, buf, sizeof(buf));
    if (ntowrite < 0)
        err_sys("read error");
    fprintf(stderr, "read %d bytes\n", ntowrite);

    set_fl(STDOUT_FILENO, O_NONBLOCK); /* set nonblocking for output */

    ptr = buf;
    while (ntowrite > 0) { /* keep looping until everything is written */
        errno = 0;
        nwrite = write(STDOUT_FILENO, ptr, ntowrite);
        int write_errno = errno; /* save before fprintf() can change errno */
        fprintf(stderr, "nwrite = %d, errno = %d\n", nwrite, write_errno);

        if (nwrite > 0) {
            /* partial or full write: advance past the bytes that went out */
            ptr += nwrite;
            ntowrite -= nwrite;
        } else if (nwrite < 0 && write_errno != EAGAIN &&
                   write_errno != EWOULDBLOCK && write_errno != EINTR) {
            /* permanent failure (e.g. EPIPE, EBADF): retrying cannot help */
            errno = write_errno;
            err_sys("write error");
        }
    }

    clr_fl(STDOUT_FILENO, O_NONBLOCK); /* clear nonblocking */
    return 0;
}
Expected behavior: a single write that doesn’t fail (for a fast `stdout`)
For a slow device (e.g. a FIFO/pipe whose output is `cat`ed to another terminal), `write()` will fail many times, since the pipe is slow. It will sometimes be able to write a partial number of bytes, and the loop keeps trying
- If `O_NONBLOCK` is not set, `write()` will just block until the write is complete.
- However, now `write()` will return immediately, reporting a failure
select()
To avoid `write()` errors or blocking. E.g. `nc` performs two loops at the same time: one reading from and one writing to sockets.
- Two blocking IOs at the same time: `select()`
- `select()` tells you which IOs are available for R/W
`fd_set`: a bit vector representing file descriptors (like an array of integers with 1024 bits)
void FD_ZERO(fd_set *fdset); // initialize fdset
void FD_SET(int fd, fd_set *fdset); // adds fd to fdset
void FD_CLR(int fd, fd_set *fdset); // removes fd from fdset
int FD_ISSET(int fd, fd_set *fdset); // tests if fd is in fdset
#include <sys/select.h>
/*
 * select - report which of the given file descriptors are ready.
 * Returns the count of ready descriptors, 0 on timeout, -1 on error.
 * The fd_set arguments are modified in place to mark the ready fds,
 * so the caller should pass a copy of any set it wants to keep.
 */
int select(int maxfdp1, // highest fd to examine plus 1, or simply FD_SETSIZE
/* main params: one fd_set per kind of readiness to test for */
fd_set *readfds, // in/out: fds to check for being ready for reading
fd_set *writefds, // in/out: fds to check for being ready for writing
fd_set *exceptfds, // in/out: fds to check for an exceptional condition
struct timeval *tvptr); // timeout
Returns: count of ready descriptors, 0 on timeout, –1 on error.
`select()` turns bits in each `fd_set` on/off to indicate whether each individual fd is ready or not
- Exceptional conditions are NOT errors. Certain fds (e.g. TCP sockets) support “out of band/urgent” data
- You also need a copy of the original `fd_set`, because `select()` will modify it to indicate the active set
- Always gets interrupted on signals (`select()` will fail with `errno` = `EINTR`)

Parameters: `maxfdp1`: max fd plus 1
poll() has a better API
Example program
implements a select() server with the following features:
- Single-threaded, but serves multiple clients that can connect and disconnect at any time
- Logs messages received from clients to `stdout`
- The server shuts down when “quit” is entered on `stdin`

3 things:
- Read from `stdin`
- Read from network connection(s)
- Handle listening socket to accept new connections
logger.c
/*
 * Deliberately empty signal handler.
 * Installing it replaces the signal's default action with "do nothing".
 */
static void sig_handler(int sig)
{
    (void)sig; /* signal number is not needed */
}
/*
 * logger: single-threaded select() server.
 *
 * Usage: logger <port>
 *
 * Watches three kinds of descriptors with one select() call:
 *   - the listening socket: accept() new clients and start watching them
 *   - stdin:                "quit\n" shuts the server down
 *   - client sockets:       received bytes are copied to stdout; a client
 *                           that closes its connection is dropped
 *
 * Never returns normally; exits via exit(0) on "quit" or via die() on a
 * fatal error.
 */
int main(int argc, char **argv)
{
    int i;
    int sock;
    char buf[100];
    socklen_t size;
    struct sockaddr_in client_name;
    fd_set active_fd_set, read_fd_set;

    // the port argument is mandatory; atoi(NULL) would crash
    if (argc < 2) {
        fprintf(stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "logger");
        exit(1);
    }

    // boilerplate: socket(), bind(), listen()
    // done first, so sock (the listening socket) is likely fd 3
    sock = init_server_socket(atoi(argv[1]));

    // catch SIGINT with a no-op handler so the signal does not kill us
    struct sigaction act, oact;
    act.sa_handler = sig_handler;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    // SA_RESTART restarts interrupted slow syscalls such as read(), but
    // select() is still interrupted and fails with EINTR (handled below)
    act.sa_flags |= SA_RESTART;
    if (sigaction(SIGINT, &act, &oact) < 0)
        die("sigaction failed");

    // start with an empty fd_set, then watch the listening socket and stdin
    FD_ZERO(&active_fd_set);
    FD_SET(sock, &active_fd_set);
    FD_SET(STDIN_FILENO, &active_fd_set);

    while (1) {
        // select() overwrites the set it is given, so hand it a COPY
        read_fd_set = active_fd_set;
        if (select(FD_SETSIZE, &read_fd_set, NULL, NULL, NULL) < 0) {
            if (errno == EINTR) {
                fprintf(stderr, "%s\n", "select interrupted");
                continue;
            }
            die("select failed");
        }

        // select() returned: read_fd_set now holds only the readable fds
        for (i = 0; i < FD_SETSIZE; ++i) { // iterate over all possible fds
            if (!FD_ISSET(i, &read_fd_set))
                continue;

            if (i == sock) {
                // listening socket is readable: a new client is waiting
                size = sizeof(client_name);
                int new = accept(sock,
                                 (struct sockaddr *) &client_name, &size);
                if (new < 0) {
                    die("accept failed");
                } else if (new >= FD_SETSIZE) {
                    // FD_SET() on an fd >= FD_SETSIZE is undefined behavior
                    fprintf(stderr, "too many open fds, rejecting (fd:%d)\n",
                            new);
                    close(new);
                } else {
                    fprintf(stderr, "new connection from %s (fd:%d)\n",
                            inet_ntoa(client_name.sin_addr), new);
                    FD_SET(new, &active_fd_set); // start watching the client
                }
            } else if (i == STDIN_FILENO) {
                int r = read(i, buf, sizeof(buf));
                if (r < 0) {
                    die("read failed");
                } else if (r == 0) {
                    // EOF on stdin: the fd would stay readable forever and
                    // make the loop spin, so stop watching it
                    fprintf(stderr, "stdin closed\n");
                    FD_CLR(i, &active_fd_set);
                } else if (r >= 5 && strncmp(buf, "quit\n", 5) == 0) {
                    // only compare bytes that this read() actually produced
                    exit(0);
                } else {
                    fprintf(stderr, "unknown command\n");
                }
            } else { // a previously accepted client connection
                int r = read(i, buf, sizeof(buf));
                if (r < 0) {
                    die("read failed");
                } else if (r == 0) {
                    // peer closed the connection: release and stop watching
                    fprintf(stderr, "connection closed (fd:%d)\n", i);
                    close(i);
                    FD_CLR(i, &active_fd_set);
                } else if (write(STDOUT_FILENO, buf, r) < 0) {
                    die("write failed");
                }
            }
        }
    }
}
