6 Threads
Published:
Creating threads
An execution inside a process (two functions in a process running at the same time), sharing the same address space (same heap, same static var, but can’t share stack space) ^4bba4d
- When thread starts running, the OS assigns a new slab of space in the middle for stack.
- Limit of stack space for each process
#include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg); int pthread_join(pthread_t thread, void **retval);
- Limit of stack space for each process
thread
: thread IDattr
: typicallyNULL
, a fine-grain control of thread behaviorstart_routine
: inputvoid *
, returnvoid *
arg
: forstart_routine(arg)
retval
: stores the return value ofstart_routine()
Returns 0 if OK, error number on failure
bank0.c
int balance = 0;
void* deposit(void *arg) {
for (int i = 0; i < 1e7; i++) ++balance;
long r = 10 * (long)arg;
return (void *)r; // test if you can pass something and get it back
}
void* withdraw(void *arg) {
for (int i = 0; i < 1e7; i++) --balance;
long r = 10 * (long)arg;
return (void *)r;
}
int main() {
pthread_t t1, t2;
// run two threads
pthread_create(&t1, NULL, &deposit, (void*)1);
pthread_create(&t2, NULL, &withdraw, (void*)2);
// run deposit((void *)1)
void *r1;
void *r2;
pthread_join(t1, &r1); // waits until t1 terminates
pthread_join(t2, &r2); // after t1 done, wait for t2
printf("t1 returned %ld\n", (long)r1);
printf("t2 returned %ld\n", (long)r2);
printf("balance = %d\n", balance);
}
Problem: balance++
is not atomic. fetch -> increment -> store
- Typically happens when running on multiple CPUs
taskset 1 ./bank0
only runs on CPU 1taskset 3
runs on 1 and 3.
Mutex lock
MUTual EXclusion lock bank1.c
pthread_mutex_t balance_lock = PTHREAD_MUTEX_INITIALIZER;
void* deposit(void *arg) {
for (int i = 0; i < 1e7; i++) {
pthread_mutex_lock(&balance_lock); //
++balance; // critical session: don't want interrupt
pthread_mutex_unlock(&balance_lock); // lock and unlock same thread!
}
long r = 10 * (long)arg;
return (void *)r;
}
When a second function calls mutex_lock()
, it will not return (block) until the first thread calls mutex_unlock()
Problem: really slow (hyperfine
)
API
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- Return 0 if OK, error number on failure We didn’t use them because we declared
balance_lock
as static. - Typically want
pthread_mutex_t
object to live in a bigger datastruct
malloc()
the whole structure as a whole- Pass the pointer to
mutex_init()
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex); // nonblocking if fail
int pthread_mutex_unlock(pthread_mutex_t *mutex);
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *mutex, struct timespec *tsptr);
- Return 0 if OK, error number on failure
Deadlock
Two threads both can’t proceed
- Thread tries to lock the same mutex twice
- A Thread holds mutex A and tries to lock mutex B, and another thread holds mutex B and tries to lock mutex A
- Fix: strict lock ordering (E. A then B)
Condition variables
Waits for a condition to be true
Example
consider a simple linked list of a message queue:
// simple linked list
struct msg { struct msg *m_next; };
struct msg *workq; // head
void process_msg(void) {
struct msg *mp;
for (;;) {
while (workq == NULL) { } // while list is empty, it spins
mp = workq;
workq = mp->m_next; // pops the first node
}
}
void enqueue_msg(struct msg *mp) {
pthread_mutex_lock(&qlock);
mp->m_next = workq; // prepends a node
workq = mp;
pthread_cond_signal(&qready);
pthread_mutex_unlock(&qlock);
}
If left them run at the same time, very very messed up. Need to protect with mutex. Now with threading
while (workq == NULL) {
pthread_mutex_unlock(&qlock);
sleep(1);
pthread_mutex_lock(&qlock);
// pthread_cond_wait(&qready, &qlock);
}
very clumsy, and performs a lot of unnecessary locking and unlocking
- Need a mechanism to unlock, let
enqueue()
go, and lock it back to process it. - -> condition variable. Allows to unlock, wait for some signal, come out with the mutex locked.
API
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
struct timespec *tsptr);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); // wake up all threads
Return 0 on OK, error number on failure
#include <pthread.h>
// thread initializer
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
//
void process_msg(void) {
struct msg *mp;
for (;;) {
pthread_mutex_lock(&qlock);
while (workq == NULL) {
pthread_cond_wait(&qready, &qlock);
}
mp = workq;
workq = mp->m_next; // pops the first node
pthread_mutex_unlock(&qlock);
}
}
void enqueue_msg(struct msg *mp) {
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_cond_signal(&qready);
pthread_mutex_unlock(&qlock);
}
Consumer thread
- Pass mutex that has already locked to
cond_wait()
cond_wait()
syscall will put your thread to sleep, unlocking the mutex.- When
enqueue_msg()
thread done putting something in there, it callspthread_cond_signal(&cond)
. - The kernel takes the thread out. Wakes it up
cond_wait()
grabs the lock and locks the mutex (if another thread enqueues a message and locks, then can’t do lock immediately)cond_wait()
returns, and we finally come out.- Still need to go back and check if queue empty
- In case another consumer thread consumes the message and empties it
Producer thread Signal or unlock first?
- Optimal to unlock first (minimize locked part), but not always safe
Thread safety
Is it safe to call a function to be called by multiple threads at the same time?
Reentrancy implies thread/signal safe (thread and signal do not imply each other)
- To make non-reentrant functions thread safe, use lock (E.
printf()
)- To facilitate, every open file carries a
flockfile()
flockfile()
is recursive (once locked byprintf()
, another locked bygreptile
)
- To facilitate, every open file carries a
Reentrancy
A function is reentrant if it can be safely paused and called again in the middle of a previous execution. Can be reentered:
- From same thread when interrupted by a signal and calling it from the signal handler
- By multiple threads
Worst offender: function that maintains internal static data: strtok()
, mutates the original string. Static variables can’t be reentrant!
- Better version:
strtok_r()
(r
for reentry), pass pointer to the originally static variable.
Signal safety
Signal safety not necessarily implies each other Async-signal-safe if the functions can be called from signal handlers
- To make a function async signal safe, block all signals that may call this function
- May not be thread safe Other case:
printf()
thread safe, but can’t really reenter if called through signal. It will enter via recursive lock, and mess up the internals
- May not be thread safe Other case:
Thread signal handling
Which thread gets the signal?
A process-directed signal may be delivered to any of the threads that doesn’t block the signal.
- Individual threads can block a signal
- But all threads will share the same signal action Usually designate one thread to handle signals, and block this signal in all others
- Use
pthread_sigmask()
instead of [[#Signal mask|sigprocmask()
]]sig1.c
```c int sig = 0; void handler(int signo) { sig = signo; } #define handle_error_en(en, msg)
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)
static void *sig_thread(void *arg) { int s; sigset_t *set = arg; // retrieve set
/* Unblock SIGUSR1 and SIGQUIT for this thread */
s = pthread_sigmask(SIG_UNBLOCK, set, NULL);
/* Race condition: you can miss a signal HERE. Alternatively, use
* sigsuspend() to atomically adjust the signal mask and then suspend
* execution. */
for (;;) {
pause(); // slow syscall interrupted by signal
printf("Signal handling thread got signal %d\n", sig);
}
return NULL; }
int main(int argc, char *argv[]) { pthread_t thread; sigset_t set; int s;
/* Block SIGUSR1 and SIGQUIT (Ctrl+\) */
sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGUSR1);
s = pthread_sigmask(SIG_BLOCK, &set, NULL);
/* Install handlers for SIGQUIT and SIGUSR1 */
struct sigaction act;
act.sa_handler = handler;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGQUIT, &act, NULL);
s = pthread_create(&thread, NULL, &sig_thread, &set);
/* Main thread carries on to create other threads and/or do
other work */
pause(); // pause until any signal arrives } ```
blocking a signal is not ignoring it. The signal is instead run by a handler
- Main thread first blocks all signals of interest and installs handlers for them
- After handler thread created, signal disposition apply to all threads
- Handler thread inherits the block set, but unblocks and handles the signal
- For other signals (
^C
), whichever gets it will kill the process
- For other signals (
Race condition: use sigsuspend()
that atomically combines signal unblock and pause()
handler()
gets called, butpause()
is not interrupted. Nothing happens
sigwait()
int sigwait(sigset_t *set, int *sig);
- Suspends execution of calling thread
- Waits for any signal in the set becomes pending (blocks the signal)
- Accepts signal (consumes it from the pending list)
- Returns the signal number for you to handle Turn signal to condition variable: wait until something comes in, and consumes it
sig2.c
: no handler or handler installation
static void *sig_thread(void *arg) {
sigset_t *set = arg;
int s, sig;
for (;;) {
/* sigwait() consumes a pending signal */
s = sigwait(set, &sig);
printf("Signal handling thread got signal %d\n", sig);
}
return NULL;
}
int main(int argc, char *argv[]) {
pthread_t thread;
sigset_t set;
int s;
/* Block SIGUSR1 and SIGQUIT; other threads created by main()
will inherit a copy of the signal mask. */
sigemptyset(&set);
sigaddset(&set, SIGUSR1);
sigaddset(&set, SIGQUIT);
s = pthread_sigmask(SIG_BLOCK, &set, NULL);
s = pthread_create(&thread, NULL, &sig_thread, &set);
/* Main thread carries on to create other threads and/or do
other work */
pause();
}
Note that the signals remain blocked in both threads
Thread local storage
Thread local global variable: marked with __thread
- Compiler allocates thread local storage, and each thread will get its own copy
- Space allocated for it when thread created
tls.c
: similar to bank example
// Global variable shared by all threads
uint64_t x = 0;
// Thread local global variable: each thread has a copy
__thread uint64_t y = 0;
void *inc_and_print(void *unused) {
for (int i = 0; i < 1e7; i++) {x++; y++;}
printf("x=%lu, y=%lu\n", x, y);
return NULL;
}
int main(int argc, char **argv) {
pthread_t threads[2];
pthread_create(&threads[0], NULL, &inc_and_print, NULL);
pthread_create(&threads[1], NULL, &inc_and_print, NULL);
pthread_join(threads[0], NULL);
pthread_join(threads[1], NULL);
return 0;
}
x
will mess up, but y
won’t
E. errno
, allows multiple threads to use syscalls and errno
without interfering
Thread vs process creation
Thread creation
Sample thread program pthread.c
:
void *noop(void *arg) { return NULL; }
int main(int argc, char **argv) {
pthread_t thread;
pthread_create(&thread, NULL, &noop, NULL);
pthread_join(thread, NULL);
}
$ strace ./pthread
mmap(NULL, 8392704, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7efe0d9ff000
mprotect(0x7efe0da00000, 8388608, PROT_READ|PROT_WRITE) = 0
- Stack is created as a private anonymous mapping.
MAP_STACK
flag specified that mapping is for a stack - Mapping size
8392704
is 8MB + 4kB PROT_NONE
: allocate without read/write protection first- Then call
mprotec()
to change the protection toREAD | WRITE
only for the 8MB chunk- The bottom 4kB region is left along for no permission
- Stack rolls down.
- If stack rolls beyond to read/write there, segfault
rt_sigprocmask(SIG_BLOCK, ~[], [], 8) = 0
clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7efe0e1ff910, parent_tid=0x7efe0e1ff910, exit_signal=0, stack=0x7efe0d9ff000, stack_size=0x7fff00, tls=0x7efe0e1ff640} => {parent_tid=[0]}, 88) = 3141615
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
rt_sigprocmask()
is analogous topthread_procmask()
, underlying syscall forsigprocmask
, blocking signal for the entire process, before creating a thread- This is required by
clone()
. Requires all signals to be blocked ~[]
: not empty set (universal set), blocking everything[]
: what you get back (old signal mask), also empty set (nothing blocked)
- This is required by
clone3()
creates a stack- called by
pthread_create()
. In Linux,fork()
also calls it - Flags:
CLONE_VM
: the new task (aka thread) shares the same address spaceCLONE_FILES
: shares same file descriptor tableCLONE_SIGHAND
: shares signal dispositionsCLONE_THREAD
: the new task is placed in the same thread group; it will have the same pid but get a unique thread ID (tid)CLONE_SETTLS
: sets the new task’s TLS descriptor to thetls
argumentstack
: the address of the new task’s stack. This is the address thatmmap()
returned earlier
- called by
- Second mask to restore the previous signal mask (empty set)
Process creation
int main(int argc, char **argv) {
if ((pid_t pid = fork()) > 0) { waitpid(pid, NULL, 0); }
}
clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f2c04fc1a10) = 3157115
wait4(3157115, NULL, 0, NULL) = 3157115
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=3157115, si_uid=1008, si_status=0, si_utime=0, si_stime=0} ---
- Lots of the
clone()
flags are missingSIGCHLD
:SIGCHLD
sent to the parent process when this child process terminates.
child_stack=NULL
: the child process uses the same stack area as the parent, but in its own address space- Note that the resource-sharing flags from
pthread_create()
’s invocation aren’t specified here. --- ---
:strace
shows the parent process receivesSIGCHLD
when child terminates.Takeaway: underlying syscalls for library functions. For Linux,
pthread_create()
andfork()
both callclone()
, andmmap()
is involved
False sharing
E. for malloctupus
, if multiple threads (CPUs) looking at the same piece of memory. Other CPU’s cache will invalidate certain portions of each other’s cache -> multithreading even slower
void inc(uint64_t *val) {
for (int i = 0; i < 1e7; i++)
++*val;
printf("%lu\n", *val);
}
void* inc_thread(void *v) {
inc(v);
return NULL;
}
int main(int argc, char **argv)
{
mm_init();
void *unused = mm_malloc(32);
uint64_t *p = mm_malloc(8);
uint64_t *q = mm_malloc(8);
mm_checkheap(1);
pthread_t t1, t2;
pthread_create(&t1, NULL, &inc_thread, p);
pthread_create(&t2, NULL, &inc_thread, q);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
}
p
and q
are in the same cache block, and invalidated together
- 48B
unused
, combined with the 16B prologue, alignsp
andq
to the same cache block - Solution: allocate
p
andq
to different blocks (E.p = mm_malloc(64)
)
Modern memory allocators are optimized to reduce contention in multi-threaded environments. For example, the GNU C implementation does the following:
- Instead of maintaining one centralized heap structure, the allocator maintains multiple disjoint “arenas” from which to allocate memory from. This reduces lock contention because two threads allocating from different arenas do not compete for the same lock.
- To avoid contending for a lock all together, each thread maintains a per-thread cache, called the tcache, containing a blocks which that can be accessed without locking an arena. This is done with the help of TLS.