Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions lib/std/Io.zig
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,9 @@ pub const VTable = struct {
/// Thread-safe.
cancelRequested: *const fn (?*anyopaque) bool,

/// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel.
/// When this function returns, implementation guarantees that `start` has
/// either already been called, or a unit of concurrency has been assigned
/// to the task of calling the function.
///
/// Thread-safe.
groupAsync: *const fn (
Expand All @@ -640,6 +641,17 @@ pub const VTable = struct {
context_alignment: std.mem.Alignment,
start: *const fn (*Group, context: *const anyopaque) void,
) void,
/// Thread-safe.
groupConcurrent: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// Owner of the spawned async task.
group: *Group,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (*Group, context: *const anyopaque) void,
) ConcurrentError!void,
groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,

Expand Down Expand Up @@ -1021,8 +1033,8 @@ pub const Group = struct {
/// Threadsafe.
///
/// See also:
/// * `Io.async`
/// * `concurrent`
/// * `Io.async`
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
Expand All @@ -1035,6 +1047,34 @@ pub const Group = struct {
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}

/// Calls `function` with `args`, such that the function is not guaranteed
/// to have returned until `wait` is called, allowing the caller to
/// progress while waiting for any `Io` operations.
///
/// The resource spawned is owned by the group; after this is called,
/// `wait` or `cancel` must be called before the group is deinitialized.
///
/// This has stronger guarantee than `async`, placing restrictions on what kind
/// of `Io` implementations are supported. By calling `async` instead, one
/// allows, for example, stackful single-threaded blocking I/O.
///
/// Threadsafe.
///
/// See also:
/// * `async`
/// * `Io.concurrent`
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(group: *Group, context: *const anyopaque) void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
@call(.auto, function, args_casted.*);
}
};
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}

/// Blocks until all tasks of the group finish. During this time,
/// cancellation requests propagate to all members of the group.
///
Expand Down
54 changes: 54 additions & 0 deletions lib/std/Io/Threaded.zig
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub fn init(
/// * `Io.VTable.async`
/// * `Io.VTable.concurrent`
/// * `Io.VTable.groupAsync`
/// * `Io.VTable.groupConcurrent`
/// If these functions are avoided, then `Allocator.failing` may be passed
/// here.
gpa: Allocator,
Expand Down Expand Up @@ -221,6 +222,7 @@ pub fn io(t: *Threaded) Io {
.select = select,

.groupAsync = groupAsync,
.groupConcurrent = groupConcurrent,
.groupWait = groupWait,
.groupCancel = groupCancel,

Expand Down Expand Up @@ -317,6 +319,7 @@ pub fn ioBasic(t: *Threaded) Io {
.select = select,

.groupAsync = groupAsync,
.groupConcurrent = groupConcurrent,
.groupWait = groupWait,
.groupCancel = groupCancel,

Expand Down Expand Up @@ -729,6 +732,57 @@ fn groupAsync(
t.cond.signal();
}

fn groupConcurrent(
userdata: ?*anyopaque,
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
start: *const fn (*Io.Group, context: *const anyopaque) void,
) Io.ConcurrentError!void {
if (builtin.single_threaded) return error.ConcurrencyUnavailable;

const t: *Threaded = @ptrCast(@alignCast(userdata));

const gpa = t.allocator;
const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
return error.ConcurrencyUnavailable;

t.mutex.lock();
defer t.mutex.unlock();

const busy_count = t.busy_count;

if (busy_count >= @intFromEnum(t.concurrent_limit))
return error.ConcurrencyUnavailable;

t.busy_count = busy_count + 1;
errdefer t.busy_count = busy_count;

const pool_size = t.wait_group.value();
if (pool_size - busy_count == 0) {
t.wait_group.start();
errdefer t.wait_group.finish();

const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
return error.ConcurrencyUnavailable;
thread.detach();
}

// Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
group.token = &gc.node;

t.run_queue.prepend(&gc.closure.node);

// This needs to be done before unlocking the mutex to avoid a race with
// the associated task finishing.
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));

t.cond.signal();
}

fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const gpa = t.allocator;
Expand Down
26 changes: 26 additions & 0 deletions lib/std/Io/test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,32 @@ fn sleep(io: Io, result: *usize) void {
result.* = 1;
}

test "Group concurrent" {
const io = testing.io;

var group: Io.Group = .init;
defer group.cancel(io);
var results: [2]usize = undefined;

group.concurrent(io, count, .{ 1, 10, &results[0] }) catch |err| switch (err) {
error.ConcurrencyUnavailable => {
try testing.expect(builtin.single_threaded);
return;
},
};

group.concurrent(io, count, .{ 20, 30, &results[1] }) catch |err| switch (err) {
error.ConcurrencyUnavailable => {
try testing.expect(builtin.single_threaded);
return;
},
};

group.wait(io);

try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
}

test "select" {
const io = testing.io;

Expand Down