236 lines
7.8 KiB
Zig
Raw Normal View History

2020-06-13 18:36:57 -07:00
const std = @import("std");
const Process = @import("process.zig").Process;
2020-06-13 21:08:00 -07:00
/// Cooperative scheduler for processes. Each process is invoked once per tick. If a process terminates, it's
/// removed automatically from the scheduler and it's never invoked again. A process can also have a child. In
/// this case, the process is replaced with its child when it terminates if it returns with success. In case of errors,
/// both the process and its child are discarded. In order to invoke all scheduled processes, call the `update` member function.
/// Processes are added by calling `attach` and must satisfy the following conditions:
/// - have a field `process: Process`
/// - have a method `initialize(self: *@This(), data: var) void` that initializes all fields and takes in the data passed to `attach`
/// - when initializing the `process` field it must be given an `updateFn`. All other callbacks are optional.
/// - in any callback you can get your original struct back via `process.getParent(@This())`
2020-06-13 18:36:57 -07:00
pub const Scheduler = struct {
    /// Head process of every scheduled chain. Successors are reachable through `Process.next`.
    processes: std.ArrayList(*Process),
    /// Allocator used to create and destroy every process struct handled by this scheduler.
    allocator: *std.mem.Allocator,

    /// Helper to create and prepare a process.
    /// Allocates a `T`, calls `T.initialize(data)` on it and installs a `deinit` callback on its embedded
    /// `process` field. Returns a pointer to that embedded field.
    /// `T` is validated at compile time here so that both `attach` and `Continuation.next` are covered.
    /// Panics if the allocation fails.
    fn createProcessHandler(comptime T: type, data: var, allocator: *std.mem.Allocator) *Process {
        if (!@hasDecl(T, "initialize")) @compileError(@typeName(T) ++ " must declare an `initialize` method");
        if (!@hasField(T, "process")) @compileError(@typeName(T) ++ " must have a `process` field");

        // allocation failure is a real possibility, so fail loudly instead of asserting it cannot happen
        const proc = allocator.create(T) catch @panic("Scheduler: out of memory while creating a process");
        proc.initialize(data);

        // the callback is generated per `T` so that the enclosing struct can be recovered from the bare
        // `*Process` and destroyed with its real type later
        proc.process.deinit = struct {
            fn deinit(process: *Process, allocator: *std.mem.Allocator) void {
                // tear down the rest of the chain first
                if (process.next) |next_process| {
                    next_process.deinit(next_process, allocator);
                }
                allocator.destroy(@fieldParentPtr(T, "process", process));
            }
        }.deinit;

        return &proc.process;
    }

    /// Returned when appending a process so that sub-processes can be added to the process.
    const Continuation = struct {
        /// Current tail of the chain; the process created by `next` is linked after it.
        process: *Process,
        /// Allocator handed to `createProcessHandler` for every chained process.
        allocator: *std.mem.Allocator,

        /// Wraps `process` so that successors can be chained onto it.
        pub fn init(process: *Process, allocator: *std.mem.Allocator) Continuation {
            return .{ .process = process, .allocator = allocator };
        }

        /// Creates a `T` from `data`, links it as the `next` of the current tail and makes it the new tail.
        /// Returns `self` so calls can be chained.
        pub fn next(self: *@This(), comptime T: type, data: var) *@This() {
            const successor = createProcessHandler(T, data, self.allocator);
            self.process.next = successor;
            self.process = successor;
            return self;
        }
    };

    /// Creates an empty scheduler that uses `allocator` for its list and for every process it creates.
    pub fn init(allocator: *std.mem.Allocator) Scheduler {
        return .{
            .processes = std.ArrayList(*Process).init(allocator),
            .allocator = allocator,
        };
    }

    /// Discards every scheduled process (see `clear`) and releases the list storage.
    pub fn deinit(self: *Scheduler) void {
        self.clear();
        self.processes.deinit();
    }

    /// Schedules a process for the next tick.
    /// The new process is created from `data`, ticked once and appended to the list. The returned
    /// `Continuation` can be used to chain successors onto it. Panics if memory runs out.
    pub fn attach(self: *Scheduler, comptime T: type, data: var) Continuation {
        const process = createProcessHandler(T, data, self.allocator);
        process.tick();
        self.processes.append(process) catch @panic("Scheduler: out of memory while attaching a process");
        return Continuation.init(process, self.allocator);
    }

    /// Ticks the process stored in the slot `process` points to.
    /// While the ticked process reports `dead()`, was not `rejected()` and has a successor, the slot is
    /// overwritten with that successor, the finished process is destroyed and the successor is ticked in turn.
    /// Returns true when the slot holds a dead process that the caller must remove and deinit
    /// (its remaining chain included), false when the slot's process should stay scheduled.
    fn updateProcess(process: **Process, allocator: *std.mem.Allocator) bool {
        while (true) {
            const current = process.*;
            current.tick();

            if (!current.dead()) return false;
            if (current.rejected()) return true;
            const successor = current.next orelse return true;

            // detach the successor before destroying its parent so it isn't freed along with it
            current.next = null;
            process.* = successor;
            current.deinit(current, allocator);
        }
    }

    /// Updates all scheduled processes.
    /// Walks the list from the back so that `swapRemove` only ever moves entries that were already visited.
    pub fn update(self: *Scheduler) void {
        // NOTE(review): the slot pointer handed to `updateProcess` would dangle if a callback grew
        // `processes` during the tick; confirm no process attaches to this scheduler from a callback.
        var i: usize = self.processes.items.len;
        while (i > 0) {
            i -= 1;
            if (updateProcess(&self.processes.items[i], self.allocator)) {
                const dead_process = self.processes.swapRemove(i);
                dead_process.deinit(dead_process, self.allocator);
            }
        }
    }

    /// Gets the number of processes still running.
    /// Only chain heads are counted; chained successors are not stored in the list.
    pub fn len(self: Scheduler) usize {
        return self.processes.items.len;
    }

    /// Resets the scheduler to its initial state and discards all the processes.
    /// The list's capacity is kept.
    pub fn clear(self: *Scheduler) void {
        for (self.processes.items) |process| {
            process.deinit(process, self.allocator);
        }
        self.processes.items.len = 0;
    }

    /// Aborts all scheduled processes. Unless an immediate operation is requested, the abort is scheduled for the next tick.
    pub fn abort(self: *Scheduler, immediately: bool) void {
        // the list stores `*Process` directly, so each item is already the process to abort
        for (self.processes.items) |handler| {
            handler.abort(immediately);
        }
    }
};
test "" {
    std.debug.warn("\n", .{});

    // Process with every callback wired up; it reports success from its update callback.
    const Tester = struct {
        process: Process,
        id: usize,

        pub fn initialize(self: *@This(), data: var) void {
            self.id = data;
            self.process = .{
                .startFn = start,
                .updateFn = update,
                .abortedFn = aborted,
                .failedFn = failed,
                .succeededFn = succeeded,
            };
        }

        // Each callback recovers the enclosing struct to exercise `getParent`; the result is not used.
        fn start(process: *Process) void {
            _ = process.getParent(@This());
        }

        fn aborted(process: *Process) void {
            _ = process.getParent(@This());
        }

        fn failed(process: *Process) void {
            _ = process.getParent(@This());
        }

        fn succeeded(process: *Process) void {
            _ = process.getParent(@This());
        }

        fn update(process: *Process) void {
            _ = process.getParent(@This());
            process.succeed();
        }
    };

    var scheduler = Scheduler.init(std.testing.allocator);
    defer scheduler.deinit();

    // one chain of four processes, followed by five scheduler updates
    _ = scheduler.attach(Tester, 33).next(Tester, 66).next(Tester, 88).next(Tester, 99);

    var round: usize = 0;
    while (round < 5) : (round += 1) {
        scheduler.update();
    }
}
2020-06-13 21:08:00 -07:00
test "scheduler.clear" {
    // Process whose update callback trips an assertion, so the test fails if a cleared
    // process is ever updated.
    const NeverUpdated = struct {
        process: Process,

        pub fn initialize(self: *@This(), data: var) void {
            self.process = .{ .updateFn = update };
        }

        fn update(process: *Process) void {
            std.debug.assert(false);
        }
    };

    var scheduler = Scheduler.init(std.testing.allocator);
    defer scheduler.deinit();

    // schedule a two-process chain, then discard it before the scheduler gets to update it
    var continuation = scheduler.attach(NeverUpdated, {});
    _ = continuation.next(NeverUpdated, {});

    scheduler.clear();
    scheduler.update();
}
test "scheduler.attach.next" {
    // Process that bumps a shared counter from its update callback and then reports success.
    const Tester = struct {
        process: Process,
        counter: *usize,

        pub fn initialize(self: *@This(), data: var) void {
            self.process = .{ .updateFn = update };
            self.counter = data;
        }

        fn update(process: *Process) void {
            const self = process.getParent(@This());
            self.counter.* += 1;
            process.succeed();
        }
    };

    var scheduler = Scheduler.init(std.testing.allocator);
    defer scheduler.deinit();

    var counter: usize = 0;
    _ = scheduler.attach(Tester, &counter).next(Tester, &counter);
    scheduler.update();
    scheduler.update();

    // expected value goes first so a failure reports "expected 2, found <counter>" the right way round
    std.testing.expectEqual(@as(usize, 2), counter);
}