scheduler done

master
Mike 4 years ago
parent dd52ed2f46
commit 2beaeedb6f

@ -1,3 +1,5 @@
const std = @import("std");
/// Processes are run by the Scheduler. They use a similar pattern to Allocators in that they are created and /// Processes are run by the Scheduler. They use a similar pattern to Allocators in that they are created and
/// added as fields in a parent struct, your actual process that will be run. /// added as fields in a parent struct, your actual process that will be run.
pub const Process = struct { pub const Process = struct {
@ -10,9 +12,11 @@ pub const Process = struct {
abortedFn: ?fn (self: *Process) void = null, abortedFn: ?fn (self: *Process) void = null,
failedFn: ?fn (self: *Process) void = null, failedFn: ?fn (self: *Process) void = null,
succeededFn: ?fn (self: *Process) void = null, succeededFn: ?fn (self: *Process) void = null,
deinit: fn (self: *Process, allocator: *std.mem.Allocator) void = undefined,
state: State = .uninitialized, state: State = .uninitialized,
stopped: bool = false, stopped: bool = false,
next: ?*Process = null,
pub fn getParent(self: *Process, comptime T: type) *T { pub fn getParent(self: *Process, comptime T: type) *T {
return @fieldParentPtr(T, "process", self); return @fieldParentPtr(T, "process", self);

@ -11,96 +11,52 @@ const Process = @import("process.zig").Process;
/// - when initializing the `process` field it ust be given an `updateFn`. All other callbacks are optional. /// - when initializing the `process` field it ust be given an `updateFn`. All other callbacks are optional.
/// - in any callback you can get your oiginal struct back via `process.getParent(@This())` /// - in any callback you can get your oiginal struct back via `process.getParent(@This())`
pub const Scheduler = struct { pub const Scheduler = struct {
handlers: std.ArrayList(ProcessHandler), processes: std.ArrayList(*Process),
allocator: *std.mem.Allocator, allocator: *std.mem.Allocator,
/// helper to create and prepare a process and wrap it in a ProcessHandler /// helper to create and prepare a process and wrap it in a ProcessHandler
fn createProcessHandler(comptime T: type, data: var) ProcessHandler { fn createProcessHandler(comptime T: type, data: var) *Process {
var proc = std.testing.allocator.create(T) catch unreachable; var proc = std.testing.allocator.create(T) catch unreachable;
proc.initialize(data); proc.initialize(data);
// get a closure so that we can safely deinit this later // get a closure so that we can safely deinit this later
var handlerDeinitFn = struct { proc.process.deinit = struct {
fn deinit(process: *Process, allocator: *std.mem.Allocator) void { fn deinit(process: *Process, allocator: *std.mem.Allocator) void {
if (process.next) |next_process| {
next_process.deinit(next_process, allocator);
}
allocator.destroy(@fieldParentPtr(T, "process", process)); allocator.destroy(@fieldParentPtr(T, "process", process));
} }
}.deinit; }.deinit;
return .{ return &proc.process;
.process = &proc.process,
.deinitChild = handlerDeinitFn,
};
} }
/// returned when appending a process so that sub-processes can be added to the process
const Continuation = struct { const Continuation = struct {
handler: *ProcessHandler,
pub fn init(handler: *ProcessHandler) Continuation {
return .{ .handler = handler };
}
// TODO: fix and return this when ProcessHandler can have next be a ProcessHandler
pub fn next(self: *@This(), comptime T: type, data: var) void { // *@This()
var next_handler = createProcessHandler(T, data);
self.handler.next = .{ .deinitChild = next_handler.deinitChild, .process = next_handler.process };
}
};
/// TODO: remove this when ProcessHandler can have `next` be a ProcessHandler. For now this acts as a data store
/// holding the data ProcessHandler requires.
const NextProcessHandler = struct {
deinitChild: fn (process: *Process, allocator: *std.mem.Allocator) void,
process: *Process, process: *Process,
pub fn asProcessHandler(self: @This()) ProcessHandler { pub fn init(process: *Process) Continuation {
return .{ .deinitChild = self.deinitChild, .process = self.process }; return .{ .process = process };
} }
};
const ProcessHandler = struct { pub fn next(self: *@This(), comptime T: type, data: var) *@This() {
deinitChild: fn (process: *Process, allocator: *std.mem.Allocator) void, self.process.next = createProcessHandler(T, data);
process: *Process, self.process = self.process.next.?;
next: ?NextProcessHandler = null, return self;
pub fn update(self: *ProcessHandler, allocator: *std.mem.Allocator) bool {
self.process.tick();
if (self.process.dead()) {
if (!self.process.rejected() and self.next != null) {
// kill the old Process parent
self.deinitChild(self.process, allocator);
// overwrite our fields and kick off the next process
self.deinitChild = self.next.?.deinitChild;
self.process = self.next.?.process;
self.next = null; // TODO: when ProcessHandler can have next be a ProcessHandler
return self.update(allocator);
} else {
return true;
}
}
return false;
}
pub fn deinit(self: @This(), allocator: *std.mem.Allocator) void {
if (self.next) |next_handler| {
next_handler.asProcessHandler().deinit(allocator);
}
self.deinitChild(self.process, allocator);
} }
}; };
pub fn init(allocator: *std.mem.Allocator) Scheduler { pub fn init(allocator: *std.mem.Allocator) Scheduler {
return .{ return .{
.handlers = std.ArrayList(ProcessHandler).init(allocator), .processes = std.ArrayList(*Process).init(allocator),
.allocator = allocator, .allocator = allocator,
}; };
} }
pub fn deinit(self: *Scheduler) void { pub fn deinit(self: *Scheduler) void {
self.clear(); self.clear();
self.handlers.deinit(); self.processes.deinit();
} }
/// Schedules a process for the next tick /// Schedules a process for the next tick
@ -108,22 +64,44 @@ pub const Scheduler = struct {
std.debug.assert(@hasDecl(T, "initialize")); std.debug.assert(@hasDecl(T, "initialize"));
std.debug.assert(@hasField(T, "process")); std.debug.assert(@hasField(T, "process"));
var handler = createProcessHandler(T, data); var process = createProcessHandler(T, data);
handler.process.tick(); process.tick();
self.processes.append(process) catch unreachable;
return Continuation.init(process);
}
fn updateProcess(process: **Process, allocator: *std.mem.Allocator) bool {
const current_process = process.*;
current_process.tick();
if (current_process.dead()) {
if (!current_process.rejected() and current_process.next != null) {
// grab the next process and null it out so we dont double-free it later
const next_process = current_process.next.?;
current_process.next = null;
process.* = next_process;
// kill the old Process parent
current_process.deinit(current_process, allocator);
return updateProcess(process, allocator);
} else {
return true;
}
}
self.handlers.append(handler) catch unreachable; return false;
return Continuation.init(&self.handlers.items[self.handlers.items.len - 1]);
} }
/// Updates all scheduled processes /// Updates all scheduled processes
pub fn update(self: *Scheduler) void { pub fn update(self: *Scheduler) void {
if (self.handlers.items.len == 0) return; if (self.processes.items.len == 0) return;
var i: usize = self.handlers.items.len - 1; var i: usize = self.processes.items.len - 1;
while (true) : (i -= 1) { while (true) : (i -= 1) {
if (self.handlers.items[i].update(self.allocator)) { if (updateProcess(&self.processes.items[i], self.allocator)) {
var dead_handler = self.handlers.swapRemove(i); var dead_process = self.processes.swapRemove(i);
dead_handler.deinit(self.allocator); dead_process.deinit(dead_process, self.allocator);
} }
if (i == 0) break; if (i == 0) break;
@ -132,20 +110,20 @@ pub const Scheduler = struct {
/// gets the number of processes still running /// gets the number of processes still running
pub fn len(self: Scheduler) usize { pub fn len(self: Scheduler) usize {
return self.handlers.items.len; return self.processes.items.len;
} }
/// resets the scheduler to its initial state and discards all the processes /// resets the scheduler to its initial state and discards all the processes
pub fn clear(self: *Scheduler) void { pub fn clear(self: *Scheduler) void {
for (self.handlers.items) |handler| { for (self.processes.items) |process| {
handler.deinit(self.allocator); process.deinit(process, self.allocator);
} }
self.handlers.items.len = 0; self.processes.items.len = 0;
} }
/// Aborts all scheduled processes. Unless an immediate operation is requested, the abort is scheduled for the next tick /// Aborts all scheduled processes. Unless an immediate operation is requested, the abort is scheduled for the next tick
pub fn abort(self: *Scheduler, immediately: bool) void { pub fn abort(self: *Scheduler, immediately: bool) void {
for (self.handlers.items) |handler| { for (self.processes.items) |handler| {
handler.process.abort(immediately); handler.process.abort(immediately);
} }
} }
@ -170,27 +148,27 @@ test "" {
} }
fn start(process: *Process) void { fn start(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = process.getParent(@This());
// std.debug.warn("start {}\n", .{self.fart}); // std.debug.warn("start {}\n", .{self.fart});
} }
fn aborted(process: *Process) void { fn aborted(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = process.getParent(@This());
// std.debug.warn("aborted {}\n", .{self.fart}); // std.debug.warn("aborted {}\n", .{self.fart});
} }
fn failed(process: *Process) void { fn failed(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = process.getParent(@This());
// std.debug.warn("failed {}\n", .{self.fart}); // std.debug.warn("failed {}\n", .{self.fart});
} }
fn succeeded(process: *Process) void { fn succeeded(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = process.getParent(@This());
// std.debug.warn("succeeded {}\n", .{self.fart}); // std.debug.warn("succeeded {}\n", .{self.fart});
} }
fn update(process: *Process) void { fn update(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = process.getParent(@This());
// std.debug.warn("update {}\n", .{self.fart}); // std.debug.warn("update {}\n", .{self.fart});
process.succeed(); process.succeed();
} }
@ -199,7 +177,9 @@ test "" {
var scheduler = Scheduler.init(std.testing.allocator); var scheduler = Scheduler.init(std.testing.allocator);
defer scheduler.deinit(); defer scheduler.deinit();
_ = scheduler.attach(Tester, 33).next(Tester, 66); _ = scheduler.attach(Tester, 33).next(Tester, 66).next(Tester, 88).next(Tester, 99);
scheduler.update();
scheduler.update();
scheduler.update(); scheduler.update();
scheduler.update(); scheduler.update();
scheduler.update(); scheduler.update();

@ -17,4 +17,7 @@ comptime {
// resources // resources
_ = @import("resources/cache.zig"); _ = @import("resources/cache.zig");
_ = @import("resources/assets.zig"); _ = @import("resources/assets.zig");
// process
_ = @import("process/scheduler.zig");
} }

Loading…
Cancel
Save