Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bake.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ def build

Dir.chdir(ext_path) do
system("ruby ./extconf.rb")
system("make")
system("make") if File.exist?("Makefile")
end
end

def clean
ext_path = File.expand_path("ext", __dir__)

Dir.chdir(ext_path) do
system("make clean")
system("make clean") if File.exist?("Makefile")
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/io/event/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Copyright, 2021-2026, by Samuel Williams.

require_relative "native"
require_relative "selector/nonblock"
require_relative "selector/select"
require_relative "debug/selector"

Expand Down
11 changes: 9 additions & 2 deletions lib/io/event/selector/nonblock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ module Selector
# @parameter io [IO] The IO object to operate on.
# @yields {...} The block to execute.
def self.nonblock(io, &block)
io.nonblock(&block)
rescue Errno::EBADF
previous = io.nonblock?
io.nonblock = true
rescue Errno::EBADF, NotImplementedError
# Windows.
yield
else
begin
yield
ensure
io.nonblock = previous
end
end
end
end
58 changes: 56 additions & 2 deletions lib/io/event/selector/select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ def io_read(fiber, io, buffer, length, offset = 0)

Selector.nonblock(io) do
while true
result = Fiber.blocking{buffer.read(io, 0, offset)}
break if offset >= buffer.size

result = buffer_read(io, buffer, offset)

if result < 0
if length > 0 and again?(result)
Expand Down Expand Up @@ -244,7 +246,9 @@ def io_write(fiber, io, buffer, length, offset = 0)

Selector.nonblock(io) do
while true
result = Fiber.blocking{buffer.write(io, 0, offset)}
break if offset >= buffer.size

result = buffer_write(io, buffer, offset)

if result < 0
if length > 0 and again?(result)
Expand All @@ -265,6 +269,55 @@ def io_write(fiber, io, buffer, length, offset = 0)
return total
end

unless defined?(IO::Buffer) and IO::Buffer.method_defined?(:read) and IO::Buffer.method_defined?(:write)
private def buffer_read(io, buffer, offset)
string = io.read_nonblock(buffer.size - offset)
buffer.set_string(string, offset)
return string.bytesize
rescue EOFError
return 0
rescue IOError
return -Errno::EBADF::Errno
rescue IO::WaitReadable
return EAGAIN
rescue SystemCallError => error
return -error.errno
end

private def buffer_write(io, buffer, offset)
string = buffer.get_string(offset, buffer.size - offset)
return io.write_nonblock(string)
rescue IO::WaitWritable
return EAGAIN
rescue IOError
return -Errno::EBADF::Errno
rescue SystemCallError => error
return -error.errno
end
else
private def buffer_read(io, buffer, offset)
Fiber.blocking{buffer.read(io, 0, offset)}
rescue EOFError
return 0
rescue IOError
return -Errno::EBADF::Errno
rescue IO::WaitReadable
return EAGAIN
rescue SystemCallError => error
return -error.errno
end

private def buffer_write(io, buffer, offset)
Fiber.blocking{buffer.write(io, 0, offset)}
rescue IO::WaitWritable
return EAGAIN
rescue IOError
return -Errno::EBADF::Errno
rescue SystemCallError => error
return -error.errno
end
end

# Wait for a process to change state.
#
# @parameter fiber [Fiber] The fiber to resume after waiting.
Expand Down Expand Up @@ -339,6 +392,7 @@ def select(duration = nil)
# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
Thread.handle_interrupt(::Exception => :on_blocking) do
@blocked = true
duration = 0 unless @ready.empty?
readable, writable, priority = ::IO.select(readable, writable, priority, duration)
rescue ::Exception => error
# Requeue below...
Expand Down
3 changes: 1 addition & 2 deletions test/io/event/interrupt.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
with "test scheduler" do
it "can be used to wake up a fiber blocked in `Thread#join`" do
skip_unless_method_defined(:fork, Process.singleton_class)
skip "Process.fork is not available on JRuby" if RUBY_ENGINE == "jruby"

10.times do
r, w = IO.pipe
Expand All @@ -51,8 +52,6 @@
Fiber.set_scheduler(scheduler)

Fiber.schedule do
selector.dump_state($stderr, label: "interrupt fork before fork") if ENV["IO_EVENT_DIAGNOSTICS"]

pid = Process.fork do
# Child process:
w.write("hello")
Expand Down
19 changes: 15 additions & 4 deletions test/io/event/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,39 @@ def transfer

with "#wakeup" do
it "can wakeup selector from different thread" do
thread = nil
skip "JRuby scheduling can race this cross-thread timing assertion" if RUBY_ENGINE == "jruby"

duration = 5

thread = Thread.new do
sleep 0.001
selector.wakeup
end

expect do
selector.select(1)
selector.select(duration)
end.to have_duration(be < 1)
ensure
thread.join
thread&.join
end

it "can wakeup selector from different thread twice in a row" do
skip "JRuby scheduling can race this cross-thread timing assertion" if RUBY_ENGINE == "jruby"

duration = 5

2.times do
thread = Thread.new do
sleep 0.001
selector.wakeup
end

expect do
selector.select(1)
selector.select(duration)
end.to have_duration(be < 1)
ensure
thread.join
thread&.join
end
end

Expand Down Expand Up @@ -404,6 +413,8 @@ def transfer
end

it "can handle exception raised during wait from another fiber that was waiting on the same io" do
skip "JRuby does not support transfer back to a fiber currently raising another fiber" if RUBY_ENGINE == "jruby"

