]> git.scottworley.com Git - tl-append/commitdiff
Concurrency tests
authorScott Worley <scottworley@scottworley.com>
Wed, 13 Sep 2023 20:53:50 +0000 (13:53 -0700)
committerScott Worley <scottworley@scottworley.com>
Wed, 13 Sep 2023 20:53:50 +0000 (13:53 -0700)
tl-append-test.c

index 08cf8002b24154468e7d06581684c9daa1d97d64..20ae7fb6a7d807563f001e68ab7c56099a45f086 100644 (file)
@@ -1,13 +1,17 @@
 #define _POSIX_C_SOURCE 2
 #define _XOPEN_SOURCE
 #define _POSIX_C_SOURCE 2
 #define _XOPEN_SOURCE
+#define _GNU_SOURCE
 #include <assert.h>
 #include <ctype.h>
 #include <errno.h>
 #include <assert.h>
 #include <ctype.h>
 #include <errno.h>
+#include <fcntl.h>
 #include <limits.h>
 #include <limits.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
+#include <unistd.h>
 
 #include "common.h"
 
 
 #include "common.h"
 
@@ -19,9 +23,14 @@ typedef struct expectation {
 } ex_t;
 
 const ex_t END = {((time_t)-1), ((time_t)-1), NULL};
 } ex_t;
 
 const ex_t END = {((time_t)-1), ((time_t)-1), NULL};
+const ex_t CONSUMED = {((time_t)-2), ((time_t)-2), NULL};
 static int is_end(ex_t exp) {
   return exp.a == END.a && exp.b == END.b && exp.message == END.message;
 }
 static int is_end(ex_t exp) {
   return exp.a == END.a && exp.b == END.b && exp.message == END.message;
 }
