Make-initrd development discussion
From: Alexey Gladkov <gladkov.alexey@gmail.com>
To: make-initrd@lists.altlinux.org
Subject: [make-initrd] [PATCH 1/3] Reimplement ueventd
Date: Thu,  4 May 2023 15:42:50 +0200
Message-ID: <6b444ed922286eb3df8f5322b1bddf9c55753eb8.1683200226.git.gladkov.alexey@gmail.com>
In-Reply-To: <cover.1683200226.git.gladkov.alexey@gmail.com>

The new C implementation of ueventd makes the logic easier to follow.
It also allows us to add more optimizations. For now, the filesystem is
still used as the event database because that lets events accumulate
before ueventd is started.

Signed-off-by: Alexey Gladkov <gladkov.alexey@gmail.com>
---
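
A note on the design, not part of the commit: the commit message says the
filesystem stays in use as the event database so that events can
accumulate before ueventd starts. The sketch below illustrates that idea
in isolation. The helper names (spool_put, spool_is_empty, spool_done) are
hypothetical and do not appear in this patch. The only behaviour taken
from the patch is that names starting with a dot are ignored when checking
whether a queue directory is empty (empty_directory() in path.c) and that
a finished event is simply removed (done_event). Writing to a hidden
temporary name and renaming it into place is an assumption about how a
producer could publish an event atomically, not a description of the
existing producers.

```c
#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

/* Hypothetical buffer size for this sketch only. */
enum { SPOOL_PATH_MAX = 4096 };

static int join_path(char *buf, size_t size, const char *dir,
                     const char *prefix, const char *name, const char *suffix)
{
	int n = snprintf(buf, size, "%s/%s%s%s", dir, prefix, name, suffix);
	return (n < 0 || (size_t) n >= size) ? -1 : 0;
}

/* Publish one event file. Works whether or not a daemon is running. */
int spool_put(const char *dir, const char *name, const char *payload)
{
	char tmp[SPOOL_PATH_MAX], dst[SPOOL_PATH_MAX];

	assert(dir && name && payload);

	if (mkdir(dir, 0755) < 0 && errno != EEXIST)
		return -1;
	if (join_path(tmp, sizeof(tmp), dir, ".", name, ".tmp") < 0 ||
	    join_path(dst, sizeof(dst), dir, "", name, "") < 0)
		return -1;

	/* Assumption: write under a dot-name first, so a reader that skips
	 * dot-names never sees a half-written event. */
	FILE *f = fopen(tmp, "w");
	if (!f)
		return -1;
	int ok = (fputs(payload, f) >= 0);
	if (fclose(f) != 0)
		ok = 0;
	if (!ok || rename(tmp, dst) < 0) {
		unlink(tmp);
		return -1;
	}
	return 0;
}

/* 1 if no visible entries, 0 if there is pending work, -1 on error. */
int spool_is_empty(const char *dir)
{
	struct dirent *ent;
	int empty = 1;
	DIR *d = opendir(dir);

	if (!d)
		return -1;
	while ((ent = readdir(d)) != NULL) {
		if (ent->d_name[0] != '.') {
			empty = 0;
			break;
		}
	}
	closedir(d);
	return empty;
}

/* Mark an event as handled by removing its file. */
int spool_done(const char *dir, const char *name)
{
	char path[SPOOL_PATH_MAX];

	if (join_path(path, sizeof(path), dir, "", name, "") < 0)
		return -1;
	return unlink(path);
}
```

A producer can call spool_put() at any time, including before the daemon
exists. At startup the daemon only needs a check like spool_is_empty() to
decide whether a queue already has work, which is what watch_path() in
this patch does when it sets F_DIRTY for a non-empty queue directory.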
 data/bin/uevent-sh-functions      |   3 +
 data/etc/rc.d/init.d/uevent       |   9 +-
 data/sbin/ueventd                 | 180 ----------
 data/sbin/ueventd-queue           |  65 ++++
 datasrc/ueventd/Makefile.mk       |  13 +
 datasrc/ueventd/logging.c         |  63 ++++
 datasrc/ueventd/memory.c          |  32 ++
 datasrc/ueventd/path.c            |  80 +++++
 datasrc/ueventd/process.c         |  17 +
 datasrc/ueventd/queue-processor.c | 116 +++++++
 datasrc/ueventd/ueventd.c         | 551 ++++++++++++++++++++++++++++++
 datasrc/ueventd/ueventd.h         |  76 +++++
 12 files changed, 1024 insertions(+), 181 deletions(-)
 delete mode 100755 data/sbin/ueventd
 create mode 100755 data/sbin/ueventd-queue
 create mode 100644 datasrc/ueventd/Makefile.mk
 create mode 100644 datasrc/ueventd/logging.c
 create mode 100644 datasrc/ueventd/memory.c
 create mode 100644 datasrc/ueventd/path.c
 create mode 100644 datasrc/ueventd/process.c
 create mode 100644 datasrc/ueventd/queue-processor.c
 create mode 100644 datasrc/ueventd/ueventd.c
 create mode 100644 datasrc/ueventd/ueventd.h
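
For orientation before reading the diff: the main loop in ueventd.c starts
a worker for a watched directory only when it is a queue directory, is
marked dirty, is not paused, and has no worker still running. F_DIRTY is
cleared just before the worker is spawned. The sketch below restates that
rule as a pure function. The flag values are copied from ueventd.h;
queue_runnable() and queue_claim() are hypothetical names that do not
exist in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>

/* Flag values as defined in ueventd.h in this patch. */
#define F_QUEUE_DIR (1 << 1)
#define F_DIRTY     (1 << 3)
#define F_PAUSED    (1 << 4)

/* Hypothetical helper: 1 if a worker may be started for this queue now. */
int queue_runnable(int flags, pid_t worker_pid)
{
	return (flags & F_QUEUE_DIR) &&   /* a queue, not the root or pause dir */
	       (flags & F_DIRTY) &&       /* new or leftover events are pending */
	       !(flags & F_PAUSED) &&     /* not paused via queue/pause */
	       worker_pid == 0;           /* previous worker has been reaped */
}

/* Hypothetical helper: clear the dirty mark before spawning a worker, so
 * that events arriving while the worker runs set it again. */
void queue_claim(int *flags)
{
	assert(flags != NULL);
	*flags &= ~F_DIRTY;
}
```

Clearing F_DIRTY before the worker runs, rather than after, means that an
event arriving during processing leaves the queue dirty and triggers one
more pass once the worker exits.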

diff --git a/data/bin/uevent-sh-functions b/data/bin/uevent-sh-functions
index ff86d645..7f3947ae 100644
--- a/data/bin/uevent-sh-functions
+++ b/data/bin/uevent-sh-functions
@@ -33,6 +33,9 @@ release_event() {
 
 done_event() {
 	rm -f -- "$1"
+
+	local message_time=1 queue="${QUEUE:--}" session="${SESSION:-0}"
+	message "$queue: session=$session: finish event: $1" ||:
 }
 
 fi
diff --git a/data/etc/rc.d/init.d/uevent b/data/etc/rc.d/init.d/uevent
index 8997ff8f..a8e6a8e6 100755
--- a/data/etc/rc.d/init.d/uevent
+++ b/data/etc/rc.d/init.d/uevent
@@ -20,7 +20,14 @@ PIDFILE=/var/run/$NAME.pid
 ARGS="--lockfile $LOCKFILE --pidfile $PIDFILE --displayname $NAME --name $NAME"
 
 start() {
-	start_daemon --background $ARGS -- "$NAME"
+	export filterdir ueventdir uevent_confdb
+
+	mkdir -p -- \
+		"$filterdir" \
+		"$ueventdir" \
+		"$uevent_confdb/queue/pause"
+
+	start_daemon --background $ARGS -- "$NAME" /sbin/ueventd-queue
 	RETVAL=$?
 	return $RETVAL
 }
diff --git a/data/sbin/ueventd b/data/sbin/ueventd
deleted file mode 100755
index d891016b..00000000
--- a/data/sbin/ueventd
+++ /dev/null
@@ -1,180 +0,0 @@
-#!/bin/bash -eu
-
-. shell-error
-. shell-signal
-
-. /.initrd/initenv
-. uevent-sh-functions
-
-message_time=1
-
-pidfile="/var/run/$PROG.pid"
-logfile="/var/log/$PROG.log"
-inotifyd='/sbin/inotifyd'
-
-[ "${RDLOG-}" != 'console' ] ||
-	logfile=/dev/console
-
-UEVENT_MODE="${UEVENT_MODE:-server}"
-
-if [ "$UEVENT_MODE" = 'server' ]; then
-	exec >"$logfile" 2>&1
-
-	message "starting server ..."
-
-	[ -d "$filterdir" ] ||
-		fatal "$filterdir: bad directory"
-
-	echo "$$" > "$pidfile"
-
-	exit_handler()
-	{
-		local d rc="$?"
-		trap - EXIT
-		for d in "$filterdir"/*; do
-			[ ! -d "$d" ] || [ ! -f "$d.pid" ] ||
-				rm -f -- "$d.pid"
-		done
-		[ ! -f "$pidfile" ] ||
-			rm -f -- "$pidfile"
-		exit $rc
-	}
-	set_cleanup_handler exit_handler
-
-	mkdir -p -- "$uevent_confdb/queue/pause"
-
-	export UEVENT_MODE='queue-processor'
-
-	for d in "$filterdir"/*; do
-		[ ! -d "$d" ] || "$0" "n" "${d%/*}" "${d##*/}"
-	done
-
-	"$inotifyd" "$0" "$pidfile:x" "$filterdir:nd" &
-	wait
-
-	exit 0
-fi
-
-evtype="$1"
-name="${3-}"
-event="$2${name:+/$name}"
-
-if [ "$UEVENT_MODE" = 'queue-processor' ]; then
-	case "$evtype" in
-		n)
-			[ -d "$event" ] && [ -n "${name##.*}" ] ||
-				exit 0
-
-			export UEVENT_MODE='queue-handler'
-			export QUEUE="$name"
-			export INDIR="$event"
-
-			message "starting sub-process for '$QUEUE' queue ..."
-
-			mkdir -p -- "$ueventdir/$QUEUE"
-
-			:> "$event.startup"
-			:> "$event.pid"
-			:> "$event.timer"
-
-			"$inotifyd" "$0" "$pidfile:x" "$event.pid:x" "$event.startup:0" "$event.timer:0" "$event:ny" &
-
-			while [ -e "$event.startup" ]; do
-				:<"$event.startup"
-				sleep 0.1
-			done
-
-			echo "$!" >"$event.pid"
-			;;
-		d)
-			[ ! -d "$event" ] || [ ! -f "$event.pid" ] ||
-				rm -f -- "$event.pid"
-			;;
-		x)
-			kill "$PPID"
-			;;
-	esac
-	exit 0
-fi
-
-[ "$UEVENT_MODE" = 'queue-handler' ] ||
-	fatal "unexpected mode: $UEVENT_MODE"
-
-if [ "$2" = "$INDIR.startup" ]; then
-	[ "$evtype" = '0' ] ||
-		exit 0
-	rm -f "$INDIR.startup"
-fi
-
-if [ "$evtype" = 'x' ]; then
-	kill "$PPID"
-	exit 0
-fi
-
-if [ -e "$uevent_confdb/queue/pause/.all" ] || [ -e "$uevent_confdb/queue/pause/$QUEUE" ]; then
-	message "$QUEUE: queue paused"
-	exit 0
-fi
-
-udevadm settle --timeout=3 ||:
-
-glob()
-{
-	[ -e "$1" ]
-}
-
-queuedir="$ueventdir/$QUEUE"
-
-mv -f -- "$INDIR"/* "$queuedir" 2>/dev/null ||:
-glob "$queuedir"/* || exit 0
-
-[ "$evtype" != '0' ] || [ "$2" = "$INDIR.startup" ] ||
-	message "$QUEUE: retrying with failed events ..."
-
-for ev in "$queuedir"/*; do
-	message "$QUEUE: event $ev"
-done
-
-get_name()
-{
-	[ ! -d "$fn" ] && [ -x "$fn" ] || return 1
-	name="${fn##*/}"
-	name="${name#[0-9][0-9][0-9]-}"
-}
-
-run_scripts()
-{
-	local exe
-
-	for exe in "/lib/uevent/each/$1"/*; do
-		[ -x "$exe" ] || continue
-		"$exe" "$2" ||:
-	done
-}
-
-for fn in "$handlerdir/$QUEUE"/*; do
-	get_name || continue
-
-	run_scripts pre "$QUEUE"
-
-	message "$QUEUE: running queue-specific handler: $fn"
-	"$fn" "$queuedir" || message "$QUEUE: event handler failed: $fn"
-
-	run_scripts post "$QUEUE"
-done
-
-for fn in "$handlerdir"/*; do
-	get_name && glob "$queuedir/$name".* || continue
-
-	run_scripts pre "$QUEUE"
-
-	message "$QUEUE: running handler: $fn"
-	"$fn" || message "$QUEUE: event handler failed: $fn"
-
-	run_scripts post "$QUEUE"
-done
-
-if glob "$queuedir"/*; then
-	sleep ${RDUEVENT_TIMEOUT:-1}
-	:<"$filterdir/$QUEUE.timer"
-fi
diff --git a/data/sbin/ueventd-queue b/data/sbin/ueventd-queue
new file mode 100755
index 00000000..c106dc60
--- /dev/null
+++ b/data/sbin/ueventd-queue
@@ -0,0 +1,65 @@
+#!/bin/bash -eu
+
+. /.initrd/initenv
+
+[ "${RDLOG-}" != 'console' ] &&
+	logfile=/var/log/ueventd.log ||
+	logfile=/dev/console
+
+exec >"$logfile" 2>&1
+
+. shell-error
+. uevent-sh-functions
+
+message_time=1
+
+export queuedir="$1"
+export QUEUE="${queuedir##*/}"
+export SESSION="${SESSION:-0}"
+
+glob()
+{
+	[ -e "$1" ]
+}
+
+get_name()
+{
+	[ ! -d "$fn" ] && [ -x "$fn" ] || return 1
+	name="${fn##*/}"
+	name="${name#[0-9][0-9][0-9]-}"
+}
+
+run_scripts()
+{
+	local exe
+
+	for exe in "/lib/uevent/each/$1"/*; do
+		[ ! -x "$exe" ] || "$exe" "$2" ||:
+	done
+}
+
+for ev in "$queuedir"/*; do
+	message "$QUEUE: session=$SESSION: event $ev"
+done
+
+for fn in "$handlerdir/$QUEUE"/*; do
+	get_name || continue
+
+	run_scripts pre "$QUEUE"
+
+	message "$QUEUE: session=$SESSION: running queue-specific handler: $fn"
+	"$fn" "$queuedir" || message "$QUEUE: session=$SESSION: event handler failed: $fn"
+
+	run_scripts post "$QUEUE"
+done
+
+for fn in "$handlerdir"/*; do
+	get_name && glob "$queuedir/$name".* || continue
+
+	run_scripts pre "$QUEUE"
+
+	message "$QUEUE: session=$SESSION: running handler: $fn"
+	"$fn" || message "$QUEUE: session=$SESSION: event handler failed: $fn"
+
+	run_scripts post "$QUEUE"
+done
diff --git a/datasrc/ueventd/Makefile.mk b/datasrc/ueventd/Makefile.mk
new file mode 100644
index 00000000..887cc8d1
--- /dev/null
+++ b/datasrc/ueventd/Makefile.mk
@@ -0,0 +1,13 @@
+ueventd_DEST = $(dest_data_sbindir)/ueventd
+ueventd_SRCS = \
+	datasrc/ueventd/logging.c \
+	datasrc/ueventd/memory.c \
+	datasrc/ueventd/path.c \
+	datasrc/ueventd/process.c \
+	datasrc/ueventd/queue-processor.c \
+	datasrc/ueventd/ueventd.c \
+	datasrc/ueventd/ueventd.h
+
+ueventd_CFLAGS = -D_GNU_SOURCE
+
+PROGS += ueventd
diff --git a/datasrc/ueventd/logging.c b/datasrc/ueventd/logging.c
new file mode 100644
index 00000000..671f6814
--- /dev/null
+++ b/datasrc/ueventd/logging.c
@@ -0,0 +1,63 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <strings.h>
+#include <errno.h>
+#include <time.h>
+
+#include "ueventd.h"
+
+#define default_logfile "/var/log/ueventd.log"
+
+int log_priority = LOG_INFO;
+
+int logging_level(const char *name)
+{
+	if (!strcasecmp(name, "debug"))   return LOG_DEBUG;
+	if (!strcasecmp(name, "info"))    return LOG_INFO;
+	if (!strcasecmp(name, "warning")) return LOG_WARNING;
+	if (!strcasecmp(name, "error"))   return LOG_ERR;
+	return log_priority;
+}
+
+void logging_init(int loglevel)
+{
+	if (!getenv("UEVENTD_USE_STDERR")) {
+		char *rdlog = getenv("RDLOG");
+		const char *logfile = default_logfile;
+
+		if (rdlog && !strcasecmp(rdlog, "console"))
+			logfile = "/dev/console";
+
+		FILE *cons = fopen(logfile, "w+");
+		if (!cons)
+			fatal("open(%s): %m", logfile);
+
+		fclose(stderr);
+		stderr = cons;
+	}
+
+	log_priority = loglevel;
+}
+
+void logging_close(void)
+{
+}
+
+void message(int priority, const char *fmt, ...)
+{
+	va_list ap;
+	va_start(ap, fmt);
+	if (priority <= log_priority) {
+		time_t ts = time(NULL);
+		struct tm *t = localtime(&ts);
+		fprintf(stderr, "[%04d-%02d-%02d %02d:%02d:%02d] %s: ",
+		        t->tm_year + 1900, t->tm_mon + 1, t->tm_mday,
+		        t->tm_hour, t->tm_min, t->tm_sec,
+		        program_invocation_short_name);
+		vfprintf(stderr, fmt, ap);
+		fprintf(stderr, "\n");
+	}
+	va_end(ap);
+}
diff --git a/datasrc/ueventd/memory.c b/datasrc/ueventd/memory.c
new file mode 100644
index 00000000..cb911d58
--- /dev/null
+++ b/datasrc/ueventd/memory.c
@@ -0,0 +1,32 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include <string.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <errno.h>
+#include <error.h>
+#include <limits.h>
+
+#include "ueventd.h"
+
+void *xcalloc(size_t nmemb, size_t size)
+{
+	void *r = calloc(nmemb, size);
+	if (!r)
+		fatal("calloc: allocating %lu*%lu bytes: %m",
+		      (unsigned long) nmemb, (unsigned long) size);
+	return r;
+}
+
+char *xasprintf(char **ptr, const char *fmt, ...)
+{
+	va_list arg;
+
+	va_start(arg, fmt);
+	if (vasprintf(ptr, fmt, arg) < 0)
+		fatal("vasprintf: %m");
+	va_end(arg);
+
+	return *ptr;
+}
diff --git a/datasrc/ueventd/path.c b/datasrc/ueventd/path.c
new file mode 100644
index 00000000..7880ad5c
--- /dev/null
+++ b/datasrc/ueventd/path.c
@@ -0,0 +1,80 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include <stdlib.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#include "ueventd.h"
+
+int is_dot_dir(struct dirent *ent)
+{
+	return (ent->d_type == DT_DIR &&
+	        (ent->d_name[0] == '.' && (ent->d_name[1] == '\0' ||
+	                                   (ent->d_name[1] == '.' && ent->d_name[2] == '\0') )));
+}
+
+DIR *xopendir(const char *path)
+{
+	DIR *dir = opendir(path);
+	if (!dir)
+		fatal("opendir: %s: %m", path);
+	return dir;
+}
+
+
+struct dirent *xreaddir(DIR *d, const char *path)
+{
+	struct dirent *ent;
+
+	errno = 0;
+	ent = readdir(d);
+	if (!ent) {
+		if (!errno)
+			return NULL;
+		fatal("readdir: %s: %m", path);
+	}
+	return ent;
+}
+
+int empty_directory(const char *path)
+{
+	struct dirent *ent;
+	int is_empty = 1;
+	DIR *d = xopendir(path);
+
+	while ((ent = xreaddir(d, path)) != NULL) {
+		if (ent->d_name[0] != '.') {
+			is_empty = 0;
+			break;
+		}
+	}
+	closedir(d);
+
+	return is_empty;
+}
+
+ssize_t read_retry(int fd, void *buf, size_t count)
+{
+	return TEMP_FAILURE_RETRY(read(fd, buf, count));
+}
+
+ssize_t write_retry(int fd, const void *buf, size_t count)
+{
+	return TEMP_FAILURE_RETRY(write(fd, buf, count));
+}
+
+ssize_t write_loop(int fd, const char *buffer, size_t count)
+{
+	ssize_t offset = 0;
+
+	while (count > 0) {
+		ssize_t block = write_retry(fd, &buffer[offset], count);
+
+		if (block <= 0)
+			return offset ? : block;
+		offset += block;
+		count -= (size_t) block;
+	}
+	return offset;
+}
diff --git a/datasrc/ueventd/process.c b/datasrc/ueventd/process.c
new file mode 100644
index 00000000..9ff4ec4f
--- /dev/null
+++ b/datasrc/ueventd/process.c
@@ -0,0 +1,17 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/*
+ * Copyright (C) 2022  Arseny Maslennikov <arseny@altlinux.org>
+ * All rights reserved.
+ */
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <unistd.h>
+#include <errno.h>
+
+#include "ueventd.h"
+
+pid_t waitpid_retry(pid_t pid, int *wstatus, int options)
+{
+	return (pid_t) TEMP_FAILURE_RETRY(waitpid(pid, wstatus, options));
+}
diff --git a/datasrc/ueventd/queue-processor.c b/datasrc/ueventd/queue-processor.c
new file mode 100644
index 00000000..ab5e03e4
--- /dev/null
+++ b/datasrc/ueventd/queue-processor.c
@@ -0,0 +1,116 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <signal.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#include "ueventd.h"
+
+static void event_handler(struct watch *queue, char *path) __attribute__((nonnull(1, 2)));
+static void move_files(char *srcdir, char *dstdir)         __attribute__((nonnull(1, 2)));
+static void signal_unhandled_events(struct watch *queue)   __attribute__((nonnull(1)));
+
+void event_handler(struct watch *queue, char *path)
+{
+	char *argv[] = { handler_file, path, NULL };
+	pid_t pid = vfork();
+
+	if (pid < 0) {
+		err("vfork: %m");
+
+	} else if (!pid) {
+		execve(argv[0], argv, environ);
+		fatal("exec: %s: %m", argv[0]);
+	} else {
+		int status = 0;
+
+		if (waitpid_retry(pid, &status, 0) != pid || !WIFEXITED(status))
+			info("%s: session=%lu: %s failed",
+			     queue->q_name, session, handler_file);
+		else
+			info("%s: session=%lu: %s finished with return code %d",
+			     queue->q_name, session, handler_file, WEXITSTATUS(status));
+	}
+}
+
+void move_files(char *srcdir, char *dstdir)
+{
+	struct dirent *ent;
+	int srcfd, dstfd;
+
+	if ((srcfd = open(srcdir, O_RDONLY | O_CLOEXEC)) < 0)
+		fatal("open: %s: %m", srcdir);
+
+	errno = 0;
+	if ((dstfd = open(dstdir, O_RDONLY | O_CLOEXEC)) < 0) {
+		if (errno == ENOENT) {
+			if (mkdir(dstdir, 0755) < 0)
+				fatal("mkdir: %s: %m", dstdir);
+			dstfd = open(dstdir, O_RDONLY | O_CLOEXEC);
+		}
+		if (dstfd < 0)
+			fatal("open: %s: %m", dstdir);
+	}
+
+	DIR *d = fdopendir(srcfd);
+	if (!d)
+		fatal("fdopendir: %m");
+
+	while ((ent = xreaddir(d, srcdir)) != NULL) {
+		if (ent->d_name[0] != '.' && ent->d_type == DT_REG &&
+		    renameat(srcfd, ent->d_name, dstfd, ent->d_name) < 0)
+			fatal("rename `%s/%s' -> `%s/%s': %m", srcdir, ent->d_name, dstdir, ent->d_name);
+	}
+
+	closedir(d);
+	close(dstfd);
+}
+
+void signal_unhandled_events(struct watch *queue)
+{
+	ssize_t len;
+
+	len = write_loop(queue->q_parentfd,
+	                 (char *) &queue->q_watchfd, sizeof(queue->q_watchfd));
+	if (len < 0)
+		err("write(pipe): %m");
+
+	info("%s: session=%lu: retry with the events remaining in the queue", queue->q_name, session);
+}
+
+void process_events(struct watch *queue)
+{
+	info("%s: session=%lu: processing events", queue->q_name, session);
+
+	char *numenv = NULL;
+
+	xasprintf(&numenv, "SESSION=%lu", session);
+	putenv(numenv);
+
+	char *srcdir, *dstdir;
+
+	xasprintf(&srcdir, "%s/%s", filter_dir, queue->q_name);
+	xasprintf(&dstdir, "%s/%s", uevent_dir, queue->q_name);
+
+	move_files(srcdir, dstdir);
+
+	if (!empty_directory(dstdir)) {
+		event_handler(queue, dstdir);
+
+		if (!empty_directory(dstdir))
+			signal_unhandled_events(queue);
+	}
+
+	free(srcdir);
+	free(dstdir);
+
+	exit(0);
+}
diff --git a/datasrc/ueventd/ueventd.c b/datasrc/ueventd/ueventd.c
new file mode 100644
index 00000000..f5923367
--- /dev/null
+++ b/datasrc/ueventd/ueventd.c
@@ -0,0 +1,551 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include <sys/types.h>
+#include <sys/epoll.h>
+#include <sys/inotify.h>
+#include <sys/signalfd.h>
+#include <sys/prctl.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <getopt.h>
+#include <errno.h>
+
+#include "ueventd.h"
+
+char *uevent_confdb;
+char *filter_dir;
+char *uevent_dir;
+char *handler_file;
+uint64_t session = 0;
+
+static struct watch *watch_list = NULL;
+static int pause_all = 0;
+
+enum {
+	FD_INOTIFY,
+	FD_SIGNAL,
+	FD_PIPE,
+	FD_MAX,
+};
+
+typedef int (*fdhandler_t)(int);
+
+struct fd_handler {
+	int fd;
+	fdhandler_t fd_handler;
+};
+
+struct fd_handler fd_list[FD_MAX];
+
+enum {
+	PIPE_READ,
+	PIPE_WRITE,
+	PIPE_MAX,
+};
+
+int pipefd[PIPE_MAX];
+
+#define EV_PAUSE_MASK (IN_CREATE | IN_DELETE | IN_ONLYDIR)
+#define EV_ROOT_MASK  (IN_CREATE | IN_ONLYDIR)
+#define EV_QUEUE_MASK (IN_CLOSE_WRITE | IN_MOVED_TO | IN_DELETE_SELF)
+#define EV_MAX        (FD_MAX * 32)
+
+static int add_queue_dir(int inotifyfd, const char *path, uint32_t mask) __attribute__((nonnull(2)));
+static void watch_pauses(int inotifyfd);
+static void watch_queues(int inotifyfd);
+static int watch_path(int inotifyfd, const char *dir, const char *name, uint32_t mask, int flags) __attribute__((nonnull(2, 3)));
+static int unwatch_path(int inotifyfd, ino_t ino);
+static void apply_pause(void);
+static int process_signal_events(int signfd);
+static int process_inotify_events(int inotifyfd);
+static int process_pipefd_events(int readfd);
+static void setup_signal_fd(struct fd_handler *el)          __attribute__((nonnull(1)));
+static void setup_inotify_fd(struct fd_handler *el)         __attribute__((nonnull(1)));
+static void setup_pipe_fd(struct fd_handler *el)            __attribute__((nonnull(1)));
+static int setup_epoll_fd(struct fd_handler list[FD_MAX]);
+static pid_t spawn_worker(int epollfd, struct watch *queue) __attribute__((nonnull(2)));
+static inline char *get_param_dir(const char *name)         __attribute__((nonnull(1)));
+
+int add_queue_dir(int inotifyfd, const char *path, uint32_t mask)
+{
+	int ret;
+
+	errno = 0;
+	ret = inotify_add_watch(inotifyfd, path, mask | IN_MASK_CREATE);
+	if (ret < 0) {
+		if (errno == EEXIST) {
+			return -128;
+		}
+		err("inotify failed to watch: %s: %m", path);
+	}
+	return ret;
+}
+
+int watch_path(int inotifyfd, const char *dir, const char *name, uint32_t mask, int flags)
+{
+	struct stat st;
+	struct watch *new = NULL;
+	char *path = NULL;
+
+	xasprintf(&path, "%s/%s", dir, name);
+
+	int wfd = add_queue_dir(inotifyfd, path, mask);
+	if (wfd < 0)
+		return (wfd == -128 ? 0 : wfd);
+
+	if (stat(path, &st) < 0) {
+		err("stat: %s: %m", path);
+		goto fail;
+	}
+
+	new = xcalloc(1, sizeof(*new) + sizeof(new->q_name[0]) * (strlen(name) + 1));
+
+	strcpy(new->q_name, name);
+
+	new->q_watchfd = wfd;
+	new->q_ino     = st.st_ino;
+	new->q_flags   = flags;
+	new->next      = watch_list;
+
+	if (flags & F_QUEUE_DIR) {
+		if (!empty_directory(path))
+			new->q_flags |= F_DIRTY;
+		new->q_parentfd = pipefd[PIPE_WRITE];
+	}
+
+	if (pause_all)
+		new->q_flags |= F_PAUSED;
+
+	watch_list = new;
+
+	info("watch path: %s", path);
+	free(path);
+	return 0;
+fail:
+	inotify_rm_watch(inotifyfd, wfd);
+	free(new);
+	free(path);
+	return -1;
+}
+
+int unwatch_path(int inotifyfd, ino_t ino)
+{
+	struct watch *prev, *elem;
+
+	prev = NULL;
+	elem = watch_list;
+
+	while (elem) {
+		if (elem->q_ino == ino) {
+			if (prev)
+				prev->next = elem->next;
+			if (elem == watch_list)
+				watch_list = elem->next;
+			inotify_rm_watch(inotifyfd, elem->q_watchfd);
+			free(elem);
+			break;
+		}
+
+		prev = elem;
+		elem = elem->next;
+	}
+
+	return 0;
+}
+
+void watch_pauses(int inotifyfd)
+{
+	char *path = NULL;
+
+	xasprintf(&path, "%s/queue/pause", uevent_confdb);
+
+	if (watch_path(inotifyfd, path, ".", EV_PAUSE_MASK, F_PAUSE_DIR) < 0)
+		exit(EXIT_FAILURE);
+
+	free(path);
+}
+
+void apply_pause(void)
+{
+	struct watch *p;
+	DIR *dir;
+	char *path = NULL;
+
+	xasprintf(&path, "%s/queue/pause", uevent_confdb);
+
+	dir = xopendir(path);
+	pause_all = 0;
+
+	for (p = watch_list; p; p = p->next) {
+		if (p->q_flags & F_PAUSED)
+			p->q_flags &= ~F_PAUSED;
+	}
+
+	struct dirent *ent;
+
+	while ((ent = xreaddir(dir, path)) != NULL) {
+		if (ent->d_type != DT_DIR || is_dot_dir(ent))
+			continue;
+
+		if (!strcmp(ent->d_name, ".all"))
+			pause_all = 1;
+
+		for (p = watch_list; p; p = p->next) {
+			if ((p->q_flags & F_QUEUE_DIR) &&
+			    (pause_all || !strcmp(ent->d_name, p->q_name)))
+				p->q_flags |= F_PAUSED;
+		}
+
+		if (pause_all)
+			break;
+	}
+	closedir(dir);
+	free(path);
+}
+
+void watch_queues(int inotifyfd)
+{
+	DIR *dir = opendir(filter_dir);
+	if (!dir)
+		fatal("opendir: %s: %m", filter_dir);
+
+	struct dirent *ent;
+
+	while ((ent = xreaddir(dir, filter_dir)) != NULL) {
+		if (ent->d_type != DT_DIR || is_dot_dir(ent))
+			continue;
+
+		if (watch_path(inotifyfd, filter_dir, ent->d_name, EV_QUEUE_MASK, F_QUEUE_DIR) < 0)
+			fatal("unable to start watching: %s/%s", filter_dir, ent->d_name);
+	}
+	closedir(dir);
+}
+
+int process_signal_events(int signfd)
+{
+	struct watch *p;
+	struct signalfd_siginfo fdsi = { 0 };
+	ssize_t size;
+
+	size = read_retry(signfd, &fdsi, sizeof(fdsi));
+	if (size != sizeof(fdsi)) {
+		err("unable to read signal info");
+		return 0;
+	}
+
+	if (fdsi.ssi_signo == SIGCHLD) {
+		while (1) {
+			pid_t pid = waitpid_retry(-1, NULL, WNOHANG);
+
+			if (pid <= 0) {
+				if (pid < 0 && errno != ECHILD) {
+					err("waitpid: %m");
+					return -1;
+				}
+				break;
+			}
+
+			for (p = watch_list; p; p = p->next) {
+				if (p->q_pid == pid)
+					p->q_pid = 0;
+			}
+		}
+		return 0;
+	}
+
+	info("got signal %d, exit", fdsi.ssi_signo);
+	return -1;
+}
+
+void setup_signal_fd(struct fd_handler *el)
+{
+	sigset_t mask;
+
+	sigemptyset(&mask);
+
+	sigaddset(&mask, SIGHUP);
+	sigaddset(&mask, SIGINT);
+	sigaddset(&mask, SIGQUIT);
+	sigaddset(&mask, SIGTERM);
+	sigaddset(&mask, SIGCHLD);
+
+	if ((el->fd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC)) < 0)
+		fatal("signalfd: %m");
+
+	if (sigprocmask(SIG_BLOCK, &mask, NULL) < 0)
+		fatal("sigprocmask: %m");
+
+	el->fd_handler = process_signal_events;
+}
+
+int process_inotify_events(int inotifyfd)
+{
+	char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
+
+	while (1) {
+		ssize_t len = read_retry(inotifyfd, buf, sizeof(buf));
+
+		if (len < 0) {
+			if (errno == EAGAIN)
+				break;
+			fatal("read: %m");
+		} else if (!len) {
+			break;
+		}
+
+		struct inotify_event *event;
+		char *ptr;
+
+		for (ptr = buf; ptr < buf + len; ptr += sizeof(*event) + event->len) {
+			struct watch *p;
+
+			event = (struct inotify_event *) ptr;
+
+			for (p = watch_list; p && (p->q_watchfd != event->wd); p = p->next);
+			if (!p)
+				continue;
+
+			if (event->mask & IN_DELETE_SELF) {
+				info("unwatch path: %s", p->q_name);
+				unwatch_path(inotifyfd, p->q_ino);
+				continue;
+			}
+
+			if (p->q_flags & F_ROOT_DIR) {
+				if (event->mask & IN_CREATE)
+					watch_queues(inotifyfd);
+				continue;
+			}
+
+			if (p->q_flags & F_PAUSE_DIR) {
+				if (event->mask & IN_CREATE)
+					info("%s: queue paused", event->name);
+				else if (event->mask & IN_DELETE)
+					info("%s: queue unpaused", event->name);
+				apply_pause();
+				continue;
+			}
+
+			if (!(p->q_flags & F_DIRTY))
+				p->q_flags |= F_DIRTY;
+		}
+	}
+	return 0;
+}
+
+void setup_inotify_fd(struct fd_handler *el)
+{
+	if ((el->fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC)) < 0)
+		fatal("inotify_init1: %m");
+
+	el->fd_handler = process_inotify_events;
+}
+
+int process_pipefd_events(int readfd)
+{
+	while (1) {
+		struct watch *p;
+		ssize_t len;
+		int value = 0;
+
+		errno = 0;
+		len = read_retry(readfd, &value, sizeof(value));
+
+		if (len < 0) {
+			if (errno == EAGAIN)
+				break;
+			fatal("read(pipe): %m");
+		} else if (!len) {
+			break;
+		}
+
+		for (p = watch_list; p; p = p->next) {
+			if (!(p->q_flags & F_QUEUE_DIR) || p->q_watchfd != value)
+				continue;
+			if (!(p->q_flags & F_DIRTY))
+				p->q_flags |= F_DIRTY;
+			break;
+		}
+	}
+
+	return 0;
+}
+
+void setup_pipe_fd(struct fd_handler *el)
+{
+	if (pipe2(pipefd, O_NONBLOCK | O_CLOEXEC) < 0)
+		fatal("pipe2: %m");
+
+	el->fd = pipefd[PIPE_READ];
+	el->fd_handler = process_pipefd_events;
+}
+
+int setup_epoll_fd(struct fd_handler list[FD_MAX])
+{
+	int epollfd;
+	struct epoll_event ev = { .events = EPOLLIN };
+
+	if ((epollfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
+		fatal("epoll_create1: %m");
+
+	for (int i = 0; i < FD_MAX; i++) {
+		ev.data.fd = list[i].fd;
+
+		if (epoll_ctl(epollfd, EPOLL_CTL_ADD, list[i].fd, &ev) < 0)
+			fatal("epoll_ctl: %m");
+	}
+
+	return epollfd;
+}
+
+pid_t spawn_worker(int epollfd, struct watch *queue)
+{
+	pid_t pid;
+
+	session++;
+
+	pid = fork();
+	if (pid < 0) {
+		err("fork: %m");
+		return 0;
+	} else if (pid > 0) {
+		return pid;
+	}
+
+	if (prctl(PR_SET_PDEATHSIG, SIGKILL) < 0)
+		fatal("prctl(PR_SET_PDEATHSIG): %m");
+
+	for (int i = 0; i < FD_MAX; i++) {
+		if (fd_list[i].fd >= 0)
+			close(fd_list[i].fd);
+	}
+	close(epollfd);
+
+	sigset_t mask;
+	sigemptyset(&mask);
+
+	if (sigprocmask(SIG_SETMASK, &mask, NULL) < 0)
+		fatal("sigprocmask: %m");
+
+	process_events(queue);
+
+	return 0;
+}
+
+char *get_param_dir(const char *name)
+{
+	char *value = getenv(name);
+	if (!value)
+		fatal("please set `%s' env variable", name);
+	return value;
+}
+
+int main(int argc, char **argv)
+{
+	struct watch *e, *n;
+	int i, epollfd;
+	int loglevel = LOG_INFO;
+
+	while ((i = getopt(argc, argv, "hl:")) != -1) {
+		switch (i) {
+			case 'h':
+				fprintf(stderr, "Usage: %s [-l loglevel] handler-prog\n",
+				        program_invocation_short_name);
+				exit(0);
+				break;
+			case 'l':
+				loglevel = logging_level(optarg);
+				break;
+			default:
+				exit(1);
+				break;
+		}
+	}
+
+	if (optind == argc)
+		fatal("specify handler program");
+
+	logging_init(loglevel);
+
+	info("starting server ...");
+
+	filter_dir = get_param_dir("filterdir");
+	uevent_dir = get_param_dir("ueventdir");
+	uevent_confdb = get_param_dir("uevent_confdb");
+	handler_file = argv[optind++];
+
+	setup_inotify_fd(&fd_list[FD_INOTIFY]);
+	setup_signal_fd(&fd_list[FD_SIGNAL]);
+	setup_pipe_fd(&fd_list[FD_PIPE]);
+
+	epollfd = setup_epoll_fd(fd_list);
+
+	watch_pauses(fd_list[FD_INOTIFY].fd);
+
+	if (watch_path(fd_list[FD_INOTIFY].fd, filter_dir, ".", EV_ROOT_MASK, F_ROOT_DIR) < 0)
+		exit(EXIT_FAILURE);
+
+	watch_queues(fd_list[FD_INOTIFY].fd);
+	apply_pause();
+
+	while (1) {
+		struct epoll_event ev[EV_MAX];
+		int fdcount;
+
+		errno = 0;
+		fdcount = epoll_wait(epollfd, ev, EV_MAX, 250);
+
+		if (fdcount < 0) {
+			if (errno == EINTR)
+				continue;
+			fatal("epoll_wait: %m");
+		}
+
+		for (i = 0; i < fdcount; i++) {
+			if (!(ev[i].events & EPOLLIN))
+				continue;
+			for (int k = 0; k < FD_MAX; k++) {
+				if (ev[i].data.fd != fd_list[k].fd)
+					continue;
+				if (fd_list[k].fd_handler(fd_list[k].fd) != 0)
+					goto done;
+			}
+		}
+
+		for (e = watch_list; e; e = e->next) {
+			if (!(e->q_flags & F_QUEUE_DIR) || !(e->q_flags & F_DIRTY) || (e->q_pid != 0) || (e->q_flags & F_PAUSED))
+				continue;
+			e->q_flags &= ~F_DIRTY;
+			e->q_pid = spawn_worker(epollfd, e);
+		}
+	}
+done:
+	for (e = watch_list, n = NULL; e; e = n) {
+		n = e->next;
+		if (e->q_pid)
+			kill(e->q_pid, SIGKILL);
+		inotify_rm_watch(fd_list[FD_INOTIFY].fd, e->q_watchfd);
+		free(e);
+	}
+
+	for (i = 0; i < FD_MAX; i++) {
+		if (fd_list[i].fd >= 0) {
+			if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd_list[i].fd, NULL) < 0)
+				err("epoll_ctl: %m");
+			close(fd_list[i].fd);
+		}
+	}
+	close(epollfd);
+	logging_close();
+
+	return EXIT_SUCCESS;
+}
diff --git a/datasrc/ueventd/ueventd.h b/datasrc/ueventd/ueventd.h
new file mode 100644
index 00000000..a5e50379
--- /dev/null
+++ b/datasrc/ueventd/ueventd.h
@@ -0,0 +1,76 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#ifndef __UEVENTD_H__
+#define __UEVENTD_H__
+
+#include <sys/types.h>
+#include <stdint.h>
+
+#define F_ROOT_DIR  (1 << 0)
+#define F_QUEUE_DIR (1 << 1)
+#define F_PAUSE_DIR (1 << 2)
+#define F_DIRTY     (1 << 3)
+#define F_PAUSED    (1 << 4)
+
+struct watch {
+	struct watch *next;
+	int   q_flags;
+	int   q_watchfd;
+	int   q_parentfd;
+	ino_t q_ino;
+	pid_t q_pid;
+	char  q_name[];
+};
+
+extern char *filter_dir;
+extern char *uevent_dir;
+extern char *handler_file;
+extern uint64_t session;
+
+/* memory.c */
+extern void *xcalloc(size_t nmemb, size_t size)          __attribute__((alloc_size(1, 2)));
+extern char *xasprintf(char **ptr, const char *fmt, ...) __attribute__((format(printf, 2, 3)));
+
+/* queue-processor.c */
+extern void process_events(struct watch *queue) __attribute__((nonnull(1), noreturn));
+
+/* path.c */
+#include <dirent.h>
+
+extern DIR *xopendir(const char *path)                              __attribute__((nonnull(1)));
+extern struct dirent *xreaddir(DIR *d, const char *path)            __attribute__((nonnull(1, 2)));
+extern int empty_directory(const char *path)                        __attribute__((nonnull(1)));
+extern ssize_t read_retry(int fd, void *buf, size_t count)          __attribute__((nonnull(2)));
+extern ssize_t write_retry(int fd, const void *buf, size_t count)   __attribute__((nonnull(2)));
+extern ssize_t write_loop(int fd, const char *buffer, size_t count) __attribute__((nonnull(2)));
+
+/* process.c */
+extern pid_t waitpid_retry(pid_t pid, int *wstatus, int options);
+
+#include <dirent.h>
+
+extern  int is_dot_dir(struct dirent *ent) __attribute__((nonnull(1)));
+
+/* logging.c */
+#include <unistd.h>
+#include <syslog.h>
+#include <stdlib.h>
+
+extern void logging_init(int level);
+extern void logging_close(void);
+extern int logging_level(const char *lvl)               __attribute__((nonnull(1)));
+extern void message(int priority, const char *fmt, ...) __attribute__((format(printf, 2, 3)));
+
+#define __message(level, format, arg...) message(level, format, ##arg)
+
+#define fatal(format, arg...)                       \
+	do {                                        \
+		__message(LOG_CRIT, "%s:%d: " format, __FILE__, __LINE__, ##arg); \
+		_exit(EXIT_FAILURE);                \
+	} while (0)
+
+#define err(format, arg...)  __message(LOG_ERR,   format, ##arg)
+#define info(format, arg...) __message(LOG_INFO,  format, ##arg)
+#define dbg(format, arg...)  __message(LOG_DEBUG, format, ##arg)
+
+#endif /* __UEVENTD_H__ */
-- 
2.33.7
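
For readers following the retry path in the patch above: when a worker
finishes and its queue directory is still non-empty,
signal_unhandled_events() writes the queue's watch descriptor to a pipe,
and the parent's process_pipefd_events() reads those values until EAGAIN
and marks the matching queue dirty again. The sketch below reproduces just
that handshake. The function names are hypothetical; unlike the patch, it
uses pipe() plus fcntl() instead of pipe2() so that it builds without
_GNU_SOURCE.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Create a non-blocking, close-on-exec pipe (portable stand-in for pipe2). */
int make_retry_pipe(int fds[2])
{
	if (pipe(fds) < 0)
		return -1;
	for (int i = 0; i < 2; i++) {
		int fl = fcntl(fds[i], F_GETFL);
		if (fl < 0 || fcntl(fds[i], F_SETFL, fl | O_NONBLOCK) < 0 ||
		    fcntl(fds[i], F_SETFD, FD_CLOEXEC) < 0) {
			close(fds[0]);
			close(fds[1]);
			return -1;
		}
	}
	return 0;
}

/* Worker side: ask the parent to schedule queue `watch_id` again. */
int notify_retry(int wfd, int watch_id)
{
	ssize_t n;

	do {
		n = write(wfd, &watch_id, sizeof(watch_id));
	} while (n < 0 && errno == EINTR);

	return n == (ssize_t) sizeof(watch_id) ? 0 : -1;
}

/* Parent side: collect pending requests without blocking.
 * Returns the number of ids stored in `ids`, or -1 on a read error. */
ssize_t drain_retries(int rfd, int *ids, size_t max)
{
	size_t count = 0;

	assert(ids != NULL);

	while (count < max) {
		int value = 0;
		ssize_t n = read(rfd, &value, sizeof(value));

		if (n < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN)
				break;          /* pipe is empty for now */
			return -1;
		}
		if (n != (ssize_t) sizeof(value))
			break;                  /* EOF or short read */
		ids[count++] = value;
	}
	return (ssize_t) count;
}
```

Each message is a single int, far smaller than PIPE_BUF, so POSIX
guarantees that writes from several workers are not interleaved.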