[false, true].each do |swapped| # Try both orderings.
writable1 = writable2 = false
error1 = false
Expand Down
98 changes: 52 additions & 46 deletions test/io/event/selector/closed_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,65 +22,71 @@

it "does not raise when IO is closed from the same fiber before selecting" do
skip_unless_minimum_ruby_version("4")
skip "JRuby does not currently handle this Ruby 4 closed-IO scheduler interaction" if RUBY_ENGINE == "jruby"

thread = Thread.new do
Thread.current.report_on_exception = false

scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current))
Fiber.set_scheduler(scheduler)

wait_fiber = Fiber.new do
input.wait_readable
rescue IOError
# acceptable: the IO was closed while waiting
begin
Thread.current.report_on_exception = false

scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current))
Fiber.set_scheduler(scheduler)

wait_fiber = Fiber.new do
input.wait_readable
rescue IOError
# acceptable: the IO was closed while waiting
end

# Close must happen in a separate fiber so that rb_thread_io_close_wait
# can yield (via kernel_sleep) back to the loop fiber instead of deadlocking:
close_fiber = Fiber.new do
input.close
end

wait_fiber.transfer
close_fiber.transfer

scheduler.run
ensure
Fiber.set_scheduler(nil)
scheduler&.close
end

# Close must happen in a separate fiber so that rb_thread_io_close_wait
# can yield (via kernel_sleep) back to the loop fiber instead of deadlocking:
close_fiber = Fiber.new do
input.close
end

wait_fiber.transfer
close_fiber.transfer

scheduler.run
ensure
Fiber.set_scheduler(nil)
scheduler&.close
end

thread.join
end

it "does not raise when IO is closed from another thread while selecting" do
skip_unless_minimum_ruby_version("4")
skip "JRuby does not currently handle this Ruby 4 closed-IO scheduler interaction" if RUBY_ENGINE == "jruby"

thread = Thread.new do
Thread.current.report_on_exception = false

scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current))
Fiber.set_scheduler(scheduler)

wait_fiber = Fiber.new do
input.wait_readable
rescue IOError
# acceptable: the IO was closed while waiting
end

wait_fiber.transfer

# Close the IO from another thread while the selector is blocking:
closer = Thread.new do
sleep(0.01)
input.close
begin
Thread.current.report_on_exception = false

scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current))
Fiber.set_scheduler(scheduler)

wait_fiber = Fiber.new do
input.wait_readable
rescue IOError
# acceptable: the IO was closed while waiting
end

wait_fiber.transfer

# Close the IO from another thread while the selector is blocking:
closer = Thread.new do
sleep(0.01)
input.close
end

scheduler.run
ensure
closer&.join
Fiber.set_scheduler(nil)
scheduler&.close
end

scheduler.run
ensure
closer&.join
Fiber.set_scheduler(nil)
scheduler&.close
end

error = nil
Expand Down
1 change: 1 addition & 0 deletions test/io/event/selector/fifo_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def around(&block)

it "can read and write" do
skip_if_ruby_platform(/mswin|mingw|cygwin/)
skip "JRuby's Java NIO selector does not support FIFO channels" if RUBY_ENGINE == "jruby"

File.mkfifo(path)

Expand Down
7 changes: 6 additions & 1 deletion test/io/event/selector/interruptable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
thread.raise(::Interrupt)

expect{thread.join}.to raise_exception(::Interrupt)
expect(result).to be == 0

# JRuby interrupts the selector, but does not defer the same-thread re-raise
# from within the selector until after this assignment completes.
unless RUBY_ENGINE == "jruby"
expect(result).to be == 0
end
end

with "pipe" do
Expand Down
15 changes: 12 additions & 3 deletions test/io/event/selector/nonblock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,33 @@
require "io/nonblock"
require "io/event/selector"

require "socket"
require "unix_socket"

describe IO::Event::Selector do
with ".nonblock" do
it "makes non-blocking IO" do
executed = false

UNIXSocket.pair do |input, output|
input, output = UNIXSocket.pair

begin
input.nonblock = false
output.nonblock = false

IO::Event::Selector.nonblock(input) do
executed = true

# This does not work on Windows...
unless RUBY_PLATFORM =~ /mswin|mingw|cygwin/
# This does not work on Windows; JRuby's `nonblock?` does not
# reliably reflect the temporary block state for this socket.
unless RUBY_PLATFORM =~ /mswin|mingw|cygwin/ or RUBY_ENGINE == "jruby"
expect(input).to be(:nonblock?)
expect(output).not.to be(:nonblock?)
end
end
ensure
input&.close unless input&.closed?
output&.close unless output&.closed?
end

expect(executed).to be == true
Expand Down
2 changes: 2 additions & 0 deletions test/io/event/selector/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ def object.transfer
end

it "can yield from resumed fiber" do
skip "JRuby does not support this nested Fiber#resume/#transfer interaction" if RUBY_ENGINE == "jruby"

sequence = []

child = Fiber.new do |argument|
Expand Down
2 changes: 2 additions & 0 deletions test/io/event/selector/select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@

with "#select" do
it "dispatches priority events" do
skip "JRuby does not report TCP out-of-band data as a priority event" if RUBY_ENGINE == "jruby"

server = TCPServer.new("127.0.0.1", 0)
client = TCPSocket.new("127.0.0.1", server.addr[1])
socket = server.accept
Expand Down
Loading
Loading