9 IO Blocking and Multiplexing

5 minute read

Published:

Nonblocking IO

Making slow (blocking forever) syscalls nonblocking

  • Call open() with `O_NONBLOCK`
  • Call fcntl() (file control) to turn on the `O_NONBLOCK` file status flag on an already-open fd
    • Will be recorded in the file table entry
  • A call that would otherwise block returns -1 and sets errno to EAGAIN

Wrapper code to set the flag: get the current flags (F_GETFL), then set the updated flags (F_SETFL)

#include "apue.h"
#include <fcntl.h>

/*
 * Turn on the file status flags given in `flags` for the open descriptor `fd`.
 *
 * The current flags are fetched first so that the new flags are added to,
 * rather than replace, whatever is already set. Either fcntl() failure is
 * reported through err_sys().
 */
void set_fl(int fd, int flags) /* flags are file status flags to turn on */
{
        /* F_GETFL: fetch the current file status flags (third arg unused) */
        int current = fcntl(fd, F_GETFL, 0);
        if (current < 0)
                err_sys("fcntl F_GETFL error");

        /* F_SETFL: write back the old flags with the requested ones OR-ed in */
        if (fcntl(fd, F_SETFL, current | flags) < 0)
                err_sys("fcntl F_SETFL error");
}

Clearing (turning off) a flag works the same way: get the flags, mask the flag out (`val &= ~flags`), then set them back

Example: reading a file into a large buffer (500,000 bytes) and writing it to a nonblocking stdout

char buf[500000];

/*
 * Nonblocking write demo.
 *
 * Reads up to sizeof(buf) bytes from stdin, switches stdout to nonblocking
 * mode, then keeps calling write() until everything has been written. Every
 * write() attempt is logged to stderr with its return value and errno.
 *
 * Returns 0 on success, 1 if the initial read() or a write() fails with an
 * error other than "would block".
 */
int main(void)
{
    int     ntowrite, nwrite;
    int     status = 0;
    char    *ptr;

    ntowrite = read(STDIN_FILENO, buf, sizeof(buf));
    if (ntowrite < 0) {
        perror("read error");
        return 1;
    }
    fprintf(stderr, "read %d bytes\n", ntowrite);

    set_fl(STDOUT_FILENO, O_NONBLOCK);  /* set nonblocking for output */

    ptr = buf;
    while (ntowrite > 0) {   /* keep looping until all data is written */
        int err;

        errno = 0;
        nwrite = write(STDOUT_FILENO, ptr, ntowrite);
        err = errno;         /* save: fprintf() below may overwrite errno */
        fprintf(stderr, "nwrite = %d, errno = %d\n", nwrite, err);

        if (nwrite > 0) {
            /* partial (or full) write: advance past what was accepted */
            ptr += nwrite;
            ntowrite -= nwrite;
        } else if (nwrite < 0 && err != EAGAIN && err != EWOULDBLOCK) {
            /*
             * "Would block" is expected here and simply retried (this busy
             * polling is what the demo illustrates). Any other error will
             * not go away by retrying, so stop instead of spinning forever.
             */
            status = 1;
            break;
        }
    }

    clr_fl(STDOUT_FILENO, O_NONBLOCK);  /* clear nonblocking */
    return status;
}

Expected behavior: a single write() that doesn’t fail (for a fast stdout)