+static int is_consumed(ex_t exp) {
+  return exp.a == CONSUMED.a && exp.b == CONSUMED.b &&
+         exp.message == CONSUMED.message;
+}
 static ex_t expectation(time_t a, time_t b, const char *message) {
   ex_t exp;
   exp.a = a;
 static ex_t expectation(time_t a, time_t b, const char *message) {
   ex_t exp;
   exp.a = a;
@@ -54,37 +63,46 @@ static ex_t write_to_tl_append(const char *content) {
   return expectation(start, end, content);
 }
 
   return expectation(start, end, content);
 }
 
-static void verify_timestamp(const ex_t *ex, const char *line) {
+static char *timestamp_problem(const ex_t *ex, const char *line) {
   struct tm tm;
 
   /* localtime_r to set tm's timezone */
   time_t now_time = time(NULL);
   if (localtime_r(&now_time, &tm) == NULL)
   struct tm tm;
 
   /* localtime_r to set tm's timezone */
   time_t now_time = time(NULL);
   if (localtime_r(&now_time, &tm) == NULL)
-    die_err("Can't unpack current time?");
+    return "Can't unpack current time?";
 
   const char *strptime_result = strptime(line, "%Y %m %d %H %M %S", &tm);
   if (strptime_result == NULL || strptime_result != &line[TIMESTAMP_LEN])
 
   const char *strptime_result = strptime(line, "%Y %m %d %H %M %S", &tm);
   if (strptime_result == NULL || strptime_result != &line[TIMESTAMP_LEN])
-    die("Wrong contents in log file: Couldn't parse timestamp");
+    return "Wrong contents in log file: Couldn't parse timestamp";
   time_t t = mktime(&tm);
   int t_in_range = ex->a <= t && t <= ex->b;
   if (!t_in_range)
   time_t t = mktime(&tm);
   int t_in_range = ex->a <= t && t <= ex->b;
   if (!t_in_range)
-    die("Wrong contents in log file: Wrong time");
+    return "Wrong contents in log file: Wrong time";
+  return NULL;
 }
 
 }
 
-static void verify_line(const ex_t *ex, const char *line) {
+static char *line_problem(const ex_t *ex, const char *line) {
   size_t line_len = strlen(line);
   if (line_len < TIMESTAMP_LEN + 1)
   size_t line_len = strlen(line);
   if (line_len < TIMESTAMP_LEN + 1)
-    die("Wrong contents in log file: Line too short");
-  verify_timestamp(ex, line);
+    return "Wrong contents in log file: Line too short";
+  char *trouble = timestamp_problem(ex, line);
+  if (trouble)
+    return trouble;
   if (line[TIMESTAMP_LEN] != ' ')
   if (line[TIMESTAMP_LEN] != ' ')
-    die("Wrong contents in log file: Bad format");
+    return "Wrong contents in log file: Bad format";
   if (strncmp(ex->message, &line[TIMESTAMP_LEN + 1], strlen(ex->message) + 1) !=
       0)
   if (strncmp(ex->message, &line[TIMESTAMP_LEN + 1], strlen(ex->message) + 1) !=
       0)
-    die("Wrong contents in log file");
+    return "Wrong contents in log file";
+  return NULL;
 }
 
 }
 
-static void verify_log_contents(ex_t exps[]) {
+static void verify_line(const ex_t *ex, const char *line) {
+  char *trouble = line_problem(ex, line);
+  if (trouble)
+    die(trouble);
+}
 
 
+static void verify_log_contents(const ex_t exps[]) {
   FILE *f = fopen(FILENAME, "r");
   if (f == NULL)
     die_err("Error opening log file");
   FILE *f = fopen(FILENAME, "r");
   if (f == NULL)
     die_err("Error opening log file");
@@ -104,6 +122,40 @@ static void verify_log_contents(ex_t exps[]) {
     die_err("Error closing log file");
 }
 
     die_err("Error closing log file");
 }
 
+static void consume_expectation(ex_t exps[], const char *line) {
+  for (size_t i = 0; !is_end(exps[i]); i++) {
+    if (line_problem(&exps[i], line) == NULL) {
+      exps[i] = CONSUMED;
+      return;
+    }
+  }
+  die("Wrong contents in log file: Line didn't match any expectation");
+}
+
+static void verify_log_contents_unordered(ex_t exps[]) {
+  FILE *f = fopen(FILENAME, "r");
+  if (f == NULL)
+    die_err("Error opening log file");
+  for (;;) {
+    const int MAX_LEN = 9999;
+    char buf[MAX_LEN];
+    if (fgets(buf, MAX_LEN, f) == NULL) {
+      if (feof(f))
+        break;
+      die("Error reading log file");
+    }
+    if (ferror(f))
+      die("Error reading log file");
+    consume_expectation(exps, buf);
+  }
+  if (fclose(f) != 0)
+    die_err("Error closing log file");
+  for (size_t i = 0; !is_end(exps[i]); i++) {
+    if (!is_consumed(exps[i]))
+      die("Wrong contents in log file: Unconsumed expectation");
+  }
+}
+
 static void test_encode_time() {
   /* localtime_r to set tm's timezone */
   time_t now_time = time(NULL);
 static void test_encode_time() {
   /* localtime_r to set tm's timezone */
   time_t now_time = time(NULL);
@@ -143,6 +195,45 @@ static void test_encode_time() {
   assert(encoded[19] == '\0');
 }
 
   assert(encoded[19] == '\0');
 }
 
+static FILE *take_lock() {
+  FILE *f = fopen(FILENAME, "a");
+  if (f == NULL)
+    die_err("Couldn't open file for locking");
+  int fd = fileno(f);
+  if (fd == -1)
+    die_err("Couldn't get file descriptor for locking");
+  if (fcntl(fd, F_SETLK,
+            &(struct flock){.l_type = F_WRLCK,
+                            .l_whence = SEEK_SET,
+                            .l_start = 0,
+                            .l_len = 0}) == -1)
+    die_err("Couldn't take lock");
+  return f;
+}
+
+static void release_lock(FILE *f) {
+  if (fclose(f) != 0)
+    die_err("Error releasing lock");
+}
+
+static void *release_lock_after_delay(void *f) {
+  sleep(2);
+  release_lock((FILE *)f);
+  return NULL;
+}
+
+static void *writer_thread(void *start_signal) {
+  ex_t *ex = (ex_t *)malloc(sizeof(ex_t));
+  if (ex == NULL)
+    die_err("Couldn't allocate memory");
+  char *message;
+  if (asprintf(&message, "Hello from thread %lu\n", pthread_self()) == -1)
+    die("Couldn't prepare message");
+  pthread_rwlock_rdlock((pthread_rwlock_t *)start_signal);
+  *ex = write_to_tl_append(message);
+  return ex;
+}
+
 static void write_and_read_line() {
   remove_logfile();
   ex_t e = write_to_tl_append("foo\n");
 static void write_and_read_line() {
   remove_logfile();
   ex_t e = write_to_tl_append("foo\n");
@@ -156,8 +247,60 @@ static void write_and_read_two_lines() {
   verify_log_contents((ex_t[]){e1, e2, END});
 }
 
   verify_log_contents((ex_t[]){e1, e2, END});
 }
 
+static void write_to_locked_log() {
+  remove_logfile();
+  ex_t e1 = write_to_tl_append("begin\n");
+  FILE *lock = take_lock();
+  pthread_t unlock_thread;
+  int create_ret =
+      pthread_create(&unlock_thread, NULL, &release_lock_after_delay, lock);
+  if (create_ret != 0) {
+    errno = create_ret;
+    die_err("Couldn't start thread");
+  }
+  ex_t e2 = write_to_tl_append("delayed\n");
+  int join_ret = pthread_join(unlock_thread, NULL);
+  if (join_ret != 0) {
+    errno = join_ret;
+    die_err("Couldn't join thread");
+  }
+  verify_log_contents((ex_t[]){e1, e2, END});
+}
+
+static void write_concurrently() {
+  remove_logfile();
+  const int PARALLELISM = 250;
+  pthread_t threads[PARALLELISM];
+  pthread_rwlock_t start_signal;
+  pthread_rwlock_init(&start_signal, NULL);
+  for (int i = 0; i < PARALLELISM; i++) {
+    int create_ret =
+        pthread_create(&threads[i], NULL, &writer_thread, &start_signal);
+    if (create_ret != 0) {
+      errno = create_ret;
+      die_err("Couldn't start thread");
+    }
+  }
+  pthread_rwlock_unlock(&start_signal);
+  ex_t results[PARALLELISM + 1];
+  for (int i = 0; i < PARALLELISM; i++) {
+    ex_t *ex;
+    int join_ret = pthread_join(threads[i], (void **)&ex);
+    if (join_ret != 0) {
+      errno = join_ret;
+      die_err("Couldn't join thread");
+    }
+    results[i] = *ex;
+    free(ex);
+  }
+  results[PARALLELISM] = END;
+  verify_log_contents_unordered(results);
+}
+
 int main() {
   test_encode_time();
   write_and_read_line();
   write_and_read_two_lines();
 int main() {
   test_encode_time();
   write_and_read_line();
   write_and_read_two_lines();
+  write_to_locked_log();
+  write_concurrently();
 }
 }