Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 28 additions & 32 deletions src/Package/Fetch.zig
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@ const assert = std.debug.assert;
const ascii = std.ascii;
const Allocator = std.mem.Allocator;
const Cache = std.Build.Cache;
const ThreadPool = std.Thread.Pool;
const WaitGroup = std.Thread.WaitGroup;
const git = @import("Fetch/git.zig");
const Package = @import("../Package.zig");
const Manifest = Package.Manifest;
const ErrorBundle = std.zig.ErrorBundle;

arena: std.heap.ArenaAllocator,
io: Io,
location: Location,
location_tok: std.zig.Ast.TokenIndex,
hash_tok: std.zig.Ast.OptionalTokenIndex,
Expand Down Expand Up @@ -104,7 +101,8 @@ pub const LazyStatus = enum {

/// Contains shared state among all `Fetch` tasks.
pub const JobQueue = struct {
mutex: std.Thread.Mutex = .{},
io: Io,
mutex: Io.Mutex = .init,
/// It's an array hash map so that it can be sorted before rendering the
/// dependencies.zig source file.
/// Protected by `mutex`.
Expand All @@ -115,8 +113,7 @@ pub const JobQueue = struct {
all_fetches: std.ArrayList(*Fetch) = .empty,

http_client: *std.http.Client,
thread_pool: *ThreadPool,
wait_group: WaitGroup = .{},
group: Io.Group = .init,
global_cache: Cache.Directory,
/// If true, then no fetching occurs, and:
/// * The `global_cache` directory is assumed to be the direct parent
Expand Down Expand Up @@ -320,13 +317,14 @@ pub const Location = union(enum) {

/// Error set returned by `run` and by the helper functions that share its
/// signature (`runResource`, `loadManifest`, `queueJobsForDeps`, ...).
pub const RunError = error{
/// `workerRun` records this error by setting `oom_flag` on the `Fetch`.
OutOfMemory,
/// Passed through unchanged when an I/O operation reports it (for example,
/// creating the temporary zip file in `unzip`). `workerRun` handles it with
/// an empty branch.
Canceled,
/// This error code is intended to be handled by inspecting the
/// `error_bundle` field.
FetchFailed,
};

pub fn run(f: *Fetch) RunError!void {
const io = f.io;
const io = f.job_queue.io;
const eb = &f.error_bundle;
const arena = f.arena.allocator();
const gpa = f.arena.child_allocator;
Expand Down Expand Up @@ -488,7 +486,7 @@ fn runResource(
resource: *Resource,
remote_hash: ?Package.Hash,
) RunError!void {
const io = f.io;
const io = f.job_queue.io;
defer resource.deinit(io);
const arena = f.arena.allocator();
const eb = &f.error_bundle;
Expand Down Expand Up @@ -702,7 +700,8 @@ fn loadManifest(f: *Fetch, pkg_root: Cache.Path) RunError!void {
}

fn queueJobsForDeps(f: *Fetch) RunError!void {
const io = f.io;
const io = f.job_queue.io;

assert(f.job_queue.recursive);

// If the package does not have a build.zig.zon file then there are no dependencies.
Expand All @@ -722,8 +721,8 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
const prog_names = try parent_arena.alloc([]const u8, deps.len);
var new_fetch_index: usize = 0;

f.job_queue.mutex.lock();
defer f.job_queue.mutex.unlock();
try f.job_queue.mutex.lock(io);
defer f.job_queue.mutex.unlock(io);

try f.job_queue.all_fetches.ensureUnusedCapacity(gpa, new_fetches.len);
try f.job_queue.table.ensureUnusedCapacity(gpa, @intCast(new_fetches.len));
Expand Down Expand Up @@ -792,7 +791,6 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
f.job_queue.all_fetches.appendAssumeCapacity(new_fetch);
}
new_fetch.* = .{
.io = io,
.arena = std.heap.ArenaAllocator.init(gpa),
.location = location,
.location_tok = dep.location_tok,
Expand Down Expand Up @@ -830,11 +828,9 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
break :nf .{ new_fetches[0..new_fetch_index], prog_names[0..new_fetch_index] };
};

// Now it's time to give tasks to the thread pool.
const thread_pool = f.job_queue.thread_pool;

// Now it's time to dispatch tasks.
for (new_fetches, prog_names) |*new_fetch, prog_name| {
thread_pool.spawnWg(&f.job_queue.wait_group, workerRun, .{ new_fetch, prog_name });
f.job_queue.group.async(io, workerRun, .{ new_fetch, prog_name });
}
}

Expand All @@ -848,6 +844,7 @@ pub fn workerRun(f: *Fetch, prog_name: []const u8) void {

run(f) catch |err| switch (err) {
error.OutOfMemory => f.oom_flag = true,
error.Canceled => {},
error.FetchFailed => {
// Nothing to do because the errors are already reported in `error_bundle`,
// and a reference is kept to the `Fetch` task inside `all_fetches`.
Expand Down Expand Up @@ -992,7 +989,7 @@ const FileType = enum {
const init_resource_buffer_size = git.Packet.max_data_length;

fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void {
const io = f.io;
const io = f.job_queue.io;
const arena = f.arena.allocator();
const eb = &f.error_bundle;

Expand Down Expand Up @@ -1281,12 +1278,16 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) RunError!Unpack
return res;
}

fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
fn unzip(
f: *Fetch,
out_dir: fs.Dir,
reader: *Io.Reader,
) error{ ReadFailed, OutOfMemory, Canceled, FetchFailed }!UnpackResult {
// We write the entire contents to a file first because zip files
// must be processed back to front and they could be too large to
// load into memory.

const io = f.io;
const io = f.job_queue.io;
const cache_root = f.job_queue.global_cache;
const prefix = "tmp/";
const suffix = ".zip";
Expand All @@ -1306,6 +1307,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO
.read = true,
}) catch |err| switch (err) {
error.PathAlreadyExists => continue,
error.Canceled => return error.Canceled,
else => |e| return f.fail(
f.location_tok,
try eb.printString("failed to create temporary zip file: {t}", .{e}),
Expand Down Expand Up @@ -1348,7 +1350,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO
}

fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult {
const io = f.io;
const io = f.job_queue.io;
const arena = f.arena.allocator();
// TODO don't try to get a gpa from an arena. expose this dependency higher up
// because the backing of arena could be page allocator
Expand Down Expand Up @@ -1486,11 +1488,11 @@ const ComputedHash = struct {
/// hashed* and must not be present on the file system when calling this
/// function.
fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
const io = f.job_queue.io;
// All the path name strings need to be in memory for sorting.
const arena = f.arena.allocator();
const gpa = f.arena.child_allocator;
const eb = &f.error_bundle;
const thread_pool = f.job_queue.thread_pool;
const root_dir = pkg_path.root_dir.handle;

// Collect all files, recursively, then sort.
Expand All @@ -1514,10 +1516,8 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
{
// The final hash will be a hash of each file hashed independently. This
// allows hashing in parallel.
var wait_group: WaitGroup = .{};
// `computeHash` is called from a worker thread so there must not be
// any waiting without working or a deadlock could occur.
defer thread_pool.waitAndWork(&wait_group);
var group: Io.Group = .init;
defer group.wait(io);

while (walker.next() catch |err| {
try eb.addRootErrorMessage(.{ .msg = try eb.printString(
Expand All @@ -1542,7 +1542,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
.fs_path = fs_path,
.failure = undefined, // to be populated by the worker
};
thread_pool.spawnWg(&wait_group, workerDeleteFile, .{ root_dir, deleted_file });
group.async(io, workerDeleteFile, .{ root_dir, deleted_file });
try deleted_files.append(deleted_file);
continue;
}
Expand Down Expand Up @@ -1570,7 +1570,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
.failure = undefined, // to be populated by the worker
.size = undefined, // to be populated by the worker
};
thread_pool.spawnWg(&wait_group, workerHashFile, .{ root_dir, hashed_file });
group.async(io, workerHashFile, .{ root_dir, hashed_file });
try all_files.append(hashed_file);
}
}
Expand Down Expand Up @@ -2241,7 +2241,6 @@ fn saveEmbedFile(comptime tarball_name: []const u8, dir: fs.Dir) !void {

// Builds Fetch with required dependencies, clears dependencies on deinit().
const TestFetchBuilder = struct {
thread_pool: ThreadPool,
http_client: std.http.Client,
global_cache_directory: Cache.Directory,
job_queue: Fetch.JobQueue,
Expand All @@ -2256,13 +2255,12 @@ const TestFetchBuilder = struct {
) !*Fetch {
const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{});

try self.thread_pool.init(.{ .allocator = allocator });
self.http_client = .{ .allocator = allocator, .io = io };
self.global_cache_directory = .{ .handle = cache_dir, .path = null };

self.job_queue = .{
.io = io,
.http_client = &self.http_client,
.thread_pool = &self.thread_pool,
.global_cache = self.global_cache_directory,
.recursive = false,
.read_only = false,
Expand All @@ -2273,7 +2271,6 @@ const TestFetchBuilder = struct {

self.fetch = .{
.arena = std.heap.ArenaAllocator.init(allocator),
.io = io,
.location = .{ .path_or_url = path_or_url },
.location_tok = 0,
.hash_tok = .none,
Expand Down Expand Up @@ -2309,7 +2306,6 @@ const TestFetchBuilder = struct {
self.fetch.prog_node.end();
self.global_cache_directory.handle.close();
self.http_client.deinit();
self.thread_pool.deinit();
}

fn packageDir(self: *TestFetchBuilder) !fs.Dir {
Expand Down
14 changes: 5 additions & 9 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5139,8 +5139,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
defer fetch_prog_node.end();

var job_queue: Package.Fetch.JobQueue = .{
.io = io,
.http_client = &http_client,
.thread_pool = &thread_pool,
.global_cache = dirs.global_cache,
.read_only = false,
.recursive = true,
Expand Down Expand Up @@ -5173,7 +5173,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)

var fetch: Package.Fetch = .{
.arena = std.heap.ArenaAllocator.init(gpa),
.io = io,
.location = .{ .relative_path = phantom_package_root },
.location_tok = 0,
.hash_tok = .none,
Expand Down Expand Up @@ -5207,10 +5206,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
&fetch,
);

job_queue.thread_pool.spawnWg(&job_queue.wait_group, Package.Fetch.workerRun, .{
&fetch, "root",
});
job_queue.wait_group.wait();
job_queue.group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" });
job_queue.group.wait(io);

try job_queue.consolidateErrors();

Expand Down Expand Up @@ -6899,8 +6896,8 @@ fn cmdFetch(
defer global_cache_directory.handle.close();

var job_queue: Package.Fetch.JobQueue = .{
.io = io,
.http_client = &http_client,
.thread_pool = &thread_pool,
.global_cache = global_cache_directory,
.recursive = false,
.read_only = false,
Expand All @@ -6912,7 +6909,6 @@ fn cmdFetch(

var fetch: Package.Fetch = .{
.arena = std.heap.ArenaAllocator.init(gpa),
.io = io,
.location = .{ .path_or_url = path_or_url },
.location_tok = 0,
.hash_tok = .none,
Expand Down Expand Up @@ -6942,7 +6938,7 @@ fn cmdFetch(
defer fetch.deinit();

fetch.run() catch |err| switch (err) {
error.OutOfMemory => fatal("out of memory", .{}),
error.OutOfMemory, error.Canceled => |e| return e,
error.FetchFailed => {}, // error bundle checked below
};

Expand Down