Schedular working

master
Mike 5 years ago
parent a352f8350c
commit cdef61d6dc

@ -1,10 +1,12 @@
/// Processes are run by the Scheduler. They use a similar pattern to Allocators in that they are created and
/// added as fields in a parent struct, your actual process that will be run.
pub const Process = struct { pub const Process = struct {
const State = enum(u8) { const State = enum(u8) {
uninitialized, running, paused, succeeded, failed, aborted, finished uninitialized, running, paused, succeeded, failed, aborted, finished
}; };
updateFn: fn (self: *Process) void, updateFn: fn (self: *Process) void,
initFn: ?fn (self: *Process) void = null, startFn: ?fn (self: *Process) void = null,
abortedFn: ?fn (self: *Process) void = null, abortedFn: ?fn (self: *Process) void = null,
failedFn: ?fn (self: *Process) void = null, failedFn: ?fn (self: *Process) void = null,
succeededFn: ?fn (self: *Process) void = null, succeededFn: ?fn (self: *Process) void = null,
@ -12,6 +14,10 @@ pub const Process = struct {
state: State = .uninitialized, state: State = .uninitialized,
stopped: bool = false, stopped: bool = false,
pub fn getParent(self: *Process, comptime T: type) *T {
return @fieldParentPtr(T, "process", self);
}
/// Terminates a process with success if it's still alive /// Terminates a process with success if it's still alive
pub fn succeed(self: *Process) void { pub fn succeed(self: *Process) void {
if (self.alive()) self.state = .succeeded; if (self.alive()) self.state = .succeeded;
@ -57,11 +63,11 @@ pub const Process = struct {
return self.stopped; return self.stopped;
} }
/// Updates a process and its internal state if required /// Updates a process and its internal state
pub fn tick(self: *Process) void { pub fn tick(self: *Process) void {
switch (self.state) { switch (self.state) {
.uninitialized => { .uninitialized => {
if (self.initFn) |func| func(self); if (self.startFn) |func| func(self);
self.state = .running; self.state = .running;
}, },
.running => { .running => {

@ -1,13 +1,23 @@
const std = @import("std"); const std = @import("std");
const Process = @import("process.zig").Process; const Process = @import("process.zig").Process;
/// Cooperative scheduler for processes. Each process is invoked once per tick. If a process terminates, it's
/// removed automatically from the scheduler and it's never invoked again. A process can also have a child. In
/// this case, the process is replaced with its child when it terminates if it returns with success. In case of errors,
/// both the process and its child are discarded. In order to invoke all scheduled processes, call the `update` member function
/// Processes add themselves by calling `attach` and must satisfy the following conditions:
/// - have a field `process: Process`
/// - have a method `initialize(self: *@This(), data: var) void` that initializes all fields and takes in a the data passed to `attach`
/// - when initializing the `process` field it ust be given an `updateFn`. All other callbacks are optional.
/// - in any callback you can get your oiginal struct back via `process.getParent(@This())`
pub const Scheduler = struct { pub const Scheduler = struct {
handlers: std.ArrayList(ProcessHandler), handlers: std.ArrayList(ProcessHandler),
allocator: *std.mem.Allocator, allocator: *std.mem.Allocator,
fn createProcessHandler(comptime T: type) ProcessHandler { /// helper to create and prepare a process and wrap it in a ProcessHandler
fn createProcessHandler(comptime T: type, data: var) ProcessHandler {
var proc = std.testing.allocator.create(T) catch unreachable; var proc = std.testing.allocator.create(T) catch unreachable;
proc.initialize(); proc.initialize(data);
// get a closure so that we can safely deinit this later // get a closure so that we can safely deinit this later
var handlerDeinitFn = struct { var handlerDeinitFn = struct {
@ -26,13 +36,13 @@ pub const Scheduler = struct {
handler: *ProcessHandler, handler: *ProcessHandler,
pub fn init(handler: *ProcessHandler) Continuation { pub fn init(handler: *ProcessHandler) Continuation {
return .{.handler = handler}; return .{ .handler = handler };
} }
// TODO: fix and return when ProcessHandler can have next be a ProcessHandler // TODO: fix and return this when ProcessHandler can have next be a ProcessHandler
pub fn next(self: *@This(), comptime T: type) void { // *@This() pub fn next(self: *@This(), comptime T: type, data: var) void { // *@This()
var next_handler = createProcessHandler(T); var next_handler = createProcessHandler(T, data);
self.handler.next = .{.deinitChild = next_handler.deinitChild, .process = next_handler.process}; self.handler.next = .{ .deinitChild = next_handler.deinitChild, .process = next_handler.process };
} }
}; };
@ -42,7 +52,7 @@ pub const Scheduler = struct {
process: *Process, process: *Process,
pub fn asProcessHandler(self: @This()) ProcessHandler { pub fn asProcessHandler(self: @This()) ProcessHandler {
return .{.deinitChild = self.deinitChild, .process = self.process}; return .{ .deinitChild = self.deinitChild, .process = self.process };
} }
}; };
@ -87,19 +97,17 @@ pub const Scheduler = struct {
}; };
} }
pub fn deinit(self: Scheduler) void { pub fn deinit(self: *Scheduler) void {
for (self.handlers.items) |handler| { self.clear();
handler.deinit(self.allocator);
}
self.handlers.deinit(); self.handlers.deinit();
} }
/// Schedules a process for the next tick /// Schedules a process for the next tick
pub fn attach(self: *Scheduler, comptime T: type) Continuation { pub fn attach(self: *Scheduler, comptime T: type, data: var) Continuation {
std.debug.assert(@hasDecl(T, "initialize")); std.debug.assert(@hasDecl(T, "initialize"));
std.debug.assert(@hasField(T, "process")); std.debug.assert(@hasField(T, "process"));
var handler = createProcessHandler(T); var handler = createProcessHandler(T, data);
handler.process.tick(); handler.process.tick();
self.handlers.append(handler) catch unreachable; self.handlers.append(handler) catch unreachable;
@ -129,7 +137,7 @@ pub const Scheduler = struct {
/// resets the scheduler to its initial state and discards all the processes /// resets the scheduler to its initial state and discards all the processes
pub fn clear(self: *Scheduler) void { pub fn clear(self: *Scheduler) void {
for (self.handlers.items) |handler| { for (self.handlers.items) |handler| {
handler.deinit(handler.process, self.allocator); handler.deinit(self.allocator);
} }
self.handlers.items.len = 0; self.handlers.items.len = 0;
} }
@ -142,8 +150,6 @@ pub const Scheduler = struct {
} }
}; };
var fart: usize = 666;
test "" { test "" {
std.debug.warn("\n", .{}); std.debug.warn("\n", .{});
@ -151,41 +157,40 @@ test "" {
process: Process, process: Process,
fart: usize, fart: usize,
pub fn initialize(self: *@This()) void { pub fn initialize(self: *@This(), data: var) void {
self.process = .{ self.process = .{
.initFn = init, .startFn = start,
.updateFn = update, .updateFn = update,
.abortedFn = aborted, .abortedFn = aborted,
.failedFn = failed, .failedFn = failed,
.succeededFn = succeeded, .succeededFn = succeeded,
}; };
self.fart = fart; self.fart = data;
fart += 111;
} }
fn init(process: *Process) void { fn start(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = @fieldParentPtr(@This(), "process", process);
std.debug.warn("init {}\n", .{self.fart}); // std.debug.warn("start {}\n", .{self.fart});
} }
fn aborted(process: *Process) void { fn aborted(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = @fieldParentPtr(@This(), "process", process);
std.debug.warn("aborted {}\n", .{self.fart}); // std.debug.warn("aborted {}\n", .{self.fart});
} }
fn failed(process: *Process) void { fn failed(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = @fieldParentPtr(@This(), "process", process);
std.debug.warn("failed {}\n", .{self.fart}); // std.debug.warn("failed {}\n", .{self.fart});
} }
fn succeeded(process: *Process) void { fn succeeded(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = @fieldParentPtr(@This(), "process", process);
std.debug.warn("succeeded {}\n", .{self.fart}); // std.debug.warn("succeeded {}\n", .{self.fart});
} }
fn update(process: *Process) void { fn update(process: *Process) void {
const self = @fieldParentPtr(@This(), "process", process); const self = @fieldParentPtr(@This(), "process", process);
std.debug.warn("update {}\n", .{self.fart}); // std.debug.warn("update {}\n", .{self.fart});
process.succeed(); process.succeed();
} }
}; };
@ -193,8 +198,56 @@ test "" {
var scheduler = Scheduler.init(std.testing.allocator); var scheduler = Scheduler.init(std.testing.allocator);
defer scheduler.deinit(); defer scheduler.deinit();
_ = scheduler.attach(Tester).next(Tester); _ = scheduler.attach(Tester, 33).next(Tester, 66);
scheduler.update(); scheduler.update();
scheduler.update(); scheduler.update();
scheduler.update(); scheduler.update();
} }
test "scheduler.clear" {
const Tester = struct {
process: Process,
pub fn initialize(self: *@This(), data: var) void {
self.process = .{ .updateFn = update };
}
fn update(process: *Process) void {
std.debug.assert(false);
}
};
var scheduler = Scheduler.init(std.testing.allocator);
defer scheduler.deinit();
_ = scheduler.attach(Tester, {}).next(Tester, {});
scheduler.clear();
scheduler.update();
}
test "scheduler.attach.next" {
const Tester = struct {
process: Process,
counter: *usize,
pub fn initialize(self: *@This(), data: var) void {
self.process = .{ .updateFn = update };
self.counter = data;
}
fn update(process: *Process) void {
const self = process.getParent(@This());
self.counter.* += 1;
process.succeed();
}
};
var scheduler = Scheduler.init(std.testing.allocator);
defer scheduler.deinit();
var counter: usize = 0;
_ = scheduler.attach(Tester, &counter).next(Tester, &counter);
scheduler.update();
scheduler.update();
std.testing.expectEqual(counter, 2);
}

Loading…
Cancel
Save