For a slow device (e.g. a FIFO/pipe whose output is cat'ed in another terminal), write() will fail many times, since the pipe is slow. It will sometimes be able to write a partial number of bytes, and the loop keeps trying

  • If O_NONBLOCK not set, write() will just block until write is complete.
  • With O_NONBLOCK set, however, write() returns immediately, reporting a failure (errno = EAGAIN)

select()

To avoid write() errors or blocking: e.g. nc has to perform two loops at the same time, one for reading from and one for writing to sockets.

  • Two blocking I/Os at the same time: use select(), which tells you which fds are available for R/W

fd_set: a bit vector representing file descriptors (like array of integers with 1024 bits)

// Operations on an fd_set (the bit vector of file descriptors)
void FD_ZERO(fd_set *fdset);          // initialize fdset
void FD_SET(int fd, fd_set *fdset);   // adds fd to fdset
void FD_CLR(int fd, fd_set *fdset);   // removes fd from fdset
int FD_ISSET(int fd, fd_set *fdset);  // tests if fd is in fdset
#include <sys/select.h>
// Returns: count of ready descriptors, 0 on timeout, -1 on error.
// select() modifies the fd_sets passed in, so pass a copy of the original set.
int select(int maxfdp1, // max fd plus 1, or simply FD_SETSIZE
			/* main params: sets of fds to watch */
           fd_set *readfds,   // see if they're ready for reading
           fd_set *writefds,  // see if they're ready for writing
           fd_set *exceptfds, // see if exceptional condition (not an error)

           struct timeval *tvptr); // timeout

Returns: count of ready descriptors, 0 on timeout, -1 on error.

select() will turn bits in each fd_set on/off to indicate whether each individual fd is ready or not

  • Exceptional conditions are NOT errors. Certain fds (e.g. TCP sockets) support “out of band/urgent” data
  • You also need a copy of the original fd_set, because select() will modify it to indicate the active set
  • select() always gets interrupted by signals (it will fail with errno=EINTR)

Parameters:
  • maxfdp1: max fd plus 1

poll() has a better API

Example program

implements a select() server with the following features:

  • Single-threaded, but serves multiple clients that can connect and disconnect at any time
  • Logs messages received from clients to stdout
  • The server shuts down when “quit” is entered on stdin

The server has to handle 3 things:
  • Read from stdin
  • Read from network connection(s)
  • Handle listening socket to accept new connections

logger.c

/*
 * No-op signal handler: it deliberately does nothing with the signal.
 * main() installs it for SIGINT via sigaction().
 */
static void sig_handler(int sig)
{
    (void)sig;  /* signal number is intentionally unused */
}

/*
 * Single-threaded select() logging server.
 *
 * Usage: <program> <port>
 *
 * Watches three kinds of descriptors for readability:
 *   - the listening socket: accept a new client and start watching it
 *   - stdin: exit when a line starting with "quit\n" is entered
 *   - client sockets: copy whatever was received to stdout; stop watching
 *     the client when it disconnects or its read fails
 *
 * Setup failures and failures of select()/accept() are fatal (die()).
 * A failure on a single client connection only drops that connection.
 */
int main(int argc, char **argv)
{
    int i;
    int sock;
    char buf[100];
    socklen_t size;
    struct sockaddr_in client_name;
    fd_set active_fd_set, read_fd_set;

    // the port argument is required; without it argv[1] is NULL
    if (argc < 2) {
        fprintf(stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "logger");
        exit(1);
    }

    // boilerplate: socket(), bind(), listen()
    // this is the first fd we open, so sock will likely be fd 3
    sock = init_server_socket(atoi(argv[1]));

    // catch SIGINT with a no-op handler
    // NOTE: even with SA_RESTART, select() is typically still interrupted
    // by the signal and fails with EINTR, which the loop below handles
    struct sigaction act;
    act.sa_handler = sig_handler;
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;
    if (sigaction(SIGINT, &act, NULL) < 0)
        die("sigaction failed");

    // start with an empty fd_set, then watch the listening socket and stdin
    FD_ZERO(&active_fd_set);
    FD_SET(sock, &active_fd_set);
    FD_SET(STDIN_FILENO, &active_fd_set);

    while (1) {
        // select() overwrites the set it is given, so hand it a copy
        read_fd_set = active_fd_set;

        if (select(FD_SETSIZE, &read_fd_set, NULL, NULL, NULL) < 0) {
            if (errno == EINTR) {
                fprintf(stderr, "%s\n", "select interrupted");
                continue;
            }
            die("select failed");
        }
        // select() returned: read_fd_set now holds only the readable fds

        for (i = 0; i < FD_SETSIZE; ++i) {
            if (!FD_ISSET(i, &read_fd_set))
                continue;

            if (i == sock) {
                // listening socket is readable: a connection is pending
                size = sizeof(client_name);
                int clnt_sock = accept(sock,
                                       (struct sockaddr *) &client_name, &size);
                if (clnt_sock < 0) {
                    die("accept failed");
                } else if (clnt_sock >= FD_SETSIZE) {
                    // FD_SET() on an fd >= FD_SETSIZE is undefined behavior
                    fprintf(stderr, "too many connections, rejecting (fd:%d)\n",
                            clnt_sock);
                    close(clnt_sock);
                } else {
                    fprintf(stderr, "new connection from %s (fd:%d)\n",
                            inet_ntoa(client_name.sin_addr), clnt_sock);
                    FD_SET(clnt_sock, &active_fd_set);  // start watching client
                }
            }
            else if (i == STDIN_FILENO) {
                ssize_t n = read(i, buf, sizeof(buf));
                if (n < 0) {
                    die("read failed");
                } else if (n == 0) {
                    // EOF on stdin stays "readable" forever; stop watching
                    // it so the loop does not spin
                    fprintf(stderr, "stdin closed\n");
                    FD_CLR(i, &active_fd_set);
                } else if (n >= 5 && strncmp(buf, "quit\n", 5) == 0) {
                    // length check: only compare bytes this read() produced,
                    // buf is shared with client reads and may hold stale data
                    exit(0);
                } else {
                    fprintf(stderr, "unknown command\n");
                }
            }
            else {
                // data (or EOF/error) from an already-connected client
                ssize_t r = read(i, buf, sizeof(buf));
                if (r < 0) {
                    // one broken connection must not take the server down
                    fprintf(stderr, "read failed (fd:%d): %s\n",
                            i, strerror(errno));
                    close(i);
                    FD_CLR(i, &active_fd_set);
                } else if (r == 0) {
                    fprintf(stderr, "connection closed (fd:%d)\n", i);
                    close(i);
                    FD_CLR(i, &active_fd_set);
                } else {
                    if (write(STDOUT_FILENO, buf, r) < 0)
                        die("write failed");
                }
            }
        }
    }
}