6 Threads

Creating threads

A thread is an execution inside a process (e.g. two functions in one process running at the same time). Threads share the same address space (same heap, same static variables), but each thread has its own stack.

  • When a thread starts running, the OS assigns it a new slab of space in the middle of the address space for its stack.
    • Limit of stack space for each process
      #include <pthread.h>
      int pthread_create(pthread_t *thread, 
                    const pthread_attr_t *attr,
                    void *(*start_routine)(void *), 
                    void *arg); 
      int pthread_join(pthread_t thread, void **retval);
      
  • thread: thread ID
  • attr: typically NULL, a fine-grain control of thread behavior
  • start_routine: input void *, return void *
  • arg: for start_routine(arg)
  • retval: stores the return value of start_routine()

Both functions return 0 if OK, error number on failure
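
The two calls can be exercised together in a minimal sketch like the one below. The names square() and run_square() are hypothetical, and passing a long through the void * argument relies on both having the same size (true on 64-bit Linux). Note that the error number comes back as the return value, not through errno.

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>

// Hypothetical start routine: squares the integer smuggled in through arg.
static void *square(void *arg) {
    long n = (long)arg;
    return (void *)(n * n);
}

// Runs square(n) in a new thread and returns its result, or -1 on error.
long run_square(long n) {
    pthread_t t;
    void *ret;

    int err = pthread_create(&t, NULL, square, (void *)n);
    if (err != 0) {   // the error number is the return value, not errno
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        return -1;
    }
    err = pthread_join(t, &ret);   // ret receives what square() returned
    if (err != 0) {
        fprintf(stderr, "pthread_join: %s\n", strerror(err));
        return -1;
    }
    return (long)ret;
}
```

Calling run_square(7) from main() creates a thread, waits for it, and hands back 49.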

bank0.c

#include <pthread.h>
#include <stdio.h>

int balance = 0;   // shared by both threads

void* deposit(void *arg) {        
    for (int i = 0; i < 1e7; i++)  ++balance;
    long r = 10 * (long)arg;
    return (void *)r;    // test if you can pass something and get it back
}

void* withdraw(void *arg) {        
    for (int i = 0; i < 1e7; i++)  --balance;
    long r = 10 * (long)arg;
    return (void *)r;
}

int main() {
    pthread_t t1, t2;
    // run two threads: deposit((void *)1) and withdraw((void *)2)
    pthread_create(&t1, NULL, &deposit,  (void*)1);
    pthread_create(&t2, NULL, &withdraw, (void*)2);

    void *r1;
    void *r2;
    
    pthread_join(t1, &r1);  // waits until t1 terminates
    pthread_join(t2, &r2);  // after t1 done, wait for t2

    printf("t1 returned %ld\n", (long)r1);
    printf("t2 returned %ld\n", (long)r2);
    printf("balance = %d\n", balance);
}

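Problem: ++balance is not atomic. It is three steps, fetch -> increment -> store, and the two threads can interleave between them and lose updates.

  • Typically happens when running on multiple CPUs
  • taskset takes a CPU bitmask: taskset 1 ./bank0 runs only on CPU 0
  • taskset 3 ./bank0 runs on CPUs 0 and 1.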

Mutex lock

MUTual EXclusion lock

bank1.c

pthread_mutex_t balance_lock = PTHREAD_MUTEX_INITIALIZER;

void* deposit(void *arg) {        
    for (int i = 0; i < 1e7; i++) {
        pthread_mutex_lock(&balance_lock);   // blocks until the lock is free
        ++balance;   // critical section: must not interleave with the other thread
        pthread_mutex_unlock(&balance_lock); // unlock from the same thread that locked
    }
    long r = 10 * (long)arg;
    return (void *)r;
}

When a second thread calls pthread_mutex_lock(), the call blocks (does not return) until the first thread calls pthread_mutex_unlock().
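
For reference, a complete version of the locked bank example might look like the sketch below. It is not the original bank1.c: withdraw() is filled in by symmetry with deposit(), and passing the iteration count through arg and the run_bank() wrapper are assumptions made to keep the sketch short.

```c
#include <pthread.h>

static int balance = 0;
static pthread_mutex_t balance_lock = PTHREAD_MUTEX_INITIALIZER;

// arg carries the iteration count in this sketch.
static void *deposit(void *arg) {
    long n = (long)arg;
    for (long i = 0; i < n; i++) {
        pthread_mutex_lock(&balance_lock);
        ++balance;
        pthread_mutex_unlock(&balance_lock);
    }
    return NULL;
}

static void *withdraw(void *arg) {
    long n = (long)arg;
    for (long i = 0; i < n; i++) {
        pthread_mutex_lock(&balance_lock);
        --balance;
        pthread_mutex_unlock(&balance_lock);
    }
    return NULL;
}

// Runs n deposits and n withdrawals concurrently; returns the final balance.
int run_bank(long n) {
    pthread_t t1, t2;
    balance = 0;
    pthread_create(&t1, NULL, deposit, (void *)n);
    pthread_create(&t2, NULL, withdraw, (void *)n);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return balance;
}
```

With the lock in place every increment is matched by a decrement, so run_bank() should always come back with 0 regardless of how the threads are scheduled.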

Problem: really slow (measure with hyperfine)

API

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • Return 0 if OK, error number on failure
  • We didn’t use them because balance_lock is statically allocated and initialized with PTHREAD_MUTEX_INITIALIZER.
  • Typically want the pthread_mutex_t object to live in a bigger data struct
    • malloc() the structure as a whole
    • Pass the pointer to the mutex inside it to pthread_mutex_init()
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);  // nonblocking: fails with EBUSY if already locked
int pthread_mutex_unlock(pthread_mutex_t *mutex);
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *tsptr);
  • Return 0 if OK, error number on failure
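
A mutex that lives inside a malloc()ed structure needs the init/destroy pair. The sketch below shows one way to arrange that; struct account and the account_*() helpers are hypothetical names, not part of the bank examples.

```c
#include <pthread.h>
#include <stdlib.h>

// Hypothetical structure: the mutex lives next to the data it protects.
struct account {
    pthread_mutex_t lock;
    long balance;
};

// Allocates the structure as a whole, then initializes the embedded mutex.
struct account *account_new(long initial) {
    struct account *a = malloc(sizeof(*a));
    if (a == NULL)
        return NULL;
    if (pthread_mutex_init(&a->lock, NULL) != 0) {   // NULL: default attributes
        free(a);
        return NULL;
    }
    a->balance = initial;
    return a;
}

// Adds amount under the lock and returns the new balance.
long account_deposit(struct account *a, long amount) {
    pthread_mutex_lock(&a->lock);
    a->balance += amount;
    long result = a->balance;
    pthread_mutex_unlock(&a->lock);
    return result;
}

// The mutex must be unlocked and unused by other threads at this point.
void account_free(struct account *a) {
    pthread_mutex_destroy(&a->lock);
    free(a);
}
```

Keeping the lock in the same structure as the data makes it obvious which lock guards which fields, and lets each account be locked independently.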

Deadlock

Threads are stuck waiting for a lock and none of them can proceed

  • A thread tries to lock the same mutex twice
  • One thread holds mutex A and tries to lock mutex B, while another thread holds mutex B and tries to lock mutex A
  • Fix: strict lock ordering (e.g. always A, then B)
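
One common way to get a strict ordering when the two mutexes are not known in advance is to order them by address. The sketch below assumes a hypothetical struct account with an embedded mutex; ordering by address is just one consistent choice, and any fixed global order would do.

```c
#include <pthread.h>
#include <stdint.h>

struct account {
    pthread_mutex_t lock;
    long balance;
};

// Locks both accounts in a fixed global order (lower address first),
// no matter which one is "from" and which is "to".
void transfer(struct account *from, struct account *to, long amount) {
    if (from == to)
        return;   // locking the same mutex twice would deadlock
    struct account *first  = (uintptr_t)from < (uintptr_t)to ? from : to;
    struct account *second = (first == from) ? to : from;

    pthread_mutex_lock(&first->lock);
    pthread_mutex_lock(&second->lock);
    from->balance -= amount;
    to->balance   += amount;
    pthread_mutex_unlock(&second->lock);
    pthread_mutex_unlock(&first->lock);
}

static struct account a = { PTHREAD_MUTEX_INITIALIZER, 100 };
static struct account b = { PTHREAD_MUTEX_INITIALIZER, 100 };

static void *a_to_b(void *unused) {
    (void)unused;
    for (int i = 0; i < 100000; i++) transfer(&a, &b, 1);
    return NULL;
}

static void *b_to_a(void *unused) {
    (void)unused;
    for (int i = 0; i < 100000; i++) transfer(&b, &a, 1);
    return NULL;
}

// Transfers in opposite directions at the same time; returns the total money.
long run_transfers(void) {
    pthread_t t1, t2;
    pthread_create(&t1, NULL, a_to_b, NULL);
    pthread_create(&t2, NULL, b_to_a, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return a.balance + b.balance;
}
```

If transfer() locked from first and to second instead, the two threads here would take the locks in opposite orders and could deadlock exactly as in the A/B case above.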

Condition variables

A condition variable lets a thread wait until a condition becomes true

Example

Consider a message queue implemented as a simple linked list:

// simple linked list
struct msg { struct msg *m_next; };
struct msg *workq;   // head

void process_msg(void) {
    struct msg *mp;

    for (;;) {
        while (workq == NULL) { }   // while list is empty, it spins
        
        mp = workq;   
        workq = mp->m_next;  // pops the first node
    }
}

void enqueue_msg(struct msg *mp) {
    pthread_mutex_lock(&qlock);

    mp->m_next = workq;  // prepends a node
    workq = mp;
    
    pthread_cond_signal(&qready);
    pthread_mutex_unlock(&qlock);
}

If the two functions are left to run at the same time in different threads, the list gets badly messed up, so workq needs to be protected with a mutex. A first attempt at the wait loop with threading:

while (workq == NULL) {  
	pthread_mutex_unlock(&qlock);
	sleep(1);
	pthread_mutex_lock(&qlock); 
	// pthread_cond_wait(&qready, &qlock);
}

Very clumsy: it performs a lot of unnecessary locking and unlocking, and the consumer may sleep for up to a second after a message arrives

  • Need a mechanism to unlock, let enqueue_msg() go, and lock again to process the message.
  • -> condition variable. Allows a thread to unlock the mutex, wait for some signal, and come out with the mutex locked again.

API

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           const struct timespec *tsptr);  // tsptr is an absolute time

int pthread_cond_signal(pthread_cond_t *cond);     // wake up at least one waiting thread
int pthread_cond_broadcast(pthread_cond_t *cond);  // wake up all waiting threads

All return 0 if OK, error number on failure

#include <pthread.h>

// static initializers for the condition variable and its mutex
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

// consumer
void process_msg(void) {
    struct msg *mp;

    for (;;) {
        pthread_mutex_lock(&qlock);

        while (workq == NULL) {  
            pthread_cond_wait(&qready, &qlock);
        }
        
        mp = workq;   
        workq = mp->m_next;  // pops the first node
        
        pthread_mutex_unlock(&qlock);
    }
}

void enqueue_msg(struct msg *mp) {
    pthread_mutex_lock(&qlock);
    
    mp->m_next = workq;
    workq = mp;
    
    pthread_cond_signal(&qready);
    pthread_mutex_unlock(&qlock);
}

Consumer thread

  1. Pass a mutex that is already locked to cond_wait()
  2. cond_wait() puts your thread to sleep and unlocks the mutex, atomically.
  3. When the enqueue_msg() thread is done putting a message in the queue, it calls pthread_cond_signal(&qready).
  4. The kernel wakes the sleeping thread up.
  5. cond_wait() tries to lock the mutex again (if another thread holds it, e.g. while enqueueing a message, it can’t lock immediately and has to wait)
  6. cond_wait() returns with the mutex locked, and we finally come out.
  7. Still need to go back and check whether the queue is empty
    • In case another consumer thread consumed the message and emptied it

Producer thread

Signal or unlock first?

  • Optimal to unlock first (minimize locked part), but not always safe

Thread safety

Is a function safe to be called by multiple threads at the same time?

Reentrancy implies both thread safety and signal safety (thread safety and signal safety do not imply each other)

  • To make non-reentrant functions thread safe, use a lock (e.g. printf())
    • To facilitate this, every open FILE stream carries a lock, acquired with flockfile()
    • The lock is recursive: the same thread can take it again while holding it (once by the caller, e.g. greptile, and again inside printf())
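
A small sketch of how the stream lock can be used by a caller: print_record() is a hypothetical helper that holds the lock across two fprintf() calls so that the record comes out in one piece even if other threads write to the same stream.

```c
#include <stdio.h>

// Writes one record as two separate fprintf() calls. Holding the stream's
// lock across both keeps other threads' output from landing in between.
void print_record(FILE *out, int id, const char *name) {
    flockfile(out);                   // take the stream lock
    fprintf(out, "id=%d ", id);       // fprintf() locks it again (recursive)
    fprintf(out, "name=%s\n", name);
    funlockfile(out);                 // release our hold on the lock
}
```

Without the flockfile()/funlockfile() pair each fprintf() is still thread safe on its own, but another thread could print between the two halves of the record.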

Reentrancy

A function is reentrant if it can be safely paused and called again in the middle of a previous execution. Can be reentered:

  • From same thread when interrupted by a signal and calling it from the signal handler
  • By multiple threads

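Worst offender: functions that maintain internal static data, e.g. strtok(), which keeps its position in a static variable (and also mutates the original string). Functions that rely on static variables can’t be reentrant!

  • Better version: strtok_r() (r for reentrant). The caller passes a pointer to a variable that takes the place of the originally static one.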

Signal safety

Thread safety and signal safety do not necessarily imply each other. A function is async-signal-safe if it can be safely called from signal handlers

  • To make a function async-signal-safe, block all signals whose handlers may call this function while it runs
    • The result may still not be thread safe
  • The other direction: printf() is thread safe, but can’t really be reentered when called from a signal handler. The handler’s call gets in through the recursive lock (same thread) and messes up the internals

Thread signal handling

Which thread gets the signal?

A process-directed signal may be delivered to any one of the threads that does not block the signal.

  • Individual threads can block a signal
  • But all threads share the same signal action

Usually designate one thread to handle signals, and block these signals in all other threads

  • Use pthread_sigmask() instead of sigprocmask()

sig1.c

#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

volatile sig_atomic_t sig = 0;
void handler(int signo) { sig = signo; }

#define handle_error_en(en, msg) \
    do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

static void *sig_thread(void *arg) {
    int s;
    sigset_t *set = arg;   // retrieve set

    /* Unblock SIGUSR1 and SIGQUIT for this thread */
    s = pthread_sigmask(SIG_UNBLOCK, set, NULL);
    if (s != 0) handle_error_en(s, "pthread_sigmask");

    /* Race condition: you can miss a signal HERE. Alternatively, use
     * sigsuspend() to atomically adjust the signal mask and then suspend
     * execution. */

    for (;;) {
        pause();  // slow syscall interrupted by signal
        printf("Signal handling thread got signal %d\n", sig);
    }

    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t thread;
    sigset_t set;
    int s;

    /* Block SIGUSR1 and SIGQUIT (Ctrl+\) */
    sigemptyset(&set);
    sigaddset(&set, SIGQUIT);
    sigaddset(&set, SIGUSR1);
    s = pthread_sigmask(SIG_BLOCK, &set, NULL);
    if (s != 0) handle_error_en(s, "pthread_sigmask");

    /* Install handlers for SIGQUIT and SIGUSR1 */
    struct sigaction act;
    act.sa_handler = handler;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    sigaction(SIGUSR1, &act, NULL);
    sigaction(SIGQUIT, &act, NULL);

    s = pthread_create(&thread, NULL, &sig_thread, &set);
    if (s != 0) handle_error_en(s, "pthread_create");

    /* Main thread carries on to create other threads and/or do
       other work */
    pause();        // pause until any signal arrives
}

Blocking a signal is not ignoring it. A blocked signal stays pending until a thread unblocks it, and then the handler runs in that thread

  1. Main thread first blocks all signals of interest and installs handlers for them
  2. Signal dispositions apply to all threads, including the handler thread created afterwards
  3. Handler thread inherits the blocked set, but unblocks and handles the signals
    • For other signals (e.g. ^C), whichever thread gets it, the whole process is killed

Race condition: a signal can arrive after pthread_sigmask() unblocks it but before pause() is called

  • handler() gets called, but pause() is not interrupted. Nothing happens until the next signal
  • Fix: use sigsuspend(), which atomically combines the signal unblock and pause()

sigwait()

int sigwait(const sigset_t *set, int *sig);
  1. Suspends execution of the calling thread
  2. Waits until any signal in the set becomes pending (the signals in the set should already be blocked)
  3. Accepts the signal (consumes it from the pending list)
  4. Stores the signal number in *sig for you to handle (the return value is 0 if OK, error number on failure)

Turns a signal into something like a condition variable: wait until something comes in, and consume it

sig2.c: no handler or handler installation
static void *sig_thread(void *arg) {
    sigset_t *set = arg;
    int s, sig;

    for (;;) {
        /* sigwait() consumes a pending signal */
        s = sigwait(set, &sig);
        printf("Signal handling thread got signal %d\n", sig);
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t thread;
    sigset_t set;
    int s;

    /* Block SIGUSR1 and SIGQUIT; other threads created by main()
       will inherit a copy of the signal mask. */
    sigemptyset(&set);
    sigaddset(&set, SIGUSR1);
    sigaddset(&set, SIGQUIT);
    s = pthread_sigmask(SIG_BLOCK, &set, NULL);

    s = pthread_create(&thread, NULL, &sig_thread, &set);

    /* Main thread carries on to create other threads and/or do
       other work */

    pause();       
}

Note that the signals remain blocked in both threads

Thread local storage

Thread-local global variable: marked with __thread

  • Compiler allocates thread-local storage, and each thread gets its own copy
  • Space is allocated for it when the thread is created

tls.c: similar to the bank example
// Global variable shared by all threads
uint64_t x = 0;

// Thread local global variable: each thread has a copy
__thread uint64_t y = 0;

void *inc_and_print(void *unused) {
    for (int i = 0; i < 1e7; i++) {x++; y++;}
    printf("x=%lu, y=%lu\n", x, y);
    return NULL;
}

int main(int argc, char **argv) {
    pthread_t threads[2];
    pthread_create(&threads[0], NULL, &inc_and_print, NULL);
    pthread_create(&threads[1], NULL, &inc_and_print, NULL);
    pthread_join(threads[0], NULL);
    pthread_join(threads[1], NULL);
    return 0;
}

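x will get messed up by the race between the two threads, but y won’t: each thread increments its own copy and prints y=10000000

E.g. errno is thread local, which allows multiple threads to use syscalls and errno without interfering with each other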

Thread vs process creation

Thread creation

Sample thread program pthread.c:

void *noop(void *arg) { return NULL; }

int main(int argc, char **argv) {
    pthread_t thread;
    pthread_create(&thread, NULL, &noop, NULL);
    pthread_join(thread, NULL);
}
$ strace ./pthread
mmap(NULL, 8392704, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7efe0d9ff000
mprotect(0x7efe0da00000, 8388608, PROT_READ|PROT_WRITE) = 0
  • The stack is created as a private anonymous mapping. The MAP_STACK flag specifies that the mapping is for a stack
  • Mapping size 8392704 is 8MB + 4kB
  • PROT_NONE: allocate with no access permissions first
  • Then call mprotect() to change the protection to READ | WRITE only for the 8MB chunk
    • The bottom 4kB region is left alone with no permissions (a guard page)
    • The stack grows down.
    • If the stack grows past the 8MB chunk and reads or writes there, segfault
rt_sigprocmask(SIG_BLOCK, ~[], [], 8)   = 0

clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7efe0e1ff910, parent_tid=0x7efe0e1ff910, exit_signal=0, stack=0x7efe0d9ff000, stack_size=0x7fff00, tls=0x7efe0e1ff640} => {parent_tid=[0]}, 88) = 3141615

rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
  • rt_sigprocmask() is the underlying syscall for sigprocmask() and pthread_sigmask(). Here it blocks all signals in the calling thread before creating the new thread
    • Done around clone3() so that the new thread, which inherits the mask, starts out with all signals blocked
    • ~[]: complement of the empty set (the universal set), blocking everything
    • []: what you get back (old signal mask), the empty set (nothing was blocked)
  • clone3() creates the new thread (the stack was already created by mmap())
    • called by pthread_create(). On Linux, fork() is also built on clone()
    • Flags:
      • CLONE_VM: the new task (aka thread) shares the same address space
      • CLONE_FILES: shares same file descriptor table
      • CLONE_SIGHAND: shares signal dispositions
      • CLONE_THREAD: the new task is placed in the same thread group; it will have the same pid but get a unique thread ID (tid)
      • CLONE_SETTLS: sets the new task’s TLS descriptor to the tls argument
      • stack (an argument rather than a flag): the address of the new task’s stack. This is the address that mmap() returned earlier
  • The second rt_sigprocmask() restores the previous signal mask (empty set)

Process creation

int main(int argc, char **argv) {
    pid_t pid = fork();
    if (pid > 0) { waitpid(pid, NULL, 0); }
}
clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f2c04fc1a10) = 3157115

wait4(3157115, NULL, 0, NULL)           = 3157115
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=3157115, si_uid=1008, si_status=0, si_utime=0, si_stime=0} ---
  • Lots of the clone() flags are missing: none of the resource-sharing flags from pthread_create()’s invocation are specified here.
    • SIGCHLD: sent to the parent process when this child process terminates.
  • child_stack=NULL: the child process uses the same stack area as the parent, but in its own address space
  • --- SIGCHLD ... ---: strace shows the parent process receiving SIGCHLD when the child terminates.

    Takeaway: look at the underlying syscalls behind library functions. On Linux, pthread_create() and fork() both call clone() (clone3() in the first trace), and pthread_create() also uses mmap() to set up the new stack

False sharing

E.g. in malloctupus, multiple threads (on different CPUs) may write to different variables that sit in the same cache block. Each write invalidates that block in the other CPU’s cache -> multithreading becomes even slower

void inc(uint64_t *val) {
    for (int i = 0; i < 1e7; i++)
        ++*val;
    printf("%lu\n", *val);
}

void* inc_thread(void *v) {
    inc(v);
    return NULL;
}

int main(int argc, char **argv)
{
    mm_init();
    void *unused = mm_malloc(32);
    uint64_t *p = mm_malloc(8);
    uint64_t *q = mm_malloc(8);
    mm_checkheap(1);

    pthread_t t1, t2;
    pthread_create(&t1, NULL, &inc_thread, p);
    pthread_create(&t2, NULL, &inc_thread, q);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
}

p and q are in the same cache block, so they are invalidated together

  • The 48B taken up by unused, combined with the 16B prologue, puts p and q in the same cache block
  • Solution: allocate p and q in different cache blocks (e.g. p = mm_malloc(64))
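
Outside of a custom allocator, the same separation can be forced with alignment. The sketch below assumes a 64-byte cache block (typical on x86-64, but an assumption here), and struct padded_counter and run_padded() are hypothetical names.

```c
#include <pthread.h>
#include <stdalign.h>
#include <stdint.h>

#define CACHE_LINE 64   // assumed cache block size

// Aligning the member to CACHE_LINE also pads the struct out to CACHE_LINE
// bytes, so neighboring array elements never share a cache block.
struct padded_counter {
    alignas(CACHE_LINE) uint64_t value;
};

static struct padded_counter counters[2];

static void *inc_thread(void *v) {
    uint64_t *val = v;
    for (int i = 0; i < 10000000; i++)
        ++*val;
    return NULL;
}

// Runs one incrementing thread per counter; returns the sum of both counters.
uint64_t run_padded(void) {
    pthread_t t1, t2;
    counters[0].value = 0;
    counters[1].value = 0;
    pthread_create(&t1, NULL, inc_thread, &counters[0].value);
    pthread_create(&t2, NULL, inc_thread, &counters[1].value);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return counters[0].value + counters[1].value;
}
```

Each thread only ever touches its own counter, so no lock is needed. The padding does not change the result, only how often the two CPUs invalidate each other’s caches.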

Modern memory allocators are optimized to reduce contention in multi-threaded environments. For example, the GNU C implementation does the following:

  • Instead of maintaining one centralized heap structure, the allocator maintains multiple disjoint “arenas” from which to allocate memory. This reduces lock contention because two threads allocating from different arenas do not compete for the same lock.
  • To avoid contending for a lock altogether, each thread maintains a per-thread cache, called the tcache, containing blocks that can be accessed without locking an arena. This is done with the help of TLS.