Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 48 additions & 64 deletions lib/compiler/build_runner.zig
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ pub fn main() !void {

var targets = std.array_list.Managed([]const u8).init(arena);
var debug_log_scopes = std.array_list.Managed([]const u8).init(arena);
var thread_pool_options: std.Thread.Pool.Options = .{ .allocator = arena };

var install_prefix: ?[]const u8 = null;
var dir_list = std.Build.DirList{};
Expand Down Expand Up @@ -413,19 +412,11 @@ pub fn main() !void {
};
} else if (mem.eql(u8, arg, "-fno-reference-trace")) {
builder.reference_trace = null;
} else if (mem.startsWith(u8, arg, "-j")) {
const num = arg["-j".len..];
const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| {
std.debug.print("unable to parse jobs count '{s}': {s}", .{
num, @errorName(err),
});
process.exit(1);
};
if (n_jobs < 1) {
std.debug.print("number of jobs must be at least 1\n", .{});
process.exit(1);
}
thread_pool_options.n_jobs = n_jobs;
} else if (mem.cutPrefix(u8, arg, "-j")) |text| {
const n = std.fmt.parseUnsigned(u32, text, 10) catch |err|
fatal("unable to parse jobs count '{s}': {t}", .{ text, err });
if (n < 1) fatal("number of jobs must be at least 1", .{});
threaded.setAsyncLimit(.limited(n));
} else if (mem.eql(u8, arg, "--")) {
builder.args = argsRest(args, arg_idx);
break;
Expand Down Expand Up @@ -503,7 +494,7 @@ pub fn main() !void {

.max_rss = max_rss,
.max_rss_is_default = false,
.max_rss_mutex = .{},
.max_rss_mutex = .init,
.skip_oom_steps = skip_oom_steps,
.unit_test_timeout_ns = test_timeout_ns,

Expand All @@ -516,7 +507,6 @@ pub fn main() !void {
.error_style = error_style,
.multiline_errors = multiline_errors,
.summary = summary orelse if (watch or webui_listen != null) .line else .failures,
.thread_pool = undefined,

.ttyconf = ttyconf,
};
Expand Down Expand Up @@ -547,16 +537,12 @@ pub fn main() !void {
break :w try .init();
};

try run.thread_pool.init(thread_pool_options);
defer run.thread_pool.deinit();

const now = Io.Clock.Timestamp.now(io, .awake) catch |err| fatal("failed to collect timestamp: {t}", .{err});

run.web_server = if (webui_listen) |listen_address| ws: {
if (builtin.single_threaded) unreachable; // `fatal` above
break :ws .init(.{
.gpa = gpa,
.thread_pool = &run.thread_pool,
.ttyconf = ttyconf,
.graph = &graph,
.all_steps = run.step_stack.keys(),
Expand Down Expand Up @@ -597,7 +583,7 @@ pub fn main() !void {

if (run.web_server) |*ws| {
assert(!watch); // fatal error after CLI parsing
while (true) switch (ws.wait()) {
while (true) switch (try ws.wait()) {
.rebuild => {
for (run.step_stack.keys()) |step| {
step.state = .precheck_done;
Expand Down Expand Up @@ -666,7 +652,7 @@ const Run = struct {
gpa: Allocator,
max_rss: u64,
max_rss_is_default: bool,
max_rss_mutex: std.Thread.Mutex,
max_rss_mutex: Io.Mutex,
skip_oom_steps: bool,
unit_test_timeout_ns: ?u64,
watch: bool,
Expand All @@ -675,7 +661,6 @@ const Run = struct {
memory_blocked_steps: std.ArrayList(*Step),
/// Allocated into `gpa`.
step_stack: std.AutoArrayHashMapUnmanaged(*Step, void),
thread_pool: std.Thread.Pool,
/// Similar to the `tty.Config` returned by `std.debug.lockStderrWriter`,
/// but also respects the '--color' flag.
ttyconf: tty.Config,
Expand Down Expand Up @@ -754,14 +739,13 @@ fn runStepNames(
const gpa = run.gpa;
const io = b.graph.io;
const step_stack = &run.step_stack;
const thread_pool = &run.thread_pool;

{
const step_prog = parent_prog_node.start("steps", step_stack.count());
defer step_prog.end();

var wait_group: std.Thread.WaitGroup = .{};
defer wait_group.wait();
var group: Io.Group = .init;
defer group.wait(io);

// Here we spawn the initial set of tasks with a nice heuristic -
// dependency order. Each worker when it finishes a step will then
Expand All @@ -771,9 +755,7 @@ fn runStepNames(
const step = steps_slice[steps_slice.len - i - 1];
if (step.state == .skipped_oom) continue;

thread_pool.spawnWg(&wait_group, workerMakeOneStep, .{
&wait_group, b, step, step_prog, run,
});
group.async(io, workerMakeOneStep, .{ &group, b, step, step_prog, run });
}
}

Expand Down Expand Up @@ -855,7 +837,6 @@ fn runStepNames(
var f = std.Build.Fuzz.init(
gpa,
io,
thread_pool,
run.ttyconf,
step_stack.keys(),
parent_prog_node,
Expand Down Expand Up @@ -1318,13 +1299,14 @@ fn constructGraphAndCheckForDependencyLoop(
}

fn workerMakeOneStep(
wg: *std.Thread.WaitGroup,
group: *Io.Group,
b: *std.Build,
s: *Step,
prog_node: std.Progress.Node,
run: *Run,
) void {
const thread_pool = &run.thread_pool;
const io = b.graph.io;
const gpa = run.gpa;

// First, check the conditions for running this step. If they are not met,
// then we return without doing the step, relying on another worker to
Expand All @@ -1347,8 +1329,8 @@ fn workerMakeOneStep(
}

if (s.max_rss != 0) {
run.max_rss_mutex.lock();
defer run.max_rss_mutex.unlock();
run.max_rss_mutex.lockUncancelable(io);
defer run.max_rss_mutex.unlock(io);

// Avoid running steps twice.
if (s.state != .precheck_done) {
Expand All @@ -1360,7 +1342,7 @@ fn workerMakeOneStep(
if (new_claimed_rss > run.max_rss) {
// Running this step right now could possibly exceed the allotted RSS.
// Add this step to the queue of memory-blocked steps.
run.memory_blocked_steps.append(run.gpa, s) catch @panic("OOM");
run.memory_blocked_steps.append(gpa, s) catch @panic("OOM");
return;
}

Expand All @@ -1381,12 +1363,11 @@ fn workerMakeOneStep(

const make_result = s.make(.{
.progress_node = sub_prog_node,
.thread_pool = thread_pool,
.watch = run.watch,
.web_server = if (run.web_server) |*ws| ws else null,
.ttyconf = run.ttyconf,
.unit_test_timeout_ns = run.unit_test_timeout_ns,
.gpa = run.gpa,
.gpa = gpa,
});

// No matter the result, we want to display error/warning messages.
Expand All @@ -1397,7 +1378,7 @@ fn workerMakeOneStep(
const bw, _ = std.debug.lockStderrWriter(&stdio_buffer_allocation);
defer std.debug.unlockStderrWriter();
const ttyconf = run.ttyconf;
printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {};
printErrorMessages(gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {};
}

handle_result: {
Expand All @@ -1419,40 +1400,43 @@ fn workerMakeOneStep(

// Successful completion of a step, so we queue up its dependants as well.
for (s.dependants.items) |dep| {
thread_pool.spawnWg(wg, workerMakeOneStep, .{
wg, b, dep, prog_node, run,
});
group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run });
}
}

// If this is a step that claims resources, we must now queue up other
// steps that are waiting for resources.
if (s.max_rss != 0) {
run.max_rss_mutex.lock();
defer run.max_rss_mutex.unlock();

// Give the memory back to the scheduler.
run.claimed_rss -= s.max_rss;
// Avoid kicking off too many tasks that we already know will not have
// enough resources.
var remaining = run.max_rss - run.claimed_rss;
var i: usize = 0;
var j: usize = 0;
while (j < run.memory_blocked_steps.items.len) : (j += 1) {
const dep = run.memory_blocked_steps.items[j];
assert(dep.max_rss != 0);
if (dep.max_rss <= remaining) {
remaining -= dep.max_rss;

thread_pool.spawnWg(wg, workerMakeOneStep, .{
wg, b, dep, prog_node, run,
});
} else {
run.memory_blocked_steps.items[i] = dep;
i += 1;
var dispatch_deps: std.ArrayList(*Step) = .empty;
defer dispatch_deps.deinit(gpa);
dispatch_deps.ensureUnusedCapacity(gpa, run.memory_blocked_steps.items.len) catch @panic("OOM");

{
run.max_rss_mutex.lockUncancelable(io);
defer run.max_rss_mutex.unlock(io);

// Give the memory back to the scheduler.
run.claimed_rss -= s.max_rss;
// Avoid kicking off too many tasks that we already know will not have
// enough resources.
var remaining = run.max_rss - run.claimed_rss;
var i: usize = 0;
for (run.memory_blocked_steps.items) |dep| {
assert(dep.max_rss != 0);
if (dep.max_rss <= remaining) {
remaining -= dep.max_rss;
dispatch_deps.appendAssumeCapacity(dep);
} else {
run.memory_blocked_steps.items[i] = dep;
i += 1;
}
}
run.memory_blocked_steps.shrinkRetainingCapacity(i);
}
for (dispatch_deps.items) |dep| {
// Must be called without max_rss_mutex held in case it executes recursively.
group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run });
}
run.memory_blocked_steps.shrinkRetainingCapacity(i);
}
}

Expand Down
31 changes: 22 additions & 9 deletions lib/std/Build/Cache.zig
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ manifest_dir: fs.Dir,
hash: HashHelper = .{},
/// This value is accessed from multiple threads, protected by mutex.
recent_problematic_timestamp: Io.Timestamp = .zero,
mutex: std.Thread.Mutex = .{},
mutex: Io.Mutex = .init,

/// A set of strings such as the zig library directory or project source root, which
/// are stripped from the file paths before putting into the cache. They
Expand Down Expand Up @@ -474,6 +474,7 @@ pub const Manifest = struct {
/// A cache manifest file exists however it could not be parsed.
InvalidFormat,
OutOfMemory,
Canceled,
};

/// Check the cache to see if the input exists in it. If it exists, returns `true`.
Expand Down Expand Up @@ -559,12 +560,14 @@ pub const Manifest = struct {
self.diagnostic = .{ .manifest_create = error.FileNotFound };
return error.CacheCheckFailed;
},
error.Canceled => return error.Canceled,
else => |e| {
self.diagnostic = .{ .manifest_create = e };
return error.CacheCheckFailed;
},
}
},
error.Canceled => return error.Canceled,
else => |e| {
self.diagnostic = .{ .manifest_create = e };
return error.CacheCheckFailed;
Expand Down Expand Up @@ -762,6 +765,7 @@ pub const Manifest = struct {
// Every digest before this one has been populated successfully.
return .{ .miss = .{ .file_digests_populated = idx } };
},
error.Canceled => return error.Canceled,
else => |e| {
self.diagnostic = .{ .file_open = .{
.file_index = idx,
Expand Down Expand Up @@ -790,7 +794,7 @@ pub const Manifest = struct {
.inode = actual_stat.inode,
};

if (self.isProblematicTimestamp(cache_hash_file.stat.mtime)) {
if (try self.isProblematicTimestamp(cache_hash_file.stat.mtime)) {
// The actual file has an unreliable timestamp, force it to be hashed
cache_hash_file.stat.mtime = .zero;
cache_hash_file.stat.inode = 0;
Expand Down Expand Up @@ -848,16 +852,18 @@ pub const Manifest = struct {
}
}

fn isProblematicTimestamp(man: *Manifest, timestamp: Io.Timestamp) bool {
fn isProblematicTimestamp(man: *Manifest, timestamp: Io.Timestamp) error{Canceled}!bool {
const io = man.cache.io;

// If the file_time is prior to the most recent problematic timestamp
// then we don't need to access the filesystem.
if (timestamp.nanoseconds < man.recent_problematic_timestamp.nanoseconds)
return false;

// Next we will check the globally shared Cache timestamp, which is accessed
// from multiple threads.
man.cache.mutex.lock();
defer man.cache.mutex.unlock();
try man.cache.mutex.lock(io);
defer man.cache.mutex.unlock(io);

// Save the global one to our local one to avoid locking next time.
man.recent_problematic_timestamp = man.cache.recent_problematic_timestamp;
Expand All @@ -871,11 +877,18 @@ pub const Manifest = struct {
var file = man.cache.manifest_dir.createFile("timestamp", .{
.read = true,
.truncate = true,
}) catch return true;
}) catch |err| switch (err) {
error.Canceled => return error.Canceled,
else => return true,
};
defer file.close();

// Save locally and also save globally (we still hold the global lock).
man.recent_problematic_timestamp = (file.stat() catch return true).mtime;
const stat = file.stat() catch |err| switch (err) {
error.Canceled => return error.Canceled,
else => return true,
};
man.recent_problematic_timestamp = stat.mtime;
man.cache.recent_problematic_timestamp = man.recent_problematic_timestamp;
}

Expand All @@ -902,7 +915,7 @@ pub const Manifest = struct {
.inode = actual_stat.inode,
};

if (self.isProblematicTimestamp(ch_file.stat.mtime)) {
if (try self.isProblematicTimestamp(ch_file.stat.mtime)) {
// The actual file has an unreliable timestamp, force it to be hashed
ch_file.stat.mtime = .zero;
ch_file.stat.inode = 0;
Expand Down Expand Up @@ -1038,7 +1051,7 @@ pub const Manifest = struct {
.contents = null,
};

if (self.isProblematicTimestamp(new_file.stat.mtime)) {
if (try self.isProblematicTimestamp(new_file.stat.mtime)) {
// The actual file has an unreliable timestamp, force it to be hashed
new_file.stat.mtime = .zero;
new_file.stat.inode = 0;
Expand Down
Loading