From d20a7c53cd860ef11f4f2ed2a0c95fb5eab84f97 Mon Sep 17 00:00:00 2001 From: Rustem Kamalov Date: Fri, 12 Jun 2026 04:24:06 +0300 Subject: [PATCH 1/4] fix: centralize engine panic recovery in resilient layer A rod panic during SearchImage propagated uncaught into fasthttp and killed the whole process: per-engine recover blocks only covered Search (5 of 6 engines had no recovery on SearchImage), and there was no Fiber recover middleware. Regression test: a panicking SearchImage returns 502 engine_internal and the server keeps serving. --- baidu/search.go | 6 ---- baidu/search_raw.go | 6 ---- bing/search.go | 6 ---- core/resilient.go | 27 ++++++++++----- core/server.go | 5 +++ core/server_panic_test.go | 71 +++++++++++++++++++++++++++++++++++++++ duckduckgo/search.go | 6 ---- ecosia/search.go | 14 -------- ecosia/search_raw.go | 6 ---- google/search.go | 7 ---- google/search_raw.go | 6 ---- yandex/search.go | 6 ---- yandex/search_raw.go | 6 ---- 13 files changed, 94 insertions(+), 78 deletions(-) create mode 100644 core/server_panic_test.go diff --git a/baidu/search.go b/baidu/search.go index 69f2b92..503e19e 100644 --- a/baidu/search.go +++ b/baidu/search.go @@ -87,12 +87,6 @@ func (baid *Baidu) Search(ctx context.Context, query core.Query) (results []core baid = &scoped baid.logger.Debug("Starting search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, baid.Name(), recovered, baid.logger) - results = nil - } - }() // Build URL from query struct to open in browser url, err := BuildURL(query) diff --git a/baidu/search_raw.go b/baidu/search_raw.go index 26afdb0..d3a365b 100644 --- a/baidu/search_raw.go +++ b/baidu/search_raw.go @@ -26,12 +26,6 @@ func classifyBaiduRawHTML(body []byte) error { func Search(ctx context.Context, query core.Query) (results []core.SearchResult, err error) { ctx = core.PrepareEngineContext(ctx, query, "baidu", false) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, "baidu", recovered, nil) - results = nil - } - }() searchURL, err := BuildURL(query) if err != nil { diff --git a/bing/search.go b/bing/search.go index c69a092..f519abe 100644 --- a/bing/search.go +++ b/bing/search.go @@ -151,12 +151,6 @@ func (bing *Bing) Search(ctx context.Context, query core.Query) (results []core. bing = &scoped bing.logger.Debug("Starting search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, bing.Name(), recovered, bing.logger) - results = nil - } - }() searchResults := []core.SearchResult{} diff --git a/core/resilient.go b/core/resilient.go index 8de16ba..6ba0ecb 100644 --- a/core/resilient.go +++ b/core/resilient.go @@ -202,16 +202,8 @@ func (rs *ResilientSearcher) searchWithProtection(ctx context.Context, engine Se attemptMeta.Used = MaskProxyURL(proxyURL) } - var ( - results []SearchResult - err error - ) requestCtx := proxyRequestContext(callCtx, engine.Name(), attemptQuery) - if isImage { - results, err = engine.SearchImage(requestCtx, attemptQuery) - } else { - results, err = engine.Search(requestCtx, attemptQuery) - } + results, err := invokeEngine(requestCtx, engine, attemptQuery, isImage) if reportToRegistry { rs.reportProxyAttempt(engineCtx, proxyURL, err) @@ -241,6 +233,23 @@ func (rs *ResilientSearcher) searchWithProtection(ctx context.Context, engine Se return result.Results, attemptMeta, nil } +// invokeEngine is the single panic-recovery point for every engine call made +// through the resilient pipeline (browser, raw, and any future engine method). +// A rod/CDP panic surfaces as ErrEngineInternal instead of killing the process. +func invokeEngine(ctx context.Context, engine SearchEngine, q Query, isImage bool) (results []SearchResult, err error) { + defer func() { + if recovered := recover(); recovered != nil { + results = nil + err = RecoverEnginePanicWithContext(ctx, engine.Name(), recovered, nil) + } + }() + + if isImage { + return engine.SearchImage(ctx, q) + } + return engine.Search(ctx, q) +} + // SearchAllParallel applies retry/circuit protections per engine for mega search. // Returns results, list of engines that responded, and list of engines that failed. func (rs *ResilientSearcher) SearchAllParallel(ctx context.Context, q Query, engines []SearchEngine) ([]MegaSearchResult, []string, []string) { diff --git a/core/server.go b/core/server.go index c4dec82..271e968 100644 --- a/core/server.go +++ b/core/server.go @@ -18,6 +18,7 @@ import ( "time" "github.com/gofiber/fiber/v2" + fiberrecover "github.com/gofiber/fiber/v2/middleware/recover" browserprofile "github.com/karust/openserp/core/browser" "github.com/karust/openserp/core/fpcheck" "github.com/karust/openserp/core/fpcheck/detectors" @@ -155,6 +156,10 @@ func NewServerWithOptions(host string, port int, opts ServerOptions, searchEngin }).Info("Response cache enabled") } + // Defense-in-depth: engine panics are recovered in the resilient layer + // (invokeEngine); this catches panics in handlers that bypass it (parse, + // extract, stats) so the process survives. + app.Use(fiberrecover.New(fiberrecover.Config{EnableStackTrace: true})) app.Use(RequestContextMiddleware()) if opts.EnableCORS { app.Use(CORSMiddleware(opts.CORS)) diff --git a/core/server_panic_test.go b/core/server_panic_test.go new file mode 100644 index 0000000..070493e --- /dev/null +++ b/core/server_panic_test.go @@ -0,0 +1,71 @@ +package core + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" +) + +// FP-1: panics from engine code must be converted to ErrEngineInternal by the +// central recovery in the resilient layer (invokeEngine) instead of killing +// the process. SearchImage had no per-engine recovery in 5 of 6 engines. +func TestInvokeEngineRecoversPanics(t *testing.T) { + engine := &engineMock{ + name: "google", + initialized: true, + searchFn: func(_ context.Context, _ Query) ([]SearchResult, error) { + panic("rod: page crashed") + }, + imageFn: func(_ context.Context, _ Query) ([]SearchResult, error) { + panic("rod: object not found") + }, + } + + for _, isImage := range []bool{false, true} { + results, err := invokeEngine(context.Background(), engine, Query{Text: "golang"}, isImage) + if results != nil { + t.Fatalf("isImage=%v: expected nil results after panic, got %v", isImage, results) + } + if !errors.Is(err, ErrEngineInternal) { + t.Fatalf("isImage=%v: expected ErrEngineInternal, got %v", isImage, err) + } + } +} + +func TestPanickingSearchImageReturns502AndServerSurvives(t *testing.T) { + engine := &engineMock{ + name: "google", + initialized: true, + imageFn: func(_ context.Context, _ Query) ([]SearchResult, error) { + panic("rod: page crashed") + }, + } + + opts := DefaultServerOptions() + opts.Resilience.Retry.MaxRetries = 0 + srv := NewServerWithOptions("127.0.0.1", 7130, opts, engine) + + req := httptest.NewRequest(http.MethodGet, "/google/image?text=golang", nil) + resp, err := srv.app.Test(req, -1) + if err != nil { + t.Fatalf("image request failed: %v", err) + } + if resp.StatusCode != http.StatusBadGateway { + t.Fatalf("expected 502 for panicking SearchImage, got %d", resp.StatusCode) + } + var payload JSONErrorResponse + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { + t.Fatalf("decode error response: %v", err) + } + if payload.Error != "engine_internal" { + t.Fatalf("expected error=engine_internal, got %q", payload.Error) + } + + second := request(t, srv, "/google/search?text=golang") + if second.StatusCode != http.StatusOK { + t.Fatalf("expected server to keep serving after panic, got %d", second.StatusCode) + } +} diff --git a/duckduckgo/search.go b/duckduckgo/search.go index b0e8bfd..43a73c9 100644 --- a/duckduckgo/search.go +++ b/duckduckgo/search.go @@ -166,12 +166,6 @@ func (ddg *DuckDuckGo) Search(ctx context.Context, query core.Query) (results [] ddg = &scoped ddg.logger.Debug("Starting search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, ddg.Name(), recovered, ddg.logger) - results = nil - } - }() allResults := []core.SearchResult{} var pageFeatures []core.SerpFeature diff --git a/ecosia/search.go b/ecosia/search.go index 0cbb2d3..ce6c38a 100644 --- a/ecosia/search.go +++ b/ecosia/search.go @@ -131,13 +131,6 @@ func (e *Ecosia) Search(ctx context.Context, query core.Query) (results []core.S e = &scoped e.logger.Debug("Starting search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, e.Name(), recovered, e.logger) - results = nil - } - }() - // nextRank counts up across pages for organic results; nextAdRank counts // up within sponsored results so ad rank stays separate from SEO rank. all := []core.SearchResult{} @@ -288,13 +281,6 @@ func (e *Ecosia) SearchImage(ctx context.Context, query core.Query) (results []c e = &scoped e.logger.Debug("Starting image search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, e.Name(), recovered, e.logger) - results = nil - } - }() - out := []core.SearchResult{} pageNum := 0 nextRank := 1 diff --git a/ecosia/search_raw.go b/ecosia/search_raw.go index c1d8de3..774c9de 100644 --- a/ecosia/search_raw.go +++ b/ecosia/search_raw.go @@ -72,12 +72,6 @@ func imageResultParser(response *http.Response) ([]core.SearchResult, error) { func Search(ctx context.Context, query core.Query) (results []core.SearchResult, err error) { ctx = core.PrepareEngineContext(ctx, query, "ecosia", false) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, "ecosia", recovered, nil) - results = nil - } - }() pageNum, startRank, err := startPage(query.Start) if err != nil { diff --git a/google/search.go b/google/search.go index 3733393..4b97267 100644 --- a/google/search.go +++ b/google/search.go @@ -189,13 +189,6 @@ func (gogl *Google) Search(ctx context.Context, query core.Query) (results []cor gogl = &scoped gogl.logger.Debug("Starting search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, gogl.Name(), recovered, gogl.logger) - results = nil - } - }() - searchResults := []core.SearchResult{} // Build URL from query struct to open in browser diff --git a/google/search_raw.go b/google/search_raw.go index 6950bbd..bd1b717 100644 --- a/google/search_raw.go +++ b/google/search_raw.go @@ -128,12 +128,6 @@ func classifyGoogleRawHTML(body []byte) error { func Search(ctx context.Context, query core.Query) (results []core.SearchResult, err error) { ctx = core.PrepareEngineContext(ctx, query, "google", false) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, "google", recovered, nil) - results = nil - } - }() googleURL, err := BuildURL(query) if err != nil { diff --git a/yandex/search.go b/yandex/search.go index f6b9c42..5ea406b 100644 --- a/yandex/search.go +++ b/yandex/search.go @@ -185,12 +185,6 @@ func (yand *Yandex) Search(ctx context.Context, query core.Query) (results []cor yand = &scoped yand.logger.Debug("Starting search, query: %+v", query) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, yand.Name(), recovered, yand.logger) - results = nil - } - }() if query.Start < 0 { return nil, fmt.Errorf("incorrect start provided") } diff --git a/yandex/search_raw.go b/yandex/search_raw.go index 9af173c..3195226 100644 --- a/yandex/search_raw.go +++ b/yandex/search_raw.go @@ -26,12 +26,6 @@ func classifyYandexRawHTML(body []byte) error { func Search(ctx context.Context, query core.Query) (results []core.SearchResult, err error) { ctx = core.PrepareEngineContext(ctx, query, "yandex", false) - defer func() { - if recovered := recover(); recovered != nil { - err = core.RecoverEnginePanicWithContext(ctx, "yandex", recovered, nil) - results = nil - } - }() startPage, skipOnFirstPage, err := core.ComputePagination(query.Start, 10) if err != nil { From 665f09266a98f454e3cef0534687a36ded9583b2 Mon Sep 17 00:00:00 2001 From: Rustem Kamalov Date: Fri, 12 Jun 2026 04:40:31 +0300 Subject: [PATCH 2/4] fix: derive a per-request deadline from the retry budget. Add `RequestTimeout` config that bounds wall-clock time of any request that does not manage its own deadline budget. It is derived from the engine timeout and retry budget via RequestTimeoutForRetries. --- .gitignore | 1 + cmd/serve.go | 16 +- config.yaml | 4 +- core/middleware.go | 16 ++ core/retry.go | 27 +++ core/server.go | 15 +- core/server_timeout_test.go | 146 ++++++++++++++++ docs/PROXY_SETUP.md | 322 ++++++++++++++++++++++++++++++++++++ 8 files changed, 539 insertions(+), 8 deletions(-) create mode 100644 core/server_timeout_test.go create mode 100644 docs/PROXY_SETUP.md diff --git a/.gitignore b/.gitignore index aca7193..55be0d2 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ sanitize_html_fixtures.py core/testdata/* .tmp-gocache .tmpcache/ +google/data/geotargets-2026-05-28.csv diff --git a/cmd/serve.go b/cmd/serve.go index a58817d..86b28bd 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -151,6 +151,14 @@ func buildFingerprintBrowserOptions() core.BrowserOpts { } func buildServerOptions(corsCfg core.CORSConfig, proxyCfg core.ProxyConfig, fingerprintBrowserOpts core.BrowserOpts) core.ServerOptions { + retryCfg := core.RetryConfig{ + MaxRetries: config.Resilience.MaxRetries, + InitialBackoff: 1 * time.Second, + MaxBackoff: 30 * time.Second, + BackoffFactor: 2.0, + } + engineTimeout := time.Duration(config.App.Timeout) * time.Second + return core.ServerOptions{ CacheTTL: time.Duration(config.Cache.TTLSeconds) * time.Second, CacheMaxSize: config.Cache.MaxSize, @@ -161,14 +169,10 @@ func buildServerOptions(corsCfg core.CORSConfig, proxyCfg core.ProxyConfig, fing FingerprintArtifactDir: core.DefaultFingerprintArtifactDir, FingerprintBrowserOpts: fingerprintBrowserOpts, MegaTimeout: config.App.MegaTimeout, + RequestTimeout: core.RequestTimeoutForRetries(engineTimeout, retryCfg), Extract: config.Extract, Resilience: core.ResilientConfig{ - Retry: core.RetryConfig{ - MaxRetries: config.Resilience.MaxRetries, - InitialBackoff: 1 * time.Second, - MaxBackoff: 30 * time.Second, - BackoffFactor: 2.0, - }, + Retry: retryCfg, CircuitBreaker: core.CircuitBreakerConfig{ FailureThreshold: config.CircuitBreaker.Failures, RecoveryTimeout: time.Duration(config.CircuitBreaker.RecoverySeconds) * time.Second, diff --git a/config.yaml b/config.yaml index bac3be4..bd0f051 100644 --- a/config.yaml +++ b/config.yaml @@ -8,7 +8,7 @@ server: app: log_format: "text" # json|text - timeout: 15 # Browser/search timeout in seconds + timeout: 15 # Browser/search timeout in seconds, per attempt; the request deadline is derived from this x retries browser_path: "" # Custom browser binary path (chrome/chromium/edge..) profiles: "" # Overriding built-in browser profiles head: false # Headful mode @@ -18,6 +18,8 @@ app: idle_ttl: 5m # close a Chrome that has not served traffic for this long mega_timeout: 90s # max total wait for /mega/* requests; slow engines return partial results + block_trackers: true + block_resources: "image,font,css,media" extract: enabled: true diff --git a/core/middleware.go b/core/middleware.go index 64549b8..dd407c2 100644 --- a/core/middleware.go +++ b/core/middleware.go @@ -1,6 +1,7 @@ package core import ( + "context" "fmt" "strings" "time" @@ -62,6 +63,21 @@ func RequestContextMiddleware() fiber.Handler { } } +// RequestTimeoutMiddleware bounds wall-clock time per request by attaching a +// deadline to the user context, which fasthttp never cancels on client +// disconnect. /mega/* (MegaTimeout) and /extract (batch budget) are exempt. +func RequestTimeoutMiddleware(timeout time.Duration) fiber.Handler { + return func(c *fiber.Ctx) error { + if strings.HasPrefix(c.Path(), "/mega/") || c.Path() == "/extract" { + return c.Next() + } + ctx, cancel := context.WithTimeout(c.UserContext(), timeout) + defer cancel() + c.SetUserContext(ctx) + return c.Next() + } +} + func CORSMiddleware(cfg CORSConfig) fiber.Handler { cfg = normalizeCORSConfig(cfg) diff --git a/core/retry.go b/core/retry.go index 2ae4543..1127211 100644 --- a/core/retry.go +++ b/core/retry.go @@ -98,6 +98,33 @@ func RetryableSearch(ctx context.Context, cfg RetryConfig, engineName string, se } } +// requestTimeoutSlack covers per-request pipeline overhead that happens +// outside engine attempts: rate-limiter waits, proxy selection, parsing. +const requestTimeoutSlack = 5 * time.Second + +// RequestTimeoutForRetries derives a per-request deadline that a healthy +// request exhausting its full retry budget cannot hit: worst-case attempts +// (each bounded by attemptTimeout) plus worst-case jittered backoffs plus +// slack. Used as the server-wide request timeout so raising MaxRetries or +// the engine timeout never silently truncates retries. +func RequestTimeoutForRetries(attemptTimeout time.Duration, cfg RetryConfig) time.Duration { + if cfg.BackoffFactor <= 0 { + cfg.BackoffFactor = 2.0 + } + + budget := time.Duration(cfg.MaxRetries+1) * attemptTimeout + for attempt := 1; attempt <= cfg.MaxRetries; attempt++ { + // calculateBackoff jitters by (0.5 + rand[0,1)), so 1.5x is the worst + // case before the MaxBackoff cap. + worst := time.Duration(1.5 * float64(cfg.InitialBackoff) * math.Pow(cfg.BackoffFactor, float64(attempt-1))) + if worst > cfg.MaxBackoff || worst < 0 { + worst = cfg.MaxBackoff + } + budget += worst + } + return budget + requestTimeoutSlack +} + var nonRetryableSentinels = []struct { err error reason string diff --git a/core/server.go b/core/server.go index 271e968..8009fff 100644 --- a/core/server.go +++ b/core/server.go @@ -90,6 +90,13 @@ type ServerOptions struct { // as failed with a context-deadline error. Zero disables the bound // (legacy behavior — wait until the slowest engine finishes). MegaTimeout time.Duration + // RequestTimeout bounds wall-clock time of any request that does not + // manage its own deadline budget (/mega/* uses MegaTimeout, /extract has + // a per-batch budget). Exceeding it returns 504 request_timeout. Zero + // disables the bound. The serve command derives it from the engine + // timeout and retry budget via RequestTimeoutForRetries instead of + // exposing a separate config knob. + RequestTimeout time.Duration // BrowserResolver returns a pooled browser for rendered extraction. BrowserResolver BrowserResolver // Extract configures the URL extraction endpoint and search enrichment. @@ -115,7 +122,10 @@ func DefaultServerOptions() ServerOptions { }, Resilience: DefaultResilientConfig(), MegaTimeout: 90 * time.Second, - Extract: extractpkg.DefaultConfig(), + // 30s matches the default engine timeout (app.timeout); the serve + // command re-derives from the configured values. + RequestTimeout: RequestTimeoutForRetries(30*time.Second, DefaultRetryConfig()), + Extract: extractpkg.DefaultConfig(), } } @@ -161,6 +171,9 @@ func NewServerWithOptions(host string, port int, opts ServerOptions, searchEngin // extract, stats) so the process survives. app.Use(fiberrecover.New(fiberrecover.Config{EnableStackTrace: true})) app.Use(RequestContextMiddleware()) + if opts.RequestTimeout > 0 { + app.Use(RequestTimeoutMiddleware(opts.RequestTimeout)) + } if opts.EnableCORS { app.Use(CORSMiddleware(opts.CORS)) } diff --git a/core/server_timeout_test.go b/core/server_timeout_test.go new file mode 100644 index 0000000..709103c --- /dev/null +++ b/core/server_timeout_test.go @@ -0,0 +1,146 @@ +package core + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gofiber/fiber/v2" +) + +// FP-2: every endpoint that doesn't manage its own deadline budget must get +// one from RequestTimeoutMiddleware; /mega/* (MegaTimeout) and /extract +// (batch budget) keep theirs. +func TestRequestTimeoutMiddlewareSetsDeadlineExceptBudgetedPaths(t *testing.T) { + app := fiber.New() + app.Use(RequestTimeoutMiddleware(time.Minute)) + + deadlines := map[string]bool{} + // c.Path() aliases fasthttp's reusable buffer, so capture the path + // explicitly instead of using it as a map key after the request ends. + record := func(path string) fiber.Handler { + return func(c *fiber.Ctx) error { + _, ok := c.UserContext().Deadline() + deadlines[path] = ok + return c.SendStatus(http.StatusOK) + } + } + app.Get("/google/search", record("/google/search")) + app.Post("/google/parse", record("/google/parse")) + app.Get("/mega/search", record("/mega/search")) + app.Get("/extract", record("/extract")) + + for path, method := range map[string]string{ + "/google/search": http.MethodGet, + "/google/parse": http.MethodPost, + "/mega/search": http.MethodGet, + "/extract": http.MethodGet, + } { + req := httptest.NewRequest(method, path, nil) + resp, err := app.Test(req, -1) + if err != nil { + t.Fatalf("request failed for %s: %v", path, err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("%s: handler did not run, status %d", path, resp.StatusCode) + } + } + + for path, want := range map[string]bool{ + "/google/search": true, + "/google/parse": true, + "/mega/search": false, + "/extract": false, + } { + if deadlines[path] != want { + t.Errorf("%s: deadline attached = %v, want %v", path, deadlines[path], want) + } + } +} + +// The derived deadline must never truncate a healthy request that exhausts +// its full retry budget: attempts + worst-case jittered backoffs + slack. +func TestRequestTimeoutForRetriesCoversFullRetryBudget(t *testing.T) { + cases := []struct { + name string + attemptTimeout time.Duration + cfg RetryConfig + want time.Duration + }{ + { + name: "defaults: 4x15s attempts + (1.5+3+6)s backoff + 5s slack", + attemptTimeout: 15 * time.Second, + cfg: DefaultRetryConfig(), + want: 75500 * time.Millisecond, + }, + { + name: "no retries: one attempt + slack", + attemptTimeout: 15 * time.Second, + cfg: RetryConfig{MaxRetries: 0}, + want: 20 * time.Second, + }, + { + name: "backoffs capped at MaxBackoff", + attemptTimeout: 10 * time.Second, + cfg: RetryConfig{ + MaxRetries: 2, + InitialBackoff: time.Minute, + MaxBackoff: 2 * time.Second, + BackoffFactor: 2.0, + }, + want: 30*time.Second + 4*time.Second + 5*time.Second, + }, + } + + for _, tc := range cases { + if got := RequestTimeoutForRetries(tc.attemptTimeout, tc.cfg); got != tc.want { + t.Errorf("%s: got %s, want %s", tc.name, got, tc.want) + } + } +} + +func TestHungEngineReturns504RequestTimeoutAndServerSurvives(t *testing.T) { + hung := func(ctx context.Context, _ Query) ([]SearchResult, error) { + <-ctx.Done() + return nil, ctx.Err() + } + engine := &engineMock{ + name: "google", + initialized: true, + searchFn: hung, + imageFn: hung, + } + + opts := DefaultServerOptions() + opts.RequestTimeout = 100 * time.Millisecond + opts.Resilience.Retry.MaxRetries = 0 + srv := NewServerWithOptions("127.0.0.1", 7140, opts, engine) + + for _, path := range []string{"/google/search?text=golang", "/google/image?text=golang"} { + started := time.Now() + resp := request(t, srv, path) + if resp.StatusCode != http.StatusGatewayTimeout { + t.Fatalf("%s: expected 504 for hung engine, got %d", path, resp.StatusCode) + } + if elapsed := time.Since(started); elapsed > 5*time.Second { + t.Fatalf("%s: request took %s, deadline did not bound it", path, elapsed) + } + var payload JSONErrorResponse + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { + t.Fatalf("%s: decode error response: %v", path, err) + } + if payload.Error != "request_timeout" { + t.Fatalf("%s: expected error=request_timeout, got %q", path, payload.Error) + } + } + + // Server keeps serving after timed-out requests. + engine.searchFn = nil + second := request(t, srv, "/google/search?text=golang") + if second.StatusCode != http.StatusOK { + t.Fatalf("expected server to keep serving after timeouts, got %d", second.StatusCode) + } +} diff --git a/docs/PROXY_SETUP.md b/docs/PROXY_SETUP.md new file mode 100644 index 0000000..4a3be1b --- /dev/null +++ b/docs/PROXY_SETUP.md @@ -0,0 +1,322 @@ +# Proxy Setup + +This document is for the **backend engineer building the balancer** in front of OpenSERP. It explains the request/response contract for the SaaS `X-Proxy-URL` path, what each header does, what errors mean, and how to size and operate workers. + +If you only need a self-contained OpenSERP with locally configured proxies, scroll to [Configured Proxies (no balancer)](#configured-proxies-no-balancer). + +## Architecture + +```text +client ─▶ your balancer ─▶ OpenSERP worker ─▶ search engine + owns: applies: + - proxy provider - the proxy you supply + - country/class - sticky cookies/profile per session + - session minting - cache keyed by market metadata + - rotation policy - stable typed errors + - usage accounting +``` + +The balancer is the source of truth for _which_ proxy to use and _when_ to rotate. OpenSERP is stateless w.r.t. provider choice — it just executes the search through whatever proxy URL you hand it and reports back what happened. + +## Worker Configuration + +Enable the request-proxy-URL path on every worker fronted by your balancer: + +```yaml +proxies: + allow_request_proxy_url: true # required for X-Proxy-URL to be honored + lanes: + enabled: true + max_lanes: 100 # LRU cap on sticky lanes per worker + drop_cookies_on_challenge: true + +app: + max_processes: 4 # LRU cap on Chrome processes per worker + idle_ttl: 10m # close a Chrome that has not served traffic for this long +``` + +Worker rejects `X-Proxy-URL` with `400 bad_request` (`reason=REQUEST_PROXY_URL_DISABLED`) when the flag is off. Keep this off on any worker reachable by untrusted clients. + +CORS already lists every header the balancer sends; if you customise `cors.allow_headers`, keep `X-Proxy-URL, X-Proxy-Country, X-Proxy-Class, X-Proxy-Provider, X-Proxy-Session-ID, X-Tenant, X-Use-Proxy, X-Request-ID`. + +## Request Contract + +### Headers your balancer sends + +| Header | Required | Purpose | +| --------------------- | :------: | -------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `X-Proxy-URL` | ✅ | The actual proxy URL to route this request through. `http://`, `https://`, or unauthenticated `socks5://`/`socks5h://`. Authenticated SOCKS is rejected. | +| `X-Proxy-Country` | ⚠️ | Two-letter market code (`us`, `de`). **Cache key**. Without it, proxied responses bypass cache. | +| `X-Proxy-Class` | ⚠️ | `datacenter`, `residential`, `mobile`, etc. **Cache key**. | +| `X-Proxy-Provider` | ⚠️ | Provider id (`webshare`, `brightdata`, `internal`). **Cache key**. | +| `X-Proxy-Session-ID` | 🟢 | Sticky session id. **Lane key** — same id reuses cookies and profile. Rotate to get a clean lane. | +| `X-Tenant` | 🟢 | Multi-tenant scope. Lanes become `tenant + engine + session_id`. | +| `X-Use-Proxy: direct` | 🟢 | Force-disable proxy for this request, even if `X-Proxy-URL` is set. | +| `X-Request-ID` | 🟢 | Pass-through correlation id (UUID v7 generated if absent). | + +✅ required, ⚠️ recommended (cache won't engage without it), 🟢 optional. + +### Precedence + +1. `X-Use-Proxy: direct` +2. `X-Proxy-URL` (when allowed) +3. `X-Use-Proxy: ` +4. Per-engine configured tag (worker config) +5. `proxies.global` (worker config) +6. Direct (no proxy) + +### Endpoints + +| Path | What | +| ---------------------- | ---------------------------------------------------------- | +| `GET /{engine}/search` | Single engine: `google`, `yandex`, `baidu`, `bing`, `duck` | +| `GET /{engine}/image` | Single engine image search | +| `GET /mega/search` | Parallel across all (or `?engines=...`) engines | +| `GET /mega/image` | Parallel image search | +| `GET /stats/proxy` | Pool, lane, and Chrome-process stats | +| `GET /health` | Engine + circuit-breaker health | +| `GET /ready` | `503 draining` during graceful shutdown | + +Full schema at `/openapi.yaml` and Swagger UI at `/docs`. + +### Example call + +```bash +curl -G \ + -H "X-Proxy-URL: http://USER:PASS@proxy.example:8080" \ + -H "X-Proxy-Country: us" \ + -H "X-Proxy-Class: residential" \ + -H "X-Proxy-Provider: webshare" \ + -H "X-Proxy-Session-ID: sid-abc" \ + --data-urlencode "text=golang" \ + --data-urlencode "lang=EN" \ + --data-urlencode "limit=10" \ + http://worker.internal:7000/google/search +``` + +## Response Contract + +Every response carries: + +```text +X-Request-ID: 01HXYZ... # mirrors meta.request_id +X-Proxy-Mode: off | tag_pool | request_url # what actually ran +X-Proxy-Used: direct | http://proxy.example:8080 | multiple | mixed | pooled +X-Proxy-Tag: us # only when X-Proxy-Mode=tag_pool +X-Cache: HIT | MISS | BYPASS # only when cache is enabled +``` + +A balancer-driven request always sees `X-Proxy-Mode: request_url` (unless overridden via `X-Use-Proxy: direct`). `X-Proxy-Used` is the masked `scheme://host:port` of the proxy you sent — credentials are stripped. Use these headers to confirm OpenSERP actually applied your proxy and didn't quietly fall back. + +Successful body is the v1 envelope (`/openapi.yaml#/components/schemas/SearchEnvelope`). + +## Error Playbook + +All errors are JSON of shape: + +```json +{ + "error": "", + "code": 503, + "message": "", + "reason": "", + "meta": { + "engine": "google", + "proxy_used": "http://proxy.example:8080", + "proxy_country": "us", + "proxy_class": "residential", + "proxy_provider": "webshare", + "proxy_session_id": "sid-abc" + } +} +``` + +Credentials are **never** present in `meta.proxy_used`, response headers, logs, or stats. + +| HTTP | `error` | What it means | Balancer action | +| ---: | -------------------------------------------- | ---------------------------------------------------------------------------------------------- | ---------------------------------------------------------- | +| 400 | `bad_request` | Bad client input. See `reason`. | Don't retry. Surface to caller. | +| 400 | `bad_request` (`REQUEST_PROXY_URL_DISABLED`) | Worker has the feature off. | Misconfigured worker. Page oncall. | +| 400 | `bad_request` (`UNSUPPORTED_PROXY_SCHEME`) | Authenticated SOCKS in browser mode. | Don't send authenticated SOCKS. Use HTTP/HTTPS. | +| 403 | `blocked` | The search engine returned 403 to the proxy. | Rotate session id. Maybe rotate proxy. | +| 429 | `captcha_detected` | Captcha challenge page. Worker has dropped this lane's cookies. | Rotate session id. Cool the proxy. | +| 429 | `rate_limited` | Engine returned 429. | Slow the proxy. Rotate session id. | +| 502 | `parser_failure` | SERP parser drift on a successfully fetched page. | Don't blame the proxy. Page oncall — engine update needed. | +| 502 | `engine_internal` | Engine panic recovered, or all engines failed (mega). | Retry once on a different worker. | +| 503 | `proxy_connect` | TCP/TLS to the proxy failed. | Mark proxy bad. Retry on another proxy. | +| 503 | `proxy_auth` | 407 Proxy Auth Required. | Credentials wrong/expired. Don't retry the same one. | +| 503 | `proxy_timeout` | Network timeout on the proxy path. | Retry on another proxy. | +| 503 | `proxy_unavailable` | No healthy proxy left in the configured tag pool (worker-side, not your concern in SaaS mode). | Worker config issue. | +| 504 | `search_timeout` | Required SERP elements never appeared before timeout. | Retry once. If repeated, page oncall. | + +### Decision rules for the balancer + +- **Rotate session id** (mint a new `X-Proxy-Session-ID`) on `captcha_detected`, `blocked`, `rate_limited`. The lane keeps its profile but cookies are dropped on captcha; a new id starts fresh anyway. +- **Mark proxy bad** on `proxy_connect`, `proxy_auth`, `proxy_timeout`. Don't degrade the proxy on captcha/parser/engine errors — those aren't the proxy's fault. +- **Don't retry** on 400. Those are client bugs. + +## Sticky Lanes + +A **lane** is a (tenant, engine, session) tuple owned by one worker. + +```yaml +proxies: + lanes: + enabled: true + max_lanes: 100 + drop_cookies_on_challenge: true +``` + +What a lane holds: + +- The browser **profile** picked at first use (UA, viewport, languages, UA-CH brand list). +- The **cookies** harvested during navigation. + +Lane key: + +- `tenant + engine + session_id` when `X-Tenant` is present. +- `engine + session_id` otherwise. +- If `X-Proxy-Session-ID` is missing, the lane id is derived from `sha256(host:port|username)[:16]`. The password is never part of the key, so rotating credentials on the same proxy keeps the lane. + +What invalidates a lane: + +- `X-Proxy-Session-ID` change → fresh lane (the old one stays warm until LRU evicts). +- Captcha response → cookies dropped, profile retained (when `drop_cookies_on_challenge: true`). +- `max_lanes` LRU eviction. + +What does **not** invalidate a lane: + +- Block, rate-limit, parser, engine, or proxy-network errors. The balancer decides whether to rotate. + +If your balancer issues a sticky-session credential pattern (e.g. Bright Data `session-SID`), reuse the same `X-Proxy-Session-ID` for the same upstream sticky window. When you rotate, change both the proxy URL session token AND the `X-Proxy-Session-ID` in the same step. + +## Cache + +Cache key includes query fields + `country + class + provider`. It does **not** include the proxy URL, username, password, or session id. + +Behavior with proxied requests: + +- All three of country/class/provider missing → request bypasses cache (`X-Cache: BYPASS`). +- Country missing but `lang` present → falls back to `lang` as a weak market hint. +- Cross-engine fallback responses (`X-Fallback-Engine` set) are not cached. + +Translation: **send country/class/provider on every proxied request** if you want cache hits. + +## Browser Process Pool + +OpenSERP keeps one Chrome process per _authenticated proxy identity_ (`scheme + host + port + username`): + +| Upstream proxy | Chrome used | +| ----------------------------------------------- | -------------------------------------------- | +| `http://userA:passA@proxy:8080` | Dedicated Chrome `[http\|proxy:8080\|userA]` | +| `http://userA:passB@proxy:8080` (rotated pass) | Same Chrome (password ignored in key) | +| `http://userB:pass@proxy:8080` (different user) | New dedicated Chrome | +| `socks5://proxy:1080` (unauthenticated) | Shared "no-auth" Chrome, per-context proxy | +| no proxy / direct | Shared "no-auth" Chrome | + +Why dedicated processes for authenticated HTTP proxies: Chrome's per-`BrowserContext` auth callback is process-global and only answers the _next_ pending challenge — so concurrent requests with different credentials would race and subresources hang. Launching Chrome with `--proxy-server=...` per identity lets the OS-level Chrome auth path handle 407s natively for the main document AND every subresource. + +The pool grows lazily and is bounded by `app.max_processes` with LRU eviction, plus an idle sweeper that closes Chromes idle for `app.idle_ttl`. **One Chrome serves many concurrent requests** via per-page `BrowserContext` isolation — `max_processes` does NOT bound concurrent search requests, only the number of distinct authenticated proxy identities a worker can keep warm. + +### Sizing the pool + +If you expect `N` distinct authenticated identities to hit one worker concurrently, set `max_processes ≥ N`. Below that, the pool LRU-closes Chromes mid-burst, which adds Chrome-startup latency to the next request that needs the evicted identity. + +Rough memory budget: each Chrome ≈ 150-300 MiB resident. `max_processes: 4` → plan for ~1 GiB of Chrome RAM per worker, plus the rest of the Go process. + +`/debug/fingerprint-check` (when `app.debug_endpoints: true`) is intentionally **not** pooled — it spawns a fresh Chrome per call so you can verify each profile in isolation. + +## Stats Endpoint + +`GET /stats/proxy` returns: + +```json +{ + "configured_count": 0, + "healthy_count": 0, + "unhealthy_count": 0, + "request_proxy_url_enabled": true, + "lanes": { + "active": 12, + "evicted_lru": 7, + "cookies_dropped": 20 + }, + "browser_processes": { + "active": 3, + "max": 4, + "evicted_lru": 12, + "evicted_idle": 5 + }, + "tags": {}, + "entries": [] +} +``` + +Watch for: + +- `browser_processes.evicted_lru` rising → bump `max_processes`. +- `browser_processes.evicted_idle` rising while `active` stays low → traffic is bursty; that's healthy. +- `lanes.cookies_dropped` rising → the proxy is hitting captchas; consider rotating the session more aggressively. + +## Provider Examples + +Bright Data (residential, sticky session): + +```text +X-Proxy-URL: http://brd-customer-CUSTOMER-zone-res-country-us-session-SID:PASS@brd.superproxy.io:22225 +X-Proxy-Provider: brightdata +X-Proxy-Class: residential +X-Proxy-Country: us +X-Proxy-Session-ID: SID +``` + +Webshare: + +```text +X-Proxy-URL: http://USER:PASS@p.webshare.io:80 +X-Proxy-Provider: webshare +X-Proxy-Class: datacenter +X-Proxy-Country: us +``` + +Internal HTTP datacenter: + +```text +X-Proxy-URL: http://user:pass@dc-proxy.example:8080 +X-Proxy-Provider: internal +X-Proxy-Class: datacenter +X-Proxy-Country: de +``` + +Authenticated SOCKS proxies are rejected by browser mode because Chrome cannot safely answer SOCKS auth challenges via the DevTools auth callback. Unauthenticated `socks5://` and `socks5h://` are accepted on both `X-Proxy-URL` and configured pools. + +## Configured Proxies (no balancer) + +For OSS and local deployments without a balancer, OpenSERP can manage proxies directly: + +```yaml +proxies: + global: http://user:pass@127.0.0.1:8080 # one proxy for everything + +# OR a tagged pool: +proxies: + entries: + - url: http://user:pass@proxy-us.example:8080 + tags: [default, us] + - url: socks5h://127.0.0.1:1080 + tags: [eu] + health: + failure_threshold: 3 # disable a proxy after N consecutive network errors + +google: + proxy: default # opt this engine into the "default" tag pool +``` + +Per-request override (no balancer needed): + +```bash +curl -H "X-Use-Proxy: us" "http://127.0.0.1:7000/google/search?text=golang" +curl -H "X-Use-Proxy: direct" "http://127.0.0.1:7000/google/search?text=golang" +``` + +Pool health: a tag pool that exhausts (every member disabled) goes into a 5-minute quarantine. After quarantine, one proxy is re-enabled as a recovery probe. Network errors degrade health; captcha/parser/engine errors do not. From 04db5347a830d1fb047362570d3aefda5b901254 Mon Sep 17 00:00:00 2001 From: Rustem Kamalov Date: Fri, 12 Jun 2026 18:09:40 +0300 Subject: [PATCH 3/4] fix(core): harden extract and stabilize request protection - breaker: stop counting client cancellations/deadlines (incl. bare rate-limiter wait errors) and circuit-open as engine failures - rate limiting: cache limiters in SearchEngineOptions and rawEngine so pacing applies on raw/library paths; pool wrapper delegates - /extract SSRF guard: public-IP-only policy with dial-time IP pinning, redirect re-validation, rendered-mode preflight, http/https allow-list, ErrTargetNotAllowed -> HTTP 400, extract.allow_private_networks escape hatch (default off) - browser: no rod Must* on the request path; CLI parses block_resources without panicking --- cmd/root.go | 3 +- cmd/search.go | 5 +- cmd/serve.go | 17 +- cmd/serve_test.go | 11 ++ core/browser.go | 6 +- core/common.go | 33 +++- core/http_client.go | 35 +++- core/network_guard.go | 159 ++++++++++++++++ core/network_guard_test.go | 42 +++++ core/rate_limiter_test.go | 38 ++++ core/resilient.go | 25 ++- core/resilient_context_test.go | 85 +++++++++ core/server_extract.go | 67 ++++++- core/server_extract_test.go | 60 +++++- docs/PROXY_SETUP.md | 322 --------------------------------- extract/config.go | 11 +- extract/extractor.go | 7 +- extract/extractor_test.go | 4 +- 18 files changed, 569 insertions(+), 361 deletions(-) create mode 100644 core/network_guard.go create mode 100644 core/network_guard_test.go create mode 100644 core/rate_limiter_test.go delete mode 100644 docs/PROXY_SETUP.md diff --git a/cmd/root.go b/cmd/root.go index 5f6a4d4..482931a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -17,7 +17,7 @@ import ( ) const ( - version = "0.8.2" + version = "0.8.3" defaultConfigFilename = "config" envPrefix = "OPENSERP" ) @@ -412,6 +412,7 @@ func setConfigDefaults(v *viper.Viper) { v.SetDefault("extract.timeout", "20s") v.SetDefault("extract.max_bytes", 2*1024*1024) v.SetDefault("extract.max_concurrent", 2) + v.SetDefault("extract.allow_private_networks", false) // Keep stage2 defaults stable even when config file is absent. v.SetDefault("resilience.max_retries", 3) v.SetDefault("resilience.allow_endpoint_fallback", false) diff --git a/cmd/search.go b/cmd/search.go index d3ebeae..2e67ecb 100644 --- a/cmd/search.go +++ b/cmd/search.go @@ -100,7 +100,10 @@ func search(cmd *cobra.Command, args []string) { func searchBrowser(engineType string, query core.Query, browserProxyURL string, captchaSolverEnabled bool, captchaSolverAPIKey string) ([]core.SearchResult, error) { var engine core.SearchEngine - blockedResourceTypes := core.MustParseBlockedResourceTypes(config.App.BlockResources) + blockedResourceTypes, err := core.ParseBlockedResourceTypes(config.App.BlockResources) + if err != nil { + return nil, fmt.Errorf("invalid block_resources config: %w", err) + } if core.IsAuthenticatedSocksProxyURL(browserProxyURL) { return nil, fmt.Errorf( "%w: browser runtime does not support authenticated SOCKS proxy %s", diff --git a/cmd/serve.go b/cmd/serve.go index 86b28bd..aa0d067 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -27,7 +27,9 @@ import ( // rawEngine implements SearchEngine interface for raw HTTP requests type rawEngine struct { - name string + name string + limiterMu sync.Mutex + limiter *rate.Limiter } func (r *rawEngine) Search(ctx context.Context, q core.Query) ([]core.SearchResult, error) { @@ -60,8 +62,13 @@ func (r *rawEngine) IsInitialized() bool { } func (r *rawEngine) GetRateLimiter() *rate.Limiter { - // Use default rate limiter for raw requests - return rate.NewLimiter(rate.Every(time.Second), 5) + r.limiterMu.Lock() + defer r.limiterMu.Unlock() + if r.limiter == nil { + // Use default rate limiter for raw requests. + r.limiter = rate.NewLimiter(rate.Every(time.Second), 5) + } + return r.limiter } var serveCMD = &cobra.Command{ @@ -534,7 +541,6 @@ func (p *browserPool) close() error { type pooledBrowserEngine struct { name string - limiter *rate.Limiter opts core.SearchEngineOptions factory func(core.Browser, core.SearchEngineOptions) core.SearchEngine pool *browserPool @@ -578,7 +584,7 @@ func (e *pooledBrowserEngine) Name() string { } func (e *pooledBrowserEngine) GetRateLimiter() *rate.Limiter { - return e.limiter + return e.opts.GetRateLimiter() } func (e *pooledBrowserEngine) DropProxyLaneCookies(ctx context.Context, q core.Query) { @@ -703,7 +709,6 @@ func buildBrowserEngines(baseOpts core.BrowserOpts, proxyCfg core.ProxyConfig) ( opts.Init() base := &pooledBrowserEngine{ name: spec.name, - limiter: rate.NewLimiter(rate.Every(opts.GetRatelimit()), opts.RateBurst), opts: opts, factory: spec.factory, pool: pool, diff --git a/cmd/serve_test.go b/cmd/serve_test.go index 2358103..81e3390 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -8,6 +8,17 @@ import ( "github.com/karust/openserp/core" ) +func TestRawEngineCachesRateLimiter(t *testing.T) { + engine := &rawEngine{name: "google"} + first := engine.GetRateLimiter() + if first == nil { + t.Fatal("expected limiter") + } + if second := engine.GetRateLimiter(); second != first { + t.Fatal("expected rawEngine to return the cached limiter") + } +} + func TestBrowserPoolKey(t *testing.T) { cases := []struct { name string diff --git a/core/browser.go b/core/browser.go index dca7371..408b9e2 100644 --- a/core/browser.go +++ b/core/browser.go @@ -213,13 +213,15 @@ func (b *Browser) configureRequestBlocking(ctx context.Context, page *rod.Page) blocked := blockedResourceTypeSet(b.BlockResourceTypes) router := page.HijackRequests() - router.MustAdd("*", func(h *rod.Hijack) { + if err := router.Add("*", "", func(h *rod.Hijack) { if _, ok := blocked[h.Request.Type()]; ok { h.Response.Fail(proto.NetworkErrorReasonBlockedByClient) return } h.ContinueRequest(&proto.FetchContinueRequest{}) - }) + }); err != nil { + return fmt.Errorf("install resource-blocking route: %w", err) + } go router.Run() diff --git a/core/common.go b/core/common.go index f56e81c..6ce02e9 100644 --- a/core/common.go +++ b/core/common.go @@ -8,6 +8,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/gofiber/fiber/v2" @@ -294,6 +295,9 @@ type Query struct { ProxyOverride string // Insecure enables insecure TLS for request/browser execution. Insecure bool + // GuardPrivateNetworks rejects raw HTTP targets that resolve to private, + // loopback, link-local, multicast, or otherwise non-public addresses. + GuardPrivateNetworks bool } // String renders Query for logs with the proxy URL credentials masked. The @@ -447,8 +451,18 @@ type SearchEngineOptions struct { // IsSolveCaptcha enables automatic captcha solving when engine support and // solver credentials are configured. IsSolveCaptcha bool `mapstructure:"captcha"` + + limiterState *rateLimiterState +} + +type rateLimiterState struct { + limiter *rate.Limiter + every time.Duration + burst int } +var searchEngineOptionsLimiterMu sync.Mutex + // Init sets default option values when fields are zero. func (o *SearchEngineOptions) Init() { if o.RateRequests == 0 { @@ -471,10 +485,23 @@ func (o *SearchEngineOptions) GetRatelimit() time.Duration { return (time.Duration(o.RateTime) * time.Second) / time.Duration(o.RateRequests) } -// GetRateLimiter returns a limiter configured from SearchEngineOptions. -// Call Init() first so RateBurst is non-zero. +// GetRateLimiter returns a cached limiter configured from SearchEngineOptions. +// Call Init() first so RateBurst is non-zero. Do not copy SearchEngineOptions +// after first use; the limiter state is intentionally shared by each engine. func (o *SearchEngineOptions) GetRateLimiter() *rate.Limiter { - return rate.NewLimiter(rate.Every(o.GetRatelimit()), o.RateBurst) + every := o.GetRatelimit() + burst := o.RateBurst + searchEngineOptionsLimiterMu.Lock() + defer searchEngineOptionsLimiterMu.Unlock() + if o.limiterState == nil { + o.limiterState = &rateLimiterState{} + } + if o.limiterState.limiter == nil || o.limiterState.every != every || o.limiterState.burst != burst { + o.limiterState.limiter = rate.NewLimiter(rate.Every(every), burst) + o.limiterState.every = every + o.limiterState.burst = burst + } + return o.limiterState.limiter } // GetSelectorTimeout returns the selector wait timeout as time.Duration. diff --git a/core/http_client.go b/core/http_client.go index 48bf828..3e5f752 100644 --- a/core/http_client.go +++ b/core/http_client.go @@ -45,6 +45,11 @@ func DrainAndCloseResponse(resp *http.Response) { // derived from the query locale. The caller owns the returned response and // must drain/close it (see DrainAndCloseResponse). func RawSearchRequest(ctx context.Context, searchURL string, query Query) (*http.Response, error) { + if query.GuardPrivateNetworks { + if err := ValidatePublicHTTPURL(ctx, searchURL); err != nil { + return nil, err + } + } client, err := NewRawHTTPClient(query) if err != nil { return nil, err @@ -96,15 +101,26 @@ func NewRawHTTPClient(query Query) (*http.Client, error) { roundTripper = proxyErrorTransport{base: roundTripper} } - return &http.Client{ + client := &http.Client{ Transport: roundTripper, Timeout: rawHTTPTimeout, - }, nil + } + if query.GuardPrivateNetworks { + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return ValidatePublicHTTPURL(req.Context(), req.URL.String()) + } + } + return client, nil } func newRawTransport(query Query) (*http.Transport, error) { + dialContext := dialNetworkUsageConn + if query.GuardPrivateNetworks { + dialContext = guardedDialNetworkUsageConn + } + transport := &http.Transport{ - DialContext: dialNetworkUsageConn, + DialContext: dialContext, } if query.Insecure { transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} @@ -122,12 +138,15 @@ func newRawTransport(query Query) (*http.Transport, error) { // Keep proxied requests on the standard transport path so SOCKS5/SOCKS5H // resolution and routing are handled by the configured proxy correctly. + // The extract SSRF guard validates the target URL before the request and + // on redirects; the proxy address itself may legitimately be local. + transport.DialContext = dialNetworkUsageConn transport.Proxy = http.ProxyURL(parsed) return transport, nil } transport.DialTLSContext = func(ctx context.Context, network, addr string) (net.Conn, error) { - rawConn, err := dialNetworkUsageConn(ctx, network, addr) + rawConn, err := dialContext(ctx, network, addr) if err != nil { return nil, err } @@ -175,6 +194,14 @@ func dialNetworkUsageConn(ctx context.Context, network, addr string) (net.Conn, return networkUsageConn{Conn: conn, ctx: ctx}, nil } +func guardedDialNetworkUsageConn(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := GuardedDialContext(ctx, network, addr) + if err != nil { + return nil, err + } + return networkUsageConn{Conn: conn, ctx: ctx}, nil +} + type networkUsageConn struct { net.Conn ctx context.Context diff --git a/core/network_guard.go b/core/network_guard.go new file mode 100644 index 0000000..384635d --- /dev/null +++ b/core/network_guard.go @@ -0,0 +1,159 @@ +package core + +import ( + "context" + "errors" + "fmt" + "net" + "net/netip" + "net/url" + "strings" +) + +// ErrTargetNotAllowed marks URL-guard policy rejections (bad scheme, missing +// host, or a target resolving to a non-public IP). Handlers match it with +// errors.Is to report a client error instead of an upstream failure. +var ErrTargetNotAllowed = errors.New("target not allowed") + +var carrierGradeNATPrefix = netip.MustParsePrefix("100.64.0.0/10") + +// GuardedDialContext dials only public IP targets. Hostnames are resolved first +// and the returned connection is made to the vetted IP, so DNS rebinding cannot +// swap in a private address between validation and dial. +func GuardedDialContext(ctx context.Context, network, addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := resolvePublicDialTargets(ctx, host) + if err != nil { + return nil, err + } + + dialer := &net.Dialer{} + var lastErr error + for _, ip := range ips { + if !ipMatchesNetwork(ip, network) { + continue + } + conn, err := dialer.DialContext(ctx, network, net.JoinHostPort(ip.String(), port)) + if err == nil { + return conn, nil + } + lastErr = err + } + if lastErr != nil { + return nil, lastErr + } + return nil, fmt.Errorf("no public IPs available for %s", host) +} + +func ValidatePublicHTTPURL(ctx context.Context, rawURL string) error { + parsed, err := validateHTTPURL(rawURL) + if err != nil { + return err + } + return validatePublicHost(ctx, parsed.Hostname()) +} + +func validateHTTPURL(rawURL string) (*url.URL, error) { + parsed, err := url.Parse(strings.TrimSpace(rawURL)) + if err != nil { + return nil, fmt.Errorf("%w: invalid URL: %v", ErrTargetNotAllowed, err) + } + if parsed.Scheme != "http" && parsed.Scheme != "https" { + return nil, fmt.Errorf("%w: unsupported URL scheme %q: only http and https are allowed", ErrTargetNotAllowed, parsed.Scheme) + } + if parsed.Hostname() == "" { + return nil, fmt.Errorf("%w: URL host is required", ErrTargetNotAllowed) + } + return parsed, nil +} + +// resolveHostIPs resolves host (or parses a literal IP) and partitions the +// addresses by the public-IP policy. +func resolveHostIPs(ctx context.Context, host string) (public, blocked []netip.Addr, err error) { + if ip, parseErr := netip.ParseAddr(host); parseErr == nil { + ip = ip.Unmap() + if isPublicIP(ip) { + return []netip.Addr{ip}, nil, nil + } + return nil, []netip.Addr{ip}, nil + } + ips, err := net.DefaultResolver.LookupNetIP(ctx, "ip", host) + if err != nil { + return nil, nil, err + } + for _, ip := range ips { + ip = ip.Unmap() + if isPublicIP(ip) { + public = append(public, ip) + } else { + blocked = append(blocked, ip) + } + } + return public, blocked, nil +} + +func resolvePublicDialTargets(ctx context.Context, host string) ([]netip.Addr, error) { + public, blocked, err := resolveHostIPs(ctx, host) + if err != nil { + return nil, err + } + if len(public) == 0 { + if len(blocked) == 0 { + return nil, fmt.Errorf("%w: target host %q resolved to no IP addresses", ErrTargetNotAllowed, host) + } + return nil, fmt.Errorf("%w: target host %q resolves only to non-public IPs: %s", ErrTargetNotAllowed, host, joinAddrs(blocked)) + } + return public, nil +} + +// validatePublicHost rejects a host when any of its addresses is non-public — +// stricter than the dial guard on purpose. The rendered path hands the URL to +// Chrome, which resolves DNS on its own, so a mixed public/private record set +// must fail closed here rather than rely on dial-time pinning. +func validatePublicHost(ctx context.Context, host string) error { + public, blocked, err := resolveHostIPs(ctx, host) + if err != nil { + return err + } + if len(blocked) > 0 { + return fmt.Errorf("%w: target host %q resolves to non-public IPs: %s", ErrTargetNotAllowed, host, joinAddrs(blocked)) + } + if len(public) == 0 { + return fmt.Errorf("%w: target host %q resolved to no IP addresses", ErrTargetNotAllowed, host) + } + return nil +} + +func joinAddrs(addrs []netip.Addr) string { + parts := make([]string, len(addrs)) + for i, addr := range addrs { + parts[i] = addr.String() + } + return strings.Join(parts, ", ") +} + +func isPublicIP(ip netip.Addr) bool { + ip = ip.Unmap() + return ip.IsValid() && + !ip.IsUnspecified() && + !ip.IsLoopback() && + !ip.IsPrivate() && + !ip.IsLinkLocalUnicast() && + !ip.IsLinkLocalMulticast() && + !ip.IsMulticast() && + !carrierGradeNATPrefix.Contains(ip) +} + +func ipMatchesNetwork(ip netip.Addr, network string) bool { + switch network { + case "tcp4": + return ip.Is4() + case "tcp6": + return ip.Is6() + default: + return true + } +} diff --git a/core/network_guard_test.go b/core/network_guard_test.go new file mode 100644 index 0000000..0a34b66 --- /dev/null +++ b/core/network_guard_test.go @@ -0,0 +1,42 @@ +package core + +import ( + "context" + "errors" + "strings" + "testing" +) + +func TestValidatePublicHTTPURLRejectsPrivateTargets(t *testing.T) { + tests := []string{ + "http://127.0.0.1/", + "http://[::1]/", + "http://10.0.0.1/", + "http://172.16.0.1/", + "http://192.168.1.1/", + "http://169.254.169.254/", + "http://100.64.0.1/", + } + + for _, rawURL := range tests { + t.Run(rawURL, func(t *testing.T) { + err := ValidatePublicHTTPURL(context.Background(), rawURL) + if !errors.Is(err, ErrTargetNotAllowed) { + t.Fatalf("expected ErrTargetNotAllowed, got %v", err) + } + if !strings.Contains(strings.ToLower(err.Error()), "non-public") { + t.Fatalf("expected non-public error, got %v", err) + } + }) + } +} + +func TestValidatePublicHTTPURLRejectsUnsupportedScheme(t *testing.T) { + err := ValidatePublicHTTPURL(context.Background(), "file:///etc/passwd") + if !errors.Is(err, ErrTargetNotAllowed) { + t.Fatalf("expected ErrTargetNotAllowed, got %v", err) + } + if !strings.Contains(err.Error(), "only http and https") { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/core/rate_limiter_test.go b/core/rate_limiter_test.go new file mode 100644 index 0000000..767f495 --- /dev/null +++ b/core/rate_limiter_test.go @@ -0,0 +1,38 @@ +package core + +import ( + "context" + "testing" + "time" +) + +func TestSearchEngineOptionsGetRateLimiterCachesLimiterAndPaces(t *testing.T) { + opts := SearchEngineOptions{ + RateRequests: 20, + RateTime: 1, + RateBurst: 1, + SelectorTimeout: 1, + } + + limiter := opts.GetRateLimiter() + if limiter == nil { + t.Fatal("expected limiter") + } + if got := opts.GetRateLimiter(); got != limiter { + t.Fatal("expected GetRateLimiter to return the cached limiter") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := limiter.Wait(ctx); err != nil { + t.Fatalf("first wait failed: %v", err) + } + + start := time.Now() + if err := limiter.Wait(ctx); err != nil { + t.Fatalf("second wait failed: %v", err) + } + if elapsed := time.Since(start); elapsed < 40*time.Millisecond { + t.Fatalf("second wait elapsed %s, want pacing near 50ms", elapsed) + } +} diff --git a/core/resilient.go b/core/resilient.go index 6ba0ecb..6b0fc79 100644 --- a/core/resilient.go +++ b/core/resilient.go @@ -175,7 +175,7 @@ func (rs *ResilientSearcher) searchWithProtection(ctx context.Context, engine Se limiter := engine.GetRateLimiter() if limiter != nil { if err := limiter.Wait(callCtx); err != nil { - return nil, err + return nil, normalizeLimiterWaitErr(callCtx, err) } } @@ -223,7 +223,7 @@ func (rs *ResilientSearcher) searchWithProtection(ctx context.Context, engine Se }) if result.Err != nil { - if !errors.Is(result.Err, ErrProxyUnavailable) { + if shouldRecordCircuitFailure(result.Err) { cb.RecordFailure(engineCtx) } return nil, attemptMeta, result.Err @@ -233,6 +233,27 @@ func (rs *ResilientSearcher) searchWithProtection(ctx context.Context, engine Se return result.Results, attemptMeta, nil } +func shouldRecordCircuitFailure(err error) bool { + return err != nil && + !IsContextDone(err) && + !errors.Is(err, ErrProxyUnavailable) && + !errors.Is(err, ErrCircuitOpen) +} + +// normalizeLimiterWaitErr maps rate.Limiter.Wait failures back to the caller's +// context error. Wait reports a bare "would exceed context deadline" error that +// errors.Is cannot trace to the context, so without this an impatient client +// would be counted as an engine failure by the circuit breaker. +func normalizeLimiterWaitErr(ctx context.Context, err error) error { + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + if _, hasDeadline := ctx.Deadline(); hasDeadline { + return fmt.Errorf("%w: %v", context.DeadlineExceeded, err) + } + return err +} + // invokeEngine is the single panic-recovery point for every engine call made // through the resilient pipeline (browser, raw, and any future engine method). // A rod/CDP panic surfaces as ErrEngineInternal instead of killing the process. diff --git a/core/resilient_context_test.go b/core/resilient_context_test.go index 0a2e057..eea6c2d 100644 --- a/core/resilient_context_test.go +++ b/core/resilient_context_test.go @@ -73,3 +73,88 @@ func TestResilientSearchPrimary_CancelledContextStopsWithin100ms(t *testing.T) { t.Fatal("search did not stop after context cancellation") } } + +type staticErrorEngine struct { + name string + err error +} + +func (e *staticErrorEngine) Name() string { return e.name } + +func (e *staticErrorEngine) IsInitialized() bool { return true } + +func (e *staticErrorEngine) GetRateLimiter() *rate.Limiter { return nil } + +func (e *staticErrorEngine) Search(context.Context, Query) ([]SearchResult, error) { + return nil, e.err +} + +func (e *staticErrorEngine) SearchImage(context.Context, Query) ([]SearchResult, error) { + return nil, e.err +} + +func TestResilientSearchPrimary_ContextErrorsDoNotOpenCircuit(t *testing.T) { + engine := &staticErrorEngine{name: "cancelled", err: context.Canceled} + cfg := DefaultResilientConfig() + cfg.Retry.MaxRetries = 0 + cfg.CircuitBreaker.FailureThreshold = 1 + rs := NewResilientSearcher([]SearchEngine{engine}, cfg) + + _, _, _, err := rs.SearchPrimary(context.Background(), engine, Query{Text: "cancel-me"}) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } + if state := rs.cbManager.Get(engine.Name()).State(); state != CircuitClosed { + t.Fatalf("circuit state = %s, want closed", state) + } +} + +type slowLimiterEngine struct { + staticErrorEngine + limiter *rate.Limiter +} + +func (e *slowLimiterEngine) GetRateLimiter() *rate.Limiter { return e.limiter } + +func TestResilientSearchPrimary_LimiterDeadlineDoesNotOpenCircuit(t *testing.T) { + // Drained limiter with a 1h refill: any deadline-bounded Wait fails with + // rate's bare "would exceed context deadline" error, not a context error. + limiter := rate.NewLimiter(rate.Every(time.Hour), 1) + if !limiter.Allow() { + t.Fatal("expected initial burst token") + } + engine := &slowLimiterEngine{ + staticErrorEngine: staticErrorEngine{name: "ratelimited"}, + limiter: limiter, + } + cfg := DefaultResilientConfig() + cfg.Retry.MaxRetries = 0 + cfg.CircuitBreaker.FailureThreshold = 1 + rs := NewResilientSearcher([]SearchEngine{engine}, cfg) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + _, _, _, err := rs.SearchPrimary(ctx, engine, Query{Text: "limited"}) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected context.DeadlineExceeded, got %v", err) + } + if state := rs.cbManager.Get(engine.Name()).State(); state != CircuitClosed { + t.Fatalf("circuit state = %s, want closed", state) + } +} + +func TestResilientSearchPrimary_EngineFailureStillOpensCircuit(t *testing.T) { + engine := &staticErrorEngine{name: "parser", err: ErrParser} + cfg := DefaultResilientConfig() + cfg.Retry.MaxRetries = 0 + cfg.CircuitBreaker.FailureThreshold = 1 + rs := NewResilientSearcher([]SearchEngine{engine}, cfg) + + _, _, _, err := rs.SearchPrimary(context.Background(), engine, Query{Text: "break-me"}) + if !errors.Is(err, ErrParser) { + t.Fatalf("expected ErrParser, got %v", err) + } + if state := rs.cbManager.Get(engine.Name()).State(); state != CircuitOpen { + t.Fatalf("circuit state = %s, want open", state) + } +} diff --git a/core/server_extract.go b/core/server_extract.go index 46914c4..2cd0b85 100644 --- a/core/server_extract.go +++ b/core/server_extract.go @@ -3,6 +3,7 @@ package core import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -47,6 +48,9 @@ func (s *Server) handleExtract(c *fiber.Ctx) error { result, err := extractor.Extract(requestCtx, req) if err != nil { WithRequest(requestCtx).WithError(err).Warn("Extract failed") + if errors.Is(err, ErrTargetNotAllowed) { + return &APIError{HTTPStatus: fiber.StatusBadRequest, ErrorCode: "invalid_extract_url", Message: err.Error()} + } return &APIError{HTTPStatus: fiber.StatusBadGateway, ErrorCode: "extract_failed", Message: "Failed to extract URL content"} } result.Meta.TookMs = time.Since(startedAt).Milliseconds() @@ -86,8 +90,12 @@ func (s *Server) extractRequestFromFiber(c *fiber.Ctx, cfg extractpkg.Config) (e if err != nil { return extractpkg.ExtractRequest{}, errInvalidParam("min_runes must be a non-negative integer") } + targetURL := extractpkg.NormalizeURL(strings.TrimSpace(firstNonEmpty(c.Query("url"), body.URL))) + if err := validateExtractTargetURL(c.UserContext(), targetURL, cfg.AllowPrivateNetworks); err != nil { + return extractpkg.ExtractRequest{}, errInvalidParam(err.Error()) + } return extractpkg.ExtractRequest{ - URL: firstNonEmpty(c.Query("url"), body.URL), + URL: targetURL, Mode: extractpkg.Mode(mode), ProxyURL: proxyURL, LangCode: strings.TrimSpace(c.Query("lang")), @@ -108,10 +116,15 @@ func (s *Server) newExtractor() extractpkg.Extractor { } func (s *Server) rawExtractFetch(ctx context.Context, req extractpkg.ExtractRequest) (*extractpkg.FetchResponse, error) { + cfg := s.opts.Extract.Normalized() + if err := validateExtractTargetURL(ctx, req.URL, cfg.AllowPrivateNetworks); err != nil { + return nil, err + } resp, err := RawSearchRequest(ctx, req.URL, Query{ - ProxyURL: req.ProxyURL, - LangCode: req.LangCode, - Insecure: s.opts.FingerprintBrowserOpts.Insecure, + ProxyURL: req.ProxyURL, + LangCode: req.LangCode, + Insecure: s.opts.FingerprintBrowserOpts.Insecure, + GuardPrivateNetworks: !cfg.AllowPrivateNetworks, }) if err != nil { return nil, err @@ -138,6 +151,10 @@ func (s *Server) renderedExtractFetch(ctx context.Context, req extractpkg.Extrac if s.opts.BrowserResolver == nil { return nil, fmt.Errorf("rendered extraction is unavailable") } + cfg := s.opts.Extract.Normalized() + if err := s.validateRenderedExtractNavigation(ctx, req, cfg); err != nil { + return nil, err + } browser, err := s.opts.BrowserResolver(req.ProxyURL) if err != nil { return nil, err @@ -160,6 +177,48 @@ func (s *Server) renderedExtractFetch(ctx context.Context, req extractpkg.Extrac return &extractpkg.FetchResponse{StatusCode: http.StatusOK, Body: body}, nil } +func (s *Server) validateRenderedExtractNavigation(ctx context.Context, req extractpkg.ExtractRequest, cfg extractpkg.Config) error { + if err := validateExtractTargetURL(ctx, req.URL, cfg.AllowPrivateNetworks); err != nil { + return err + } + if cfg.AllowPrivateNetworks { + return nil + } + + client, err := NewRawHTTPClient(Query{ + ProxyURL: req.ProxyURL, + LangCode: req.LangCode, + Insecure: s.opts.FingerprintBrowserOpts.Insecure, + GuardPrivateNetworks: true, + }) + if err != nil { + return err + } + preflight, err := http.NewRequestWithContext(ctx, http.MethodHead, req.URL, nil) + if err != nil { + return err + } + resp, err := client.Do(preflight) + if err != nil { + if errors.Is(err, ErrTargetNotAllowed) { + return err + } + WithRequest(ctx).WithError(err).Debug("Rendered extract redirect preflight failed; continuing after initial target validation") + return nil + } + _ = resp.Body.Close() + return nil +} + +func validateExtractTargetURL(ctx context.Context, rawURL string, allowPrivateNetworks bool) error { + rawURL = extractpkg.NormalizeURL(strings.TrimSpace(rawURL)) + if allowPrivateNetworks { + _, err := validateHTTPURL(rawURL) + return err + } + return ValidatePublicHTTPURL(ctx, rawURL) +} + func (s *Server) enrichEnvelopeWithExtraction(ctx context.Context, env *Envelope, q Query, format string) { cfg := s.opts.Extract.Normalized() if env == nil || !q.Extract || !cfg.Enabled { diff --git a/core/server_extract_test.go b/core/server_extract_test.go index bf1e164..3ac26f0 100644 --- a/core/server_extract_test.go +++ b/core/server_extract_test.go @@ -30,11 +30,12 @@ func TestEnrichEnvelopeWithExtractionRetriesThinAndFailedCandidates(t *testing.T opts := DefaultServerOptions() opts.Extract = extractpkg.Config{ - Enabled: true, - DefaultMode: string(extractpkg.ModeFast), - Timeout: time.Second, - MaxBytes: 256 * 1024, - MaxConcurrent: 2, + Enabled: true, + DefaultMode: string(extractpkg.ModeFast), + Timeout: time.Second, + MaxBytes: 256 * 1024, + MaxConcurrent: 2, + AllowPrivateNetworks: true, } s := &Server{opts: opts} env := &Envelope{Results: []Result{ @@ -59,3 +60,52 @@ func TestEnrichEnvelopeWithExtractionRetriesThinAndFailedCandidates(t *testing.T t.Fatalf("third candidate content = %q", env.Results[2].Extracted.Content) } } + +func TestExtractRejectsLinkLocalAddressByDefault(t *testing.T) { + opts := DefaultServerOptions() + opts.Extract = extractpkg.DefaultConfig() + s := NewServerWithOptions("127.0.0.1", 0, opts) + + req, err := http.NewRequest(http.MethodPost, "/extract", strings.NewReader(`{"url":"http://169.254.169.254/latest/meta-data/"}`)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := s.app.Test(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest) + } +} + +func TestExtractRejectsLocalhostByDefault(t *testing.T) { + opts := DefaultServerOptions() + opts.Extract = extractpkg.DefaultConfig() + s := NewServerWithOptions("127.0.0.1", 0, opts) + + req, err := http.NewRequest(http.MethodGet, "/extract?url=http://localhost/private", nil) + if err != nil { + t.Fatal(err) + } + + resp, err := s.app.Test(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusBadRequest) + } +} + +func TestValidateExtractTargetURLNormalizesBarePublicIP(t *testing.T) { + if err := validateExtractTargetURL(context.Background(), "1.1.1.1", false); err != nil { + t.Fatalf("expected bare public IP target to validate after scheme normalization: %v", err) + } +} diff --git a/docs/PROXY_SETUP.md b/docs/PROXY_SETUP.md deleted file mode 100644 index 4a3be1b..0000000 --- a/docs/PROXY_SETUP.md +++ /dev/null @@ -1,322 +0,0 @@ -# Proxy Setup - -This document is for the **backend engineer building the balancer** in front of OpenSERP. It explains the request/response contract for the SaaS `X-Proxy-URL` path, what each header does, what errors mean, and how to size and operate workers. - -If you only need a self-contained OpenSERP with locally configured proxies, scroll to [Configured Proxies (no balancer)](#configured-proxies-no-balancer). - -## Architecture - -```text -client ─▶ your balancer ─▶ OpenSERP worker ─▶ search engine - owns: applies: - - proxy provider - the proxy you supply - - country/class - sticky cookies/profile per session - - session minting - cache keyed by market metadata - - rotation policy - stable typed errors - - usage accounting -``` - -The balancer is the source of truth for _which_ proxy to use and _when_ to rotate. OpenSERP is stateless w.r.t. provider choice — it just executes the search through whatever proxy URL you hand it and reports back what happened. - -## Worker Configuration - -Enable the request-proxy-URL path on every worker fronted by your balancer: - -```yaml -proxies: - allow_request_proxy_url: true # required for X-Proxy-URL to be honored - lanes: - enabled: true - max_lanes: 100 # LRU cap on sticky lanes per worker - drop_cookies_on_challenge: true - -app: - max_processes: 4 # LRU cap on Chrome processes per worker - idle_ttl: 10m # close a Chrome that has not served traffic for this long -``` - -Worker rejects `X-Proxy-URL` with `400 bad_request` (`reason=REQUEST_PROXY_URL_DISABLED`) when the flag is off. Keep this off on any worker reachable by untrusted clients. - -CORS already lists every header the balancer sends; if you customise `cors.allow_headers`, keep `X-Proxy-URL, X-Proxy-Country, X-Proxy-Class, X-Proxy-Provider, X-Proxy-Session-ID, X-Tenant, X-Use-Proxy, X-Request-ID`. - -## Request Contract - -### Headers your balancer sends - -| Header | Required | Purpose | -| --------------------- | :------: | -------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `X-Proxy-URL` | ✅ | The actual proxy URL to route this request through. `http://`, `https://`, or unauthenticated `socks5://`/`socks5h://`. Authenticated SOCKS is rejected. | -| `X-Proxy-Country` | ⚠️ | Two-letter market code (`us`, `de`). **Cache key**. Without it, proxied responses bypass cache. | -| `X-Proxy-Class` | ⚠️ | `datacenter`, `residential`, `mobile`, etc. **Cache key**. | -| `X-Proxy-Provider` | ⚠️ | Provider id (`webshare`, `brightdata`, `internal`). **Cache key**. | -| `X-Proxy-Session-ID` | 🟢 | Sticky session id. **Lane key** — same id reuses cookies and profile. Rotate to get a clean lane. | -| `X-Tenant` | 🟢 | Multi-tenant scope. Lanes become `tenant + engine + session_id`. | -| `X-Use-Proxy: direct` | 🟢 | Force-disable proxy for this request, even if `X-Proxy-URL` is set. | -| `X-Request-ID` | 🟢 | Pass-through correlation id (UUID v7 generated if absent). | - -✅ required, ⚠️ recommended (cache won't engage without it), 🟢 optional. - -### Precedence - -1. `X-Use-Proxy: direct` -2. `X-Proxy-URL` (when allowed) -3. `X-Use-Proxy: ` -4. Per-engine configured tag (worker config) -5. `proxies.global` (worker config) -6. Direct (no proxy) - -### Endpoints - -| Path | What | -| ---------------------- | ---------------------------------------------------------- | -| `GET /{engine}/search` | Single engine: `google`, `yandex`, `baidu`, `bing`, `duck` | -| `GET /{engine}/image` | Single engine image search | -| `GET /mega/search` | Parallel across all (or `?engines=...`) engines | -| `GET /mega/image` | Parallel image search | -| `GET /stats/proxy` | Pool, lane, and Chrome-process stats | -| `GET /health` | Engine + circuit-breaker health | -| `GET /ready` | `503 draining` during graceful shutdown | - -Full schema at `/openapi.yaml` and Swagger UI at `/docs`. - -### Example call - -```bash -curl -G \ - -H "X-Proxy-URL: http://USER:PASS@proxy.example:8080" \ - -H "X-Proxy-Country: us" \ - -H "X-Proxy-Class: residential" \ - -H "X-Proxy-Provider: webshare" \ - -H "X-Proxy-Session-ID: sid-abc" \ - --data-urlencode "text=golang" \ - --data-urlencode "lang=EN" \ - --data-urlencode "limit=10" \ - http://worker.internal:7000/google/search -``` - -## Response Contract - -Every response carries: - -```text -X-Request-ID: 01HXYZ... # mirrors meta.request_id -X-Proxy-Mode: off | tag_pool | request_url # what actually ran -X-Proxy-Used: direct | http://proxy.example:8080 | multiple | mixed | pooled -X-Proxy-Tag: us # only when X-Proxy-Mode=tag_pool -X-Cache: HIT | MISS | BYPASS # only when cache is enabled -``` - -A balancer-driven request always sees `X-Proxy-Mode: request_url` (unless overridden via `X-Use-Proxy: direct`). `X-Proxy-Used` is the masked `scheme://host:port` of the proxy you sent — credentials are stripped. Use these headers to confirm OpenSERP actually applied your proxy and didn't quietly fall back. - -Successful body is the v1 envelope (`/openapi.yaml#/components/schemas/SearchEnvelope`). - -## Error Playbook - -All errors are JSON of shape: - -```json -{ - "error": "", - "code": 503, - "message": "", - "reason": "", - "meta": { - "engine": "google", - "proxy_used": "http://proxy.example:8080", - "proxy_country": "us", - "proxy_class": "residential", - "proxy_provider": "webshare", - "proxy_session_id": "sid-abc" - } -} -``` - -Credentials are **never** present in `meta.proxy_used`, response headers, logs, or stats. - -| HTTP | `error` | What it means | Balancer action | -| ---: | -------------------------------------------- | ---------------------------------------------------------------------------------------------- | ---------------------------------------------------------- | -| 400 | `bad_request` | Bad client input. See `reason`. | Don't retry. Surface to caller. | -| 400 | `bad_request` (`REQUEST_PROXY_URL_DISABLED`) | Worker has the feature off. | Misconfigured worker. Page oncall. | -| 400 | `bad_request` (`UNSUPPORTED_PROXY_SCHEME`) | Authenticated SOCKS in browser mode. | Don't send authenticated SOCKS. Use HTTP/HTTPS. | -| 403 | `blocked` | The search engine returned 403 to the proxy. | Rotate session id. Maybe rotate proxy. | -| 429 | `captcha_detected` | Captcha challenge page. Worker has dropped this lane's cookies. | Rotate session id. Cool the proxy. | -| 429 | `rate_limited` | Engine returned 429. | Slow the proxy. Rotate session id. | -| 502 | `parser_failure` | SERP parser drift on a successfully fetched page. | Don't blame the proxy. Page oncall — engine update needed. | -| 502 | `engine_internal` | Engine panic recovered, or all engines failed (mega). | Retry once on a different worker. | -| 503 | `proxy_connect` | TCP/TLS to the proxy failed. | Mark proxy bad. Retry on another proxy. | -| 503 | `proxy_auth` | 407 Proxy Auth Required. | Credentials wrong/expired. Don't retry the same one. | -| 503 | `proxy_timeout` | Network timeout on the proxy path. | Retry on another proxy. | -| 503 | `proxy_unavailable` | No healthy proxy left in the configured tag pool (worker-side, not your concern in SaaS mode). | Worker config issue. | -| 504 | `search_timeout` | Required SERP elements never appeared before timeout. | Retry once. If repeated, page oncall. | - -### Decision rules for the balancer - -- **Rotate session id** (mint a new `X-Proxy-Session-ID`) on `captcha_detected`, `blocked`, `rate_limited`. The lane keeps its profile but cookies are dropped on captcha; a new id starts fresh anyway. -- **Mark proxy bad** on `proxy_connect`, `proxy_auth`, `proxy_timeout`. Don't degrade the proxy on captcha/parser/engine errors — those aren't the proxy's fault. -- **Don't retry** on 400. Those are client bugs. - -## Sticky Lanes - -A **lane** is a (tenant, engine, session) tuple owned by one worker. - -```yaml -proxies: - lanes: - enabled: true - max_lanes: 100 - drop_cookies_on_challenge: true -``` - -What a lane holds: - -- The browser **profile** picked at first use (UA, viewport, languages, UA-CH brand list). -- The **cookies** harvested during navigation. - -Lane key: - -- `tenant + engine + session_id` when `X-Tenant` is present. -- `engine + session_id` otherwise. -- If `X-Proxy-Session-ID` is missing, the lane id is derived from `sha256(host:port|username)[:16]`. The password is never part of the key, so rotating credentials on the same proxy keeps the lane. - -What invalidates a lane: - -- `X-Proxy-Session-ID` change → fresh lane (the old one stays warm until LRU evicts). -- Captcha response → cookies dropped, profile retained (when `drop_cookies_on_challenge: true`). -- `max_lanes` LRU eviction. - -What does **not** invalidate a lane: - -- Block, rate-limit, parser, engine, or proxy-network errors. The balancer decides whether to rotate. - -If your balancer issues a sticky-session credential pattern (e.g. Bright Data `session-SID`), reuse the same `X-Proxy-Session-ID` for the same upstream sticky window. When you rotate, change both the proxy URL session token AND the `X-Proxy-Session-ID` in the same step. - -## Cache - -Cache key includes query fields + `country + class + provider`. It does **not** include the proxy URL, username, password, or session id. - -Behavior with proxied requests: - -- All three of country/class/provider missing → request bypasses cache (`X-Cache: BYPASS`). -- Country missing but `lang` present → falls back to `lang` as a weak market hint. -- Cross-engine fallback responses (`X-Fallback-Engine` set) are not cached. - -Translation: **send country/class/provider on every proxied request** if you want cache hits. - -## Browser Process Pool - -OpenSERP keeps one Chrome process per _authenticated proxy identity_ (`scheme + host + port + username`): - -| Upstream proxy | Chrome used | -| ----------------------------------------------- | -------------------------------------------- | -| `http://userA:passA@proxy:8080` | Dedicated Chrome `[http\|proxy:8080\|userA]` | -| `http://userA:passB@proxy:8080` (rotated pass) | Same Chrome (password ignored in key) | -| `http://userB:pass@proxy:8080` (different user) | New dedicated Chrome | -| `socks5://proxy:1080` (unauthenticated) | Shared "no-auth" Chrome, per-context proxy | -| no proxy / direct | Shared "no-auth" Chrome | - -Why dedicated processes for authenticated HTTP proxies: Chrome's per-`BrowserContext` auth callback is process-global and only answers the _next_ pending challenge — so concurrent requests with different credentials would race and subresources hang. Launching Chrome with `--proxy-server=...` per identity lets the OS-level Chrome auth path handle 407s natively for the main document AND every subresource. - -The pool grows lazily and is bounded by `app.max_processes` with LRU eviction, plus an idle sweeper that closes Chromes idle for `app.idle_ttl`. **One Chrome serves many concurrent requests** via per-page `BrowserContext` isolation — `max_processes` does NOT bound concurrent search requests, only the number of distinct authenticated proxy identities a worker can keep warm. - -### Sizing the pool - -If you expect `N` distinct authenticated identities to hit one worker concurrently, set `max_processes ≥ N`. Below that, the pool LRU-closes Chromes mid-burst, which adds Chrome-startup latency to the next request that needs the evicted identity. - -Rough memory budget: each Chrome ≈ 150-300 MiB resident. `max_processes: 4` → plan for ~1 GiB of Chrome RAM per worker, plus the rest of the Go process. - -`/debug/fingerprint-check` (when `app.debug_endpoints: true`) is intentionally **not** pooled — it spawns a fresh Chrome per call so you can verify each profile in isolation. - -## Stats Endpoint - -`GET /stats/proxy` returns: - -```json -{ - "configured_count": 0, - "healthy_count": 0, - "unhealthy_count": 0, - "request_proxy_url_enabled": true, - "lanes": { - "active": 12, - "evicted_lru": 7, - "cookies_dropped": 20 - }, - "browser_processes": { - "active": 3, - "max": 4, - "evicted_lru": 12, - "evicted_idle": 5 - }, - "tags": {}, - "entries": [] -} -``` - -Watch for: - -- `browser_processes.evicted_lru` rising → bump `max_processes`. -- `browser_processes.evicted_idle` rising while `active` stays low → traffic is bursty; that's healthy. -- `lanes.cookies_dropped` rising → the proxy is hitting captchas; consider rotating the session more aggressively. - -## Provider Examples - -Bright Data (residential, sticky session): - -```text -X-Proxy-URL: http://brd-customer-CUSTOMER-zone-res-country-us-session-SID:PASS@brd.superproxy.io:22225 -X-Proxy-Provider: brightdata -X-Proxy-Class: residential -X-Proxy-Country: us -X-Proxy-Session-ID: SID -``` - -Webshare: - -```text -X-Proxy-URL: http://USER:PASS@p.webshare.io:80 -X-Proxy-Provider: webshare -X-Proxy-Class: datacenter -X-Proxy-Country: us -``` - -Internal HTTP datacenter: - -```text -X-Proxy-URL: http://user:pass@dc-proxy.example:8080 -X-Proxy-Provider: internal -X-Proxy-Class: datacenter -X-Proxy-Country: de -``` - -Authenticated SOCKS proxies are rejected by browser mode because Chrome cannot safely answer SOCKS auth challenges via the DevTools auth callback. Unauthenticated `socks5://` and `socks5h://` are accepted on both `X-Proxy-URL` and configured pools. - -## Configured Proxies (no balancer) - -For OSS and local deployments without a balancer, OpenSERP can manage proxies directly: - -```yaml -proxies: - global: http://user:pass@127.0.0.1:8080 # one proxy for everything - -# OR a tagged pool: -proxies: - entries: - - url: http://user:pass@proxy-us.example:8080 - tags: [default, us] - - url: socks5h://127.0.0.1:1080 - tags: [eu] - health: - failure_threshold: 3 # disable a proxy after N consecutive network errors - -google: - proxy: default # opt this engine into the "default" tag pool -``` - -Per-request override (no balancer needed): - -```bash -curl -H "X-Use-Proxy: us" "http://127.0.0.1:7000/google/search?text=golang" -curl -H "X-Use-Proxy: direct" "http://127.0.0.1:7000/google/search?text=golang" -``` - -Pool health: a tag pool that exhausts (every member disabled) goes into a 5-minute quarantine. After quarantine, one proxy is re-enabled as a recovery probe. Network errors degrade health; captcha/parser/engine errors do not. diff --git a/extract/config.go b/extract/config.go index 04a50c5..11e0ccd 100644 --- a/extract/config.go +++ b/extract/config.go @@ -3,11 +3,12 @@ package extract import "time" type Config struct { - Enabled bool `json:"enabled" mapstructure:"enabled"` - DefaultMode string `json:"default_mode" mapstructure:"default_mode"` - Timeout time.Duration `json:"timeout" mapstructure:"timeout"` - MaxBytes int `json:"max_bytes" mapstructure:"max_bytes"` - MaxConcurrent int `json:"max_concurrent" mapstructure:"max_concurrent"` + Enabled bool `json:"enabled" mapstructure:"enabled"` + DefaultMode string `json:"default_mode" mapstructure:"default_mode"` + Timeout time.Duration `json:"timeout" mapstructure:"timeout"` + MaxBytes int `json:"max_bytes" mapstructure:"max_bytes"` + MaxConcurrent int `json:"max_concurrent" mapstructure:"max_concurrent"` + AllowPrivateNetworks bool `json:"allow_private_networks" mapstructure:"allow_private_networks"` } func DefaultConfig() Config { diff --git a/extract/extractor.go b/extract/extractor.go index ee29948..cec0580 100644 --- a/extract/extractor.go +++ b/extract/extractor.go @@ -101,7 +101,7 @@ func (e *Extractor) extractRendered(ctx context.Context, req ExtractRequest, sta } func normalizeRequest(req ExtractRequest, cfg Config) ExtractRequest { - req.URL = normalizeURL(strings.TrimSpace(req.URL)) + req.URL = NormalizeURL(strings.TrimSpace(req.URL)) if req.Timeout <= 0 { req.Timeout = cfg.Timeout } @@ -115,9 +115,8 @@ func normalizeRequest(req ExtractRequest, cfg Config) ExtractRequest { return req } -// normalizeURL defaults a missing scheme to https so callers can pass a bare host -// (e.g. "kamaloff.ru"). URLs that already carry a scheme are left untouched. -func normalizeURL(raw string) string { +// NormalizeURL defaults a missing scheme to https so callers can pass a bare host +func NormalizeURL(raw string) string { if raw == "" || strings.Contains(raw, "://") || strings.HasPrefix(raw, "//") { return raw } diff --git a/extract/extractor_test.go b/extract/extractor_test.go index a8cd101..316aa41 100644 --- a/extract/extractor_test.go +++ b/extract/extractor_test.go @@ -76,8 +76,8 @@ func TestNormalizeURL(t *testing.T) { "socks5h://127.0.0.1:80": "socks5h://127.0.0.1:80", } for in, want := range cases { - if got := normalizeURL(in); got != want { - t.Errorf("normalizeURL(%q) = %q, want %q", in, got, want) + if got := NormalizeURL(in); got != want { + t.Errorf("NormalizeURL(%q) = %q, want %q", in, got, want) } } } From 976ceeadf4ababc0c02ea03090be1c1e25b94ffd Mon Sep 17 00:00:00 2001 From: Rustem Kamalov Date: Fri, 12 Jun 2026 19:06:46 +0300 Subject: [PATCH 4/4] fix: report ErrSearchTimeout when results never hydrate; raise integration selector timeout to 15s --- baidu/search.go | 7 ++++++- baidu/search_integration_test.go | 2 +- bing/search_integration_test.go | 2 +- duckduckgo/search_integration_test.go | 2 +- ecosia/search_integration_test.go | 2 +- google/search_integration_test.go | 2 +- testutil/ithelper/ithelper.go | 7 +++++++ yandex/search_integration_test.go | 2 +- 8 files changed, 19 insertions(+), 7 deletions(-) diff --git a/baidu/search.go b/baidu/search.go index 503e19e..cbd6343 100644 --- a/baidu/search.go +++ b/baidu/search.go @@ -160,7 +160,12 @@ func (baid *Baidu) waitForParsedSearchResults(ctx context.Context, page *rod.Pag } return nil, core.ErrParser } - return nil, nil + // The page never reached a recognizable state: no result containers, no + // captcha or timeout markers. Baidu hydrates result cards client-side and + // can exceed the selector deadline, so report a timeout rather than a + // successful empty SERP — callers must retry/skip, not trust 0 results. + baid.logger.Debug("No result containers or block markers within selector timeout") + return nil, core.ErrSearchTimeout } // SearchImage executes a Baidu image search and returns normalized image diff --git a/baidu/search_integration_test.go b/baidu/search_integration_test.go index 2d32919..0257e5a 100644 --- a/baidu/search_integration_test.go +++ b/baidu/search_integration_test.go @@ -12,6 +12,6 @@ import ( func TestSearchBaidu(t *testing.T) { ithelper.RunEngineTests(t, func(b *core.Browser) core.SearchEngine { - return New(*b, core.SearchEngineOptions{}) + return New(*b, ithelper.EngineOptions()) }) } diff --git a/bing/search_integration_test.go b/bing/search_integration_test.go index 2ddb6d2..179f255 100644 --- a/bing/search_integration_test.go +++ b/bing/search_integration_test.go @@ -12,6 +12,6 @@ import ( func TestSearchBing(t *testing.T) { ithelper.RunEngineTests(t, func(b *core.Browser) core.SearchEngine { - return New(*b, core.SearchEngineOptions{}) + return New(*b, ithelper.EngineOptions()) }) } diff --git a/duckduckgo/search_integration_test.go b/duckduckgo/search_integration_test.go index 2f01f8b..3b52c3d 100644 --- a/duckduckgo/search_integration_test.go +++ b/duckduckgo/search_integration_test.go @@ -12,6 +12,6 @@ import ( func TestSearchDuckDuckGo(t *testing.T) { ithelper.RunEngineTests(t, func(b *core.Browser) core.SearchEngine { - return New(*b, core.SearchEngineOptions{}) + return New(*b, ithelper.EngineOptions()) }) } diff --git a/ecosia/search_integration_test.go b/ecosia/search_integration_test.go index 1047309..c6249fa 100644 --- a/ecosia/search_integration_test.go +++ b/ecosia/search_integration_test.go @@ -12,6 +12,6 @@ import ( func TestSearchEcosia(t *testing.T) { ithelper.RunEngineTests(t, func(b *core.Browser) core.SearchEngine { - return New(*b, core.SearchEngineOptions{}) + return New(*b, ithelper.EngineOptions()) }) } diff --git a/google/search_integration_test.go b/google/search_integration_test.go index 3dcf9b8..c65bc79 100644 --- a/google/search_integration_test.go +++ b/google/search_integration_test.go @@ -12,6 +12,6 @@ import ( func TestSearchGoogle(t *testing.T) { ithelper.RunEngineTests(t, func(b *core.Browser) core.SearchEngine { - return New(*b, core.SearchEngineOptions{}) + return New(*b, ithelper.EngineOptions()) }) } diff --git a/testutil/ithelper/ithelper.go b/testutil/ithelper/ithelper.go index 5c43e28..350fe82 100644 --- a/testutil/ithelper/ithelper.go +++ b/testutil/ithelper/ithelper.go @@ -47,6 +47,13 @@ func HandleError(t *testing.T, operation string, err error) { t.Skipf("skipping flaky live %s: %v", operation, err) } +// EngineOptions returns engine options tuned for live-site integration runs. +// Real SERPs (Baidu especially) hydrate result cards client-side and can take +// well past the 5s default selector timeout on a first visit. +func EngineOptions() core.SearchEngineOptions { + return core.SearchEngineOptions{SelectorTimeout: 15} +} + // CreateBrowser creates a browser configured for integration tests and closes // it when the test finishes. Respects OPENSERP_INTEGRATION_HEADFUL for // debugging (browser and page are left open for inspection). diff --git a/yandex/search_integration_test.go b/yandex/search_integration_test.go index 2fffa93..2a7c44f 100644 --- a/yandex/search_integration_test.go +++ b/yandex/search_integration_test.go @@ -12,6 +12,6 @@ import ( func TestSearchYandex(t *testing.T) { ithelper.RunEngineTests(t, func(b *core.Browser) core.SearchEngine { - return New(*b, core.SearchEngineOptions{}) + return New(*b, ithelper.EngineOptions()) }) }