Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions go/python/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ func (Adapter) DefaultExe() string {
}

func (Adapter) LaunchArgs(forwarded []string) []string {
return append([]string{"-u", "-q", "-i"}, forwarded...)
args := append([]string{"-u", "-q", "-i"}, forwarded...)
return append(args, "-c", bootstrapExec())
}

func bootstrapExec() string {
return fmt.Sprintf(`exec(bytes.fromhex("%s").decode())`, hex.EncodeToString([]byte(runtimeSource)))
}

// SessionKey: Python has no project notion; the interpreter path is the env.
Expand All @@ -33,9 +38,7 @@ func (a Adapter) SessionKey(exe string, _ []string) string {
return exe
}

func (Adapter) BootstrapStmt() string {
return fmt.Sprintf(`exec(bytes.fromhex("%s").decode())`, hex.EncodeToString([]byte(runtimeSource)))
}
func (Adapter) BootstrapStmt() string { return "" }

func (Adapter) WrapEval(hexCode string, _ bool) string {
return fmt.Sprintf(`_repld_run("%s")`, hexCode)
Expand All @@ -59,8 +62,6 @@ finally:
}

func (Adapter) SentinelStmt(sentinel string) string {
// Assign write() results to _ so the interactive interpreter never echoes the
// char counts (the no-op displayhook isn't set yet during startup drain).
return fmt.Sprintf(`import sys as _s; _ = _s.stderr.write("%s\n"); _s.stderr.flush(); _ = _s.stdout.write("%s\n"); _s.stdout.flush()`, sentinel, sentinel)
}

Expand Down
86 changes: 59 additions & 27 deletions go/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,6 @@ func (s *Session) start(exe string, workDir string) error {

var startup []startupChunk
capture := func(data string, isStderr bool) {
if s.lang == "python" && isStderr {
data = stripPythonPrompts(data)
if data == "" {
return
}
}
startup = append(startup, startupChunk{data: data, isStderr: isStderr})
}
if _, err := s.executeRaw("", capture, false, startupTimeout); err != nil {
Expand Down Expand Up @@ -198,13 +192,6 @@ func (s *Session) start(exe string, workDir string) error {
return nil
}

func stripPythonPrompts(data string) string {
for strings.HasSuffix(data, ">>> ") || strings.HasSuffix(data, "... ") {
data = data[:len(data)-4]
}
return data
}

type controlWinner struct {
conn net.Conn
br *bufio.Reader
Expand Down Expand Up @@ -309,34 +296,79 @@ func parseControlLine(line string) *evalError {
return &evalError{short: short, smart: smart, full: full}
}

func sentinelOverlap(buf []byte, sentinel string) int {
n := len(buf)
if n > len(sentinel) {
n = len(sentinel)
}
for k := n; k > 0; k-- {
if string(buf[len(buf)-k:]) == sentinel[:k] {
return k
}
}
return 0
}

// Streams output until the closing sentinel line.
// Only the trailing run of bytes that could begin the sentinel is held back.
func (s *Session) scanToSentinel(r *bufio.Reader, isStderr bool, emit func(string, bool)) (tail string, err error) {
const maxTail = 64 * 1024
var buf []byte
keep := func(s string) {
buf = append(buf, s...)
send := func(b []byte) {
if len(b) == 0 {
return
}
buf = append(buf, b...)
if len(buf) > maxTail {
buf = append(buf[:0], buf[len(buf)-maxTail:]...)
}
if emit != nil {
emit(string(b), isStderr)
}
}

var line []byte // bytes since the last '\n' (newline excluded)
sent := 0 // count of `line` already streamed

for {
line, rerr := r.ReadString('\n')
raw := strings.TrimRight(line, "\r\n")
if strings.HasSuffix(raw, s.sentinel) {
if prefix := strings.TrimSuffix(raw, s.sentinel); prefix != "" {
keep(prefix)
if emit != nil {
emit(prefix, isStderr)
b, rerr := r.ReadByte()
var chunk []byte
if rerr == nil {
chunk = append(chunk, b)
if n := r.Buffered(); n > 0 {
more := make([]byte, n)
m, _ := io.ReadFull(r, more)
chunk = append(chunk, more[:m]...)
}
}

for _, c := range chunk {
if c != '\n' {
line = append(line, c)
continue
}
raw := strings.TrimRight(string(line), "\r")
if strings.HasSuffix(raw, s.sentinel) {
if prefix := raw[:len(raw)-len(s.sentinel)]; sent < len(prefix) {
send([]byte(prefix[sent:]))
}
return string(buf), nil
}
return string(buf), nil
send(line[sent:])
send([]byte{'\n'})
line = line[:0]
sent = 0
}

if safe := len(line) - sentinelOverlap(line, s.sentinel); safe > sent {
send(line[sent:safe])
sent = safe
}

if rerr != nil {
send(line[sent:])
return string(buf), rerr
}
keep(line)
if emit != nil {
emit(line, isStderr)
}
}
}

Expand Down
102 changes: 102 additions & 0 deletions go/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"bufio"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestScanToSentinel_StreamsPartialLine(t *testing.T) {
s := &Session{sentinel: "__END__"}
pr, pw := io.Pipe()

var mu sync.Mutex
var got []string
emit := func(data string, _ bool) { mu.Lock(); got = append(got, data); mu.Unlock() }
joined := func() string { mu.Lock(); defer mu.Unlock(); return strings.Join(got, "") }

done := make(chan string, 1)
go func() {
tail, _ := s.scanToSentinel(bufio.NewReader(pr), false, emit)
done <- tail
}()

_, err := pw.Write([]byte("DOT"))
require.NoError(t, err)
// Streams before any newline exists in the stream.
require.Eventually(t, func() bool { return joined() == "DOT" }, time.Second, 5*time.Millisecond,
"partial line should stream before the sentinel arrives")

// Sentinel appended directly onto the same partial line must terminate the
// scan and never leak into user output.
_, err = pw.Write([]byte("__END__\n"))
require.NoError(t, err)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("scan did not return after sentinel")
}
require.Equal(t, "DOT", joined(), "sentinel must be stripped, not emitted")
}

// partial sentinel never reaches users, while everything before it streams
func TestScanToSentinel_HoldsBackSentinelPrefix(t *testing.T) {
s := &Session{sentinel: "__END__"}
pr, pw := io.Pipe()

var mu sync.Mutex
var got []string
emit := func(data string, _ bool) { mu.Lock(); got = append(got, data); mu.Unlock() }
joined := func() string { mu.Lock(); defer mu.Unlock(); return strings.Join(got, "") }

done := make(chan string, 1)
go func() {
tail, _ := s.scanToSentinel(bufio.NewReader(pr), false, emit)
done <- tail
}()

_, err := pw.Write([]byte("AB__EN"))
require.NoError(t, err)
require.Eventually(t, func() bool { return joined() == "AB" }, time.Second, 5*time.Millisecond)
// Give a beat to ensure the held-back prefix is not flushed late.
time.Sleep(50 * time.Millisecond)
require.Equal(t, "AB", joined(), "sentinel prefix must not leak")

_, err = pw.Write([]byte("D__\n")) // completes the sentinel
require.NoError(t, err)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("scan did not return after sentinel")
}
require.Equal(t, "AB", joined())
}

func TestScanToSentinel_LinesAndTail(t *testing.T) {
s := &Session{sentinel: "__END__"}
r := bufio.NewReader(strings.NewReader("alpha\nbeta\n__END__\n"))

var got []string
emit := func(data string, _ bool) { got = append(got, data) }
tail, err := s.scanToSentinel(r, false, emit)
require.NoError(t, err)
require.Equal(t, "alpha\nbeta\n", strings.Join(got, ""))
require.Equal(t, "alpha\nbeta\n", tail)
}

func TestScanToSentinel_EOFFlushesPartial(t *testing.T) {
s := &Session{sentinel: "__END__"}
r := bufio.NewReader(strings.NewReader("partial-no-newline"))

var got []string
emit := func(data string, _ bool) { got = append(got, data) }
tail, err := s.scanToSentinel(r, false, emit)
require.ErrorIs(t, err, io.EOF)
require.Equal(t, "partial-no-newline", strings.Join(got, ""))
require.Equal(t, "partial-no-newline", tail)
}
6 changes: 4 additions & 2 deletions skills/repld/references/julia.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ repld --fresh julia -t 4 -E 'Threads.nthreads()'

## Revise

When `Revise` is available, repld loads it at runtime startup and calls `Revise.revise()` before each eval. After editing package code, expect definitions to update in the warm session.
When `Revise` is available, repld loads it and calls `Revise.revise()` before each eval. Tracking depends on load path: dev'd packages pick up method and `const`/global changes. `includet`'d files only patch method unless files/modules set `__revise_mode__ = :eval`.

Use `--fresh` for untrackable changes, such as:

- Struct/type redefinition.
- `using NewPkg` inside modules whose `Project.toml` did not list `NewPkg` when session was created.

Expand All @@ -27,4 +29,4 @@ Use `--fresh` for untrackable changes, such as:
```bash
repld --trace full julia -e 'error("boom")'
repld trace --trace smart julia # show last saved traceback, no rerun
```
```
Loading