feat: Added a new trait to expose SchemaProvider #1621

Open
parmesant wants to merge 48 commits into parseablehq:main from parmesant:query-updates

Conversation

@parmesant

@parmesant parmesant commented Apr 15, 2026

Contributor

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Expanded crate root public exports for easier downstream access.
    • Added Arrow Flight support to return a streaming DoGet payload for record batches.
    • Introduced a new Prometheus counter and public helper to track hot-tier files scanned by date, stream, and tenant.
  • Improvements

    • Added an overridable global schema-provider mechanism to customize how schemas are created and how physical optimizer rules are extended.
    • Made additional schema/partitioning/session-state helpers publicly accessible and improved execution-path reuse.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 5495bde0-9286-4d7a-9349-306cdd03b764

📥 Commits

Reviewing files that changed from the base of the PR and between 2a23ced and f4334ca.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • Cargo.toml
  • src/lib.rs
  • src/metrics/mod.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs
  • src/utils/arrow/flight.rs
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/lib.rs
  • src/utils/arrow/flight.rs
  • Cargo.toml
  • src/query/mod.rs
  • src/metrics/mod.rs
  • src/query/stream_schema_provider.rs

Walkthrough

This PR adds crate-root re-exports, a configurable schema-provider hook, query execution and session-state updates, hot-tier and scan metrics, public stream helpers, and an Arrow Flight streaming converter. It also adds the datafusion-proto dependency.

Changes

Query runtime extensions

| Layer | File(s) | Summary |
|---|---|---|
| Crate exports and dependency | src/lib.rs, Cargo.toml | Crate-root re-exports add arrow_array, arrow_flight, arrow_ipc, catalog as parseable_catalog, datafusion, datafusion_proto, and utils as parseable_utils; Cargo.toml adds datafusion-proto = "53.1.0". |
| Schema-provider hooks and session wiring | src/query/mod.rs | Adds ParseableSchemaProvider, SCHEMA_PROVIDER, and ADDITIONAL_PHYSICAL_OPTIMIZER_RULES; schema registration now uses the configured provider when present and passes providers into DataFusion with into(). |
| Session state and execution flow | src/query/mod.rs | Query::create_session_state is public and appends physical optimizer rules before build; Query::execute reuses one session context for plan creation and task context lookup. |
| Hot-tier scan metrics and partitioning | src/query/stream_schema_provider.rs, src/metrics/mod.rs | Hot-tier manifest handling increments TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE; partitioned_files(...) is exported and computes partitioning, statistics, and query-scan billing metrics. |
| Public helpers and Flight streaming | src/query/stream_schema_provider.rs, src/utils/arrow/flight.rs | reversed_mem_table, PartialTimeFilter::try_from_expr, expr_in_boundary, and extract_timestamp_bound are public; into_flight_data_stream maps a SendableRecordBatchStream into Arrow Flight data. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Query
    participant SCHEMA_PROVIDER
    participant SessionState
    participant stream_schema_provider
    participant metrics
    participant flight

    Client->>Query: create_session_context(...)
    alt SCHEMA_PROVIDER configured
        Query->>SCHEMA_PROVIDER: new_provider(storage, tenant_id)
    else default provider
        Query->>stream_schema_provider: GlobalSchemaProvider
    end
    Query->>SessionState: register_schema(schema_provider.into())
    Query->>Query: create_session_state()
    Query->>SessionState: with_physical_optimizer_rule(...)
    Client->>Query: execute(query)
    Query->>SessionState: create_physical_plan(...)
    Query->>stream_schema_provider: partitioned_files(...)
    stream_schema_provider->>metrics: increment_files_scanned_in_query_by_date(...)
    Query->>metrics: increment_files_scanned_in_hottier_by_date(...)
    Query->>flight: into_flight_data_stream(stream)
    flight-->>Client: DoGetStream

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Suggested reviewers

  • nitisht

Poem

🐰 I hopped through schemas, neat and bright,
New counters ticked in morning light.
Flight streams fluttered, swift and neat,
With one small hop, the query beat.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The description is still the template placeholder text and lacks the required issue, goal, rationale, and change details. | Fill in the issue number, describe the goal and chosen approach, summarize key changes, and note testing/docs status. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 52.38% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly names the main change: adding a trait to expose SchemaProvider. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 OpenGrep (1.23.0)
src/query/mod.rs

┌──────────────┐
│ Opengrep CLI │
└──────────────┘

✔ Opengrep OSS
✔ Basic security coverage for first-party code vulnerabilities.

[00.16][ERROR]: unable to find a config; path .coderabbit-opengrep-fallback.yml does not exist

src/lib.rs

┌──────────────┐
│ Opengrep CLI │
└──────────────┘

✔ Opengrep OSS
✔ Basic security coverage for first-party code vulnerabilities.

[00.25][ERROR]: unable to find a config; path .coderabbit-opengrep-fallback.yml does not exist

src/utils/arrow/flight.rs

┌──────────────┐
│ Opengrep CLI │
└──────────────┘

✔ Opengrep OSS
✔ Basic security coverage for first-party code vulnerabilities.

[00.18][ERROR]: unable to find a config; path .coderabbit-opengrep-fallback.yml does not exist

  • 2 others

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/query/mod.rs (1)

77-100: ⚠️ Potential issue | 🟠 Major

Prevent late SCHEMA_PROVIDER registration from silently no-oping.

SCHEMA_PROVIDER is only consulted when schemas are registered, but QUERY_SESSION is a process-wide Lazy. If the cell is set after the first QUERY_SESSION access, the default GlobalSchemaProvider remains registered for the lifetime of that session, so the new extension point never takes effect for existing schemas. Please either enforce provider initialization before any session access or rebuild the session context when the provider is installed.
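
The ordering hazard described above can be sketched with the standard library alone. This is a simplified model, not the PR's code: `Runtime`, `install_provider`, and the string-valued "provider" are hypothetical stand-ins for `SCHEMA_PROVIDER` and the lazily built `QUERY_SESSION`. It illustrates the first remedy (reject provider installation once the session exists) rather than rebuilding the session.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for the process-wide provider cell plus the
/// lazily built session. Names and types are illustrative only.
pub struct Runtime {
    provider: OnceLock<&'static str>,
    session: OnceLock<String>,
}

impl Runtime {
    pub fn new() -> Self {
        Self {
            provider: OnceLock::new(),
            session: OnceLock::new(),
        }
    }

    /// Installs a provider, but refuses once the session has been built,
    /// because the session would never observe the new provider.
    /// Note: the check and the set are not atomic together; a real
    /// implementation would need to close that window.
    pub fn install_provider(&self, name: &'static str) -> Result<(), &'static str> {
        if self.session.get().is_some() {
            return Err("session already built; a late provider would be ignored");
        }
        self.provider.set(name).map_err(|_| "provider already installed")
    }

    /// Builds the session on first access, capturing whichever provider
    /// is installed at that moment (or the default if none is).
    pub fn session(&self) -> &str {
        self.session.get_or_init(|| {
            let provider = self.provider.get().copied().unwrap_or("default");
            format!("session[{provider}]")
        })
    }
}
```

Returning an error from the installer turns the silent no-op into a visible failure at startup, which is usually easier to diagnose than a provider that is installed but never consulted.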

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/mod.rs` around lines 77 - 100, SCHEMA_PROVIDER can be registered
too late and never affect the process-wide QUERY_SESSION (and
QUERY_SESSION_STATE) because QUERY_SESSION is a Lazy created with the old
provider; fix by ensuring provider initialization happens before any session
access or by rebuilding the session when a provider is installed: update the
code that sets SCHEMA_PROVIDER to, after successful OnceCell::set, call
Query::create_session_context(PARSEABLE.storage()) and replace the stored
session/context (QUERY_SESSION or its InMemorySessionContext.session_context)
and likewise refresh QUERY_SESSION_STATE via Query::create_session_state(...) so
the new ParseableSchemaProvider takes effect for existing sessions.
🧹 Nitpick comments (2)
src/lib.rs (1)

59-64: Clarify stability expectations for these new crate-root re-exports.

The pub use additions at lines 59-62, 64, and 73 expand the public API surface. Please document whether they are part of a stable contract (or move them under a dedicated namespace) to avoid accidental long-term semver lock-in.

Also applies to: 73-73

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` around lines 59 - 64, These new crate-root re-exports
(arrow_array, arrow_flight, arrow_ipc, catalog as parseable_catalog, datafusion,
datafusion_proto) expand the public API surface; either mark them explicitly as
unstable/internal or move them under a dedicated namespace/module (e.g.,
reexports::arrow::*) and add a clear doc-comment on each symbol indicating
stability guarantees (stable API vs internal/experimental) so consumers won’t be
accidentally semver-locked; update the lib.rs entries for the listed pub use
items to point to the new module or add #[doc = "... stability: ..."] comments
and/or cfg(feature = "internal-reexports") gating as appropriate.
src/query/stream_schema_provider.rs (1)

701-701: Document the new public helper.

try_from_expr is now part of the public surface. Please add rustdoc describing the accepted expression shapes and the time_partition == None behavior so custom schema providers can depend on it safely.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/stream_schema_provider.rs` at line 701, Add Rustdoc for the newly
public helper try_from_expr in stream_schema_provider.rs: describe the accepted
expression shapes (e.g., exact/match on column names, literal values, supported
Expr variants) and any constraints the function assumes from Expr, and
explicitly document the behavior when time_partition is None (what is
returned/assumed and how partition-sensitive logic behaves). Reference the
function name try_from_expr and the Expr type in the doc so external/custom
schema providers know how to call it and what results or None means; keep the
doc concise and include examples of expected expression shapes in prose.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/query/mod.rs`:
- Around line 211-239: The schema provider registration is using
PARSEABLE.storage().get_object_store() instead of the storage passed into
create_session_context(storage), so schemas get registered against the wrong
backend; update both branches where SCHEMA_PROVIDER.new_provider and
GlobalSchemaProvider are constructed to use the caller's storage (the function
parameter named storage) by passing Some(storage.get_object_store()) or
storage.get_object_store() and tenant_id as before, and then call
catalog.register_schema as-is so the catalog is registered against the provided
storage rather than PARSEABLE.storage().


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 2b7d7002-715b-4e6c-a3c9-30b2bb3d8300

📥 Commits

Reviewing files that changed from the base of the PR and between eacb1b9 and 2aa314b.

📒 Files selected for processing (4)
  • Cargo.toml
  • src/lib.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs

Comment thread src/query/mod.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/query/mod.rs (1)

319-338: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Undefined variable ctx will cause a compilation error.

The variable ctx is referenced on lines 319, 322, and 338 but is never defined. The execute method calls QUERY_SESSION.get_ctx() inline on line 302 but does not assign it to a local variable. This code will not compile.

🐛 Proposed fix

Add a local ctx binding before usage:

         if fields.is_empty() && !is_streaming {
             return Ok((Either::Left(vec![]), fields));
         }

+        let ctx = QUERY_SESSION.get_ctx();
         let plan = ctx.state().create_physical_plan(df.logical_plan()).await?;

         let results = if !is_streaming {
             let task_ctx = ctx.task_ctx();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/mod.rs` around lines 319 - 338, The error is that `ctx` is used but
not defined: before calling ctx.state(), ctx.task_ctx(), etc., assign the
session context returned from QUERY_SESSION.get_ctx() to a local variable (e.g.,
let ctx = QUERY_SESSION.get_ctx()?) in the execute function so subsequent calls
like ctx.state().create_physical_plan(...), collect_partitioned(plan.clone(),
ctx.task_ctx().clone()), get_total_bytes_scanned(&plan), and
increment_bytes_scanned_in_query_by_date(..., tenant) compile; locate the call
site where QUERY_SESSION.get_ctx() is currently invoked inline (in execute) and
replace it with a local binding named `ctx` used by create_physical_plan,
collect_partitioned, get_total_bytes_scanned, and
increment_bytes_scanned_in_query_by_date.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 6cbe7d76-b2bf-4e69-a043-307af0a8fcc7

📥 Commits

Reviewing files that changed from the base of the PR and between 2aa314b and 4e50143.

📒 Files selected for processing (3)
  • src/lib.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs
✅ Files skipped from review due to trivial changes (2)
  • src/query/stream_schema_provider.rs
  • src/lib.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 4, 2026

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

♻️ Duplicate comments (1)
src/query/mod.rs (1)

209-229: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use the caller's storage when constructing schema providers.

create_session_context(storage) still registers schemas against PARSEABLE.storage().get_object_store() in both branches. That gives callers a session state for one backend and schema providers for another whenever they pass a non-default ObjectStorageProvider.

Suggested fix
 pub fn create_session_context(storage: Arc<dyn ObjectStorageProvider>) -> SessionContext {
     let state = Self::create_session_state(storage.clone());
+    let object_store = storage.get_object_store();

     let catalog = state
         .catalog_list()
         .catalog(&state.config_options().catalog.default_catalog)
         .expect("default catalog is provided by datafusion");
@@
                 for t in tenants.iter() {
                     let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
                         provider.new_provider(
-                            Some(PARSEABLE.storage().get_object_store()),
+                            Some(object_store.clone()),
                             &Some(t.to_owned()),
                         )
                     } else {
                         Box::new(GlobalSchemaProvider {
-                            storage: PARSEABLE.storage().get_object_store(),
+                            storage: object_store.clone(),
                             tenant_id: Some(t.to_owned()),
                         })
                     };
                     let _ = catalog.register_schema(t, schema_provider.into());
                 }
@@
             let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
-                provider.new_provider(Some(PARSEABLE.storage().get_object_store()), &None)
+                provider.new_provider(Some(object_store.clone()), &None)
             } else {
                 Box::new(GlobalSchemaProvider {
-                    storage: PARSEABLE.storage().get_object_store(),
+                    storage: object_store,
                     tenant_id: None,
                 })
             };
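
The intent of the suggested fix can be illustrated in isolation. The sketch below is a std-only model under stated assumptions: `Storage`, `NamedStorage`, `Provider`, and `build_providers` are hypothetical names, not types from this repository. The point it shows is that every provider is built from the `storage` argument, never from a process-wide default.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an object storage backend.
pub trait Storage {
    fn backend(&self) -> &str;
}

/// Simple named backend used only for illustration.
pub struct NamedStorage(pub &'static str);

impl Storage for NamedStorage {
    fn backend(&self) -> &str {
        self.0
    }
}

/// Hypothetical stand-in for a per-tenant schema provider.
pub struct Provider {
    pub storage: Arc<dyn Storage>,
    pub tenant_id: Option<String>,
}

/// Builds one provider per tenant (or a single tenant-less provider),
/// always from the caller-supplied `storage`.
pub fn build_providers(storage: Arc<dyn Storage>, tenants: &[&str]) -> Vec<Provider> {
    if tenants.is_empty() {
        return vec![Provider {
            storage,
            tenant_id: None,
        }];
    }
    tenants
        .iter()
        .map(|t| Provider {
            // Clone the caller's handle instead of reaching for a global.
            storage: Arc::clone(&storage),
            tenant_id: Some(t.to_string()),
        })
        .collect()
}
```

With this shape, a caller that passes a non-default backend gets session state and schema providers bound to the same backend.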
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/mod.rs` around lines 209 - 229, The session creation currently
always uses PARSEABLE.storage().get_object_store() when constructing schema
providers, causing providers to be bound to the default backend instead of the
caller-provided one; update create_session_context(storage) so both branches use
the passed-in storage (the function parameter named storage) when calling
SCHEMA_PROVIDER.get().new_provider(...) and when constructing
GlobalSchemaProvider (replace uses of PARSEABLE.storage().get_object_store()
with storage.get_object_store() or the appropriate accessor), ensuring
catalog.register_schema still receives the schema_provider created from the
caller's storage.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/metrics/mod.rs`:
- Around line 260-270: The new IntCounterVec
TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is never registered with the custom
prometheus registry, so it won't be exposed; update the custom_metrics function
to call
METRICS_REGISTRY.register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
(or the equivalent register method used in this module) alongside the other
metrics registration, handling any register errors consistently with existing
patterns so the counter is available on /metrics.

In `@src/utils/arrow/flight.rs`:
- Around line 161-163: Replace the unbounded Flight frame size usage by defining
a single explicit constant (e.g. MAX_FLIGHT_FRAME_SIZE = 16 * 1024 * 1024) and
use it wherever FlightDataEncoderBuilder::with_max_flight_data_size currently
receives usize::MAX; update both into_flight_data_stream and into_flight_data to
call .with_max_flight_data_size(MAX_FLIGHT_FRAME_SIZE) so the Arrow IPC encoder
chunks data consistently with gRPC's frame size instead of allowing unlimited
accumulation.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 435a0108-16bb-4acd-b14f-90dcc5110196

📥 Commits

Reviewing files that changed from the base of the PR and between 4e50143 and 2a23ced.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • Cargo.toml
  • src/metrics/mod.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs
  • src/utils/arrow/flight.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/query/stream_schema_provider.rs

Comment thread src/metrics/mod.rs
Comment on lines +260 to +270
pub static TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            "total_files_scanned_in_hottier_by_date",
            "Total files scanned in hottier by date",
        )
        .namespace(METRICS_NAMESPACE),
        &["stream", "date", "tenant_id"],
    )
    .expect("metric can be created")
});

Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/metrics/mod.rs"

echo "Occurrences of TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE:"
rg -nC2 'TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE' "$FILE"

echo
echo "Registration block in custom_metrics:"
sed -n '512,580p' "$FILE"

Repository: parseablehq/parseable

Length of output: 3289


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/metrics/mod.rs"

# Find the custom_metrics function definition and check its bounds
echo "=== custom_metrics function definition and scope ==="
sed -n '463,470p' "$FILE"

# Get the complete custom_metrics function (find where it ends)
echo
echo "=== End of custom_metrics function ==="
sed -n '575,585p' "$FILE"

# Search for any other registration attempts with the new metric
echo
echo "=== All occurrences of register and TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE in same context ==="
rg -n 'register.*TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE|TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.*register' "$FILE"

# Confirm the metric is not already registered elsewhere
echo
echo "=== Checking if the metric appears in any register calls ==="
rg -n '\.register\(' "$FILE" | grep -i 'hottier' || echo "No register calls found for hottier metric"

Repository: parseablehq/parseable

Length of output: 920


Register this counter in custom_metrics.

TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is declared at line 260 and incremented in the helper at lines 704-706, but it is never added to METRICS_REGISTRY inside custom_metrics. With the custom registry setup in this file, the metric will not be exposed on /metrics.

Proposed fix
     registry
         .register(Box::new(TOTAL_QUERY_CALLS_BY_DATE.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
+        .expect("metric can be registered");
     registry
         .register(Box::new(TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE.clone()))
         .expect("metric can be registered");
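
Why an unregistered counter stays invisible can be shown with a small model. The sketch below is deliberately not the `prometheus` crate API: `Counter`, `Registry`, `register`, and `gather` are simplified, hypothetical stand-ins written against the standard library, assuming only that a registry exports the metrics it has been handed.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Simplified counter; clones share the same underlying value.
#[derive(Clone)]
pub struct Counter {
    name: &'static str,
    value: Arc<AtomicU64>,
}

impl Counter {
    pub fn new(name: &'static str) -> Self {
        Self {
            name,
            value: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Incrementing works whether or not the counter is registered.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

/// Simplified registry; only registered counters are exported.
#[derive(Default)]
pub struct Registry {
    counters: BTreeMap<&'static str, Counter>,
}

impl Registry {
    /// Registers a counter, rejecting duplicate names.
    pub fn register(&mut self, counter: Counter) -> Result<(), String> {
        if self.counters.contains_key(counter.name) {
            return Err(format!("duplicate metric: {}", counter.name));
        }
        self.counters.insert(counter.name, counter);
        Ok(())
    }

    /// Returns (name, value) pairs for every registered counter.
    pub fn gather(&self) -> Vec<(String, u64)> {
        self.counters
            .values()
            .map(|c| (c.name.to_string(), c.get()))
            .collect()
    }
}
```

In this model a counter can be incremented indefinitely and still never appear in `gather()` output until it is registered, which mirrors the symptom the review describes for `/metrics`.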
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/metrics/mod.rs` around lines 260 - 270, The new IntCounterVec
TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is never registered with the custom
prometheus registry, so it won't be exposed; update the custom_metrics function
to call
METRICS_REGISTRY.register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
(or the equivalent register method used in this module) alongside the other
metrics registration, handling any register errors consistently with existing
patterns so the counter is available on /metrics.

Comment thread src/utils/arrow/flight.rs
Comment on lines +161 to +163
    let flight_data_stream = FlightDataEncoderBuilder::new()
        .with_max_flight_data_size(usize::MAX)
        .with_options(write_options)

Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify all Arrow Flight encoder call sites still use unbounded size.
rg -nP --type=rust 'with_max_flight_data_size\(\s*usize::MAX\s*\)'

Repository: parseablehq/parseable

Length of output: 219


🏁 Script executed:

#!/bin/bash
# Read the full context of both functions to understand their purpose and design
sed -n '146,190p' src/utils/arrow/flight.rs | cat -n

Repository: parseablehq/parseable

Length of output: 2116


🏁 Script executed:

#!/bin/bash
# Search for any configuration or constants related to Flight frame sizing
rg -i 'max.*flight.*size|flight.*chunk|frame.*size' --type=rust src/

Repository: parseablehq/parseable

Length of output: 365


🏁 Script executed:

#!/bin/bash
# Look for any comments or documentation about the encoding choice
sed -n '140,195p' src/utils/arrow/flight.rs

Repository: parseablehq/parseable

Length of output: 1956


Bound Flight frame size instead of usize::MAX.

Both into_flight_data_stream and into_flight_data use unbounded with_max_flight_data_size(usize::MAX), which allows the Arrow IPC encoder to accumulate data excessively before chunking. Although gRPC's max_frame_size is set to 16 MB in src/handlers/airplane.rs, this creates an implicit and inefficient constraint. Define an explicit constant and use it consistently across both functions instead.

Proposed fix
+const MAX_FLIGHT_DATA_SIZE: usize = 16 * 1024 * 1024;
+
 pub fn into_flight_data_stream(
     stream: datafusion::execution::SendableRecordBatchStream,
 ) -> Result<Response<DoGetStream>, Box<Status>> {
     let record_stream = stream.map_err(|e| {
         arrow_flight::error::FlightError::Arrow(arrow_schema::ArrowError::ExternalError(
             Box::new(e),
         ))
     });

     let write_options = IpcWriteOptions::default()
         .try_with_compression(Some(arrow_ipc::CompressionType(1)))
         .map_err(|err| Status::failed_precondition(err.to_string()))?;

     let flight_data_stream = FlightDataEncoderBuilder::new()
-        .with_max_flight_data_size(usize::MAX)
+        .with_max_flight_data_size(MAX_FLIGHT_DATA_SIZE)
         .with_options(write_options)
         .build(record_stream);

     let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));

     Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
 }

 pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Box<Status>> {
     let input_stream = futures::stream::iter(records.into_iter().map(Ok));
     let write_options = IpcWriteOptions::default()
         .try_with_compression(Some(arrow_ipc::CompressionType(1)))
         .map_err(|err| Status::failed_precondition(err.to_string()))?;

     let flight_data_stream = FlightDataEncoderBuilder::new()
-        .with_max_flight_data_size(usize::MAX)
+        .with_max_flight_data_size(MAX_FLIGHT_DATA_SIZE)
         .with_options(write_options)
         // .with_schema(schema.into())
         .build(input_stream);

     let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));

     Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
 }
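
The effect of the size bound can be illustrated with a byte-level analogy. This sketch does not use the Arrow Flight encoder, which splits record batches rather than raw bytes; `split_into_frames` and `MAX_FRAME_BYTES` are hypothetical names, and the only assumption carried over from the review is the 16 MB limit.

```rust
/// Illustrative frame limit matching the 16 MB figure from the review.
pub const MAX_FRAME_BYTES: usize = 16 * 1024 * 1024;

/// Splits a payload into frames of at most `max_frame_bytes` each.
/// Passing `usize::MAX` yields a single frame regardless of payload size.
pub fn split_into_frames(payload: &[u8], max_frame_bytes: usize) -> Vec<&[u8]> {
    assert!(max_frame_bytes > 0, "frame size must be non-zero");
    payload.chunks(max_frame_bytes).collect()
}
```

With an explicit limit, no single frame exceeds what the transport is configured to accept; with `usize::MAX`, the whole payload is emitted as one frame and the transport limit becomes the only (implicit) guard.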
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    let flight_data_stream = FlightDataEncoderBuilder::new()
-        .with_max_flight_data_size(usize::MAX)
-        .with_options(write_options)
+const MAX_FLIGHT_DATA_SIZE: usize = 16 * 1024 * 1024;
+
+pub fn into_flight_data_stream(
+    stream: datafusion::execution::SendableRecordBatchStream,
+) -> Result<Response<DoGetStream>, Box<Status>> {
+    let record_stream = stream.map_err(|e| {
+        arrow_flight::error::FlightError::Arrow(arrow_schema::ArrowError::ExternalError(
+            Box::new(e),
+        ))
+    });
+
+    let write_options = IpcWriteOptions::default()
+        .try_with_compression(Some(arrow_ipc::CompressionType(1)))
+        .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+    let flight_data_stream = FlightDataEncoderBuilder::new()
+        .with_max_flight_data_size(MAX_FLIGHT_DATA_SIZE)
+        .with_options(write_options)
+        .build(record_stream);
+
+    let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
+
+    Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/arrow/flight.rs` around lines 161 - 163, Replace the unbounded
Flight frame size usage by defining a single explicit constant (e.g.
MAX_FLIGHT_FRAME_SIZE = 16 * 1024 * 1024) and use it wherever
FlightDataEncoderBuilder::with_max_flight_data_size currently receives
usize::MAX; update both into_flight_data_stream and into_flight_data to call
.with_max_flight_data_size(MAX_FLIGHT_FRAME_SIZE) so the Arrow IPC encoder
chunks data consistently with gRPC's frame size instead of allowing unlimited
accumulation.

nikhilsinhaparseable and others added 21 commits June 24, 2026 14:34
* fix: hottier downloads

Make hottier downloads streaming

* replace streaming with parallel downloads

* new task per range

* expose new env vars

* parallel file download per stream

* concurrent writes instead of mutex

* crash safety by using .partial file

* separate out historic and latest hottier tasks

* add logs

* two loops instead of clockwerk

* per-stream hottier tasks

* fix: deepsource, coderabbit suggestions

* hottier deletion bug

* x86 build + other fixes

* add traces to hottier

* Updates: reduce object-store calls, limitstore for metastore

* try hottier abort

* fix: New runtime for hottier

Running multiple parallel chunked downloads on the main runtime resulted in a slowdown in other incoming requests.
This was mitigated by creating a new runtime for hottier downloads.

* coderabbit + deepsource

* server startup function
If the env var OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
is set to `stdout`, export the traces to stdout.
…arseablehq#1648)

add series hash in metrics ingestion, bounded streaming merge and
sort by metric name for otel-metrics stream
- DiskWriter and MemWriter expect and unwrap replaced
- New cli env var `P_DATAFUSION_TARGET_PARTITIONS` for controlling number of partitions (default num cpu / 2)
- Streaming response uses unbounded channel now

---------

Co-authored-by: Nikhil Sinha <nikhil.sinha@parseable.com>
Co-authored-by: AdheipSingh <adheip@parseable.com>
Signed-off-by: Nitish Tiwari <nitish@parseable.com>
Addressed with a smaller invariant-based implementation. 
Static S3 credentials and IRSA web identity values are 
now validated as pairs


fixes parseablehq#1646
* cli var for dataset stats

* perf: batch staging arrow writes

  Batch disk staging writes per output filename before writing to
  Arrow IPC. This reduces per-request writer mutex hold time and cuts
  the number of DiskWriter::write calls during high-volume OTEL metrics
  ingest.

  Also keep targeted hotpath probes around ingest, JSON conversion,
  staging, and parquet conversion paths, and skip object-store sync work
  for streams without local parquet/schema files.

* perf: improve otel metrics ingest path

  Batch staging arrow writes per output file and prepare OTEL metric
  parquet row groups off-thread before sequential writes. This reduces
  request-path writer contention and hides row-group concat/sort work
  behind merge/write.

  Reduce JSON allocation churn in OTEL metric flattening and generic
  flattening by reusing owned maps, pre-sizing containers, inserting
  attributes/exemplars directly, and avoiding per-row known-field set
  construction for series hashing.

  Also guard shutdown local sync against concurrent local sync cycles and
  avoid panicking on transient arrow-file metadata races.

* added hotpath as a feature flag

* fix: coderabbit suggestions
* fix: commit schema bug

* fix: flush pending batches to disk, schema mutex

pending batches by default get pushed to disk near instantly

schema writer is behind a mutex to prevent incorrect schemas

* fix: deepsource and coderabbit
* Investigations for ingestion optimization

Try to find more areas for optimization.
- compress arrow files while writing (lz4_frame or zstd)
- create new .part file based on size
- one parquet per arrow file (converted in parallel)
- separate runtimes to run ingestion tasks, sync and conversion tasks

* add env vars for sort pruning and parquet creation

* revert pruning env var
* Remove async

- make MemWriter optional
- remove unnecessary async

* separate ingestion runtime

* bump MSRV

* make MemWriter optional

* docker image versions bump
…d querier (parseablehq#1673)

---------

Co-authored-by: AdheipSingh <adheip@parseable.com>
parmesant and others added 25 commits June 24, 2026 14:34
* fix: deadlock in metric pruning

in case schema hashes grow beyond NUM_CPU, deadlock can occur during sort and prune

* chunk arrow files for conversion

* coderabbit suggestion
calculate field stats directly from parquet record batches
remove old datafusion table path logic

add bounded high-cardinality handling
per-file stats IDs for correct flattened-row aggregation

improve logging and keep partial stats when individual parquet batches fail
* update ingestion script to use tenant id and api key

* update ps1 script
* chore: reuse time bin logic across apis

remove binning logic in counts api
make reusable component
reuse in counts api, errors api, agent-observability related apis

* add validations

* fix bin logic for the last bin
1. on fresh deployment - server startup - when alerts and targets load
2. on delete stream - at query node - delete stats
3. on delete stream - at query node - delete staging
4. get_objects in object store
)

* track insertion time instead of data time for eviction

* Use DashMap for concurrent cache access
- Bump versions of multiple crates
- Introduce new env var `P_SQL_TIMEOUT` to let users control timeout for SQL execution (defaults to 300s)
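
A minimal sketch of how a variable like `P_SQL_TIMEOUT` could be read with the 300s default mentioned above. The helper name `sql_timeout_from`, the assumption that the value is a whole number of seconds, and the fallback on unparsable input are illustrative choices, not the actual implementation in this PR.

```rust
use std::env;
use std::time::Duration;

/// Default taken from the commit message: 300 seconds.
const DEFAULT_SQL_TIMEOUT_SECS: u64 = 300;

/// Hypothetical helper: convert the raw env value into a timeout.
/// Missing or unparsable values fall back to the default (an assumption).
fn sql_timeout_from(raw: Option<&str>) -> Duration {
    let secs = raw
        .and_then(|v| v.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_SQL_TIMEOUT_SECS);
    Duration::from_secs(secs)
}

fn main() {
    // Read the variable once at startup and resolve it to a Duration.
    let raw = env::var("P_SQL_TIMEOUT").ok();
    let timeout = sql_timeout_from(raw.as_deref());
    println!("SQL timeout: {}s", timeout.as_secs());
}
```

Keeping the parsing in a pure function that takes `Option<&str>` makes the default and fallback behaviour testable without touching the process environment.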
* dashboard: allow optional fields in dashboard json

Optional fields are currently allowed for each tile in the dashboard.
This change also allows the caller to add fields in the dashboard section.

This is useful when a client adds more dashboard features, such as settings or variables.

* update dashboard fix
In standalone mode, the `.schema` file from staging was not pushed to storage because `path().extension()` was used incorrectly: it always returned `None`.
Instead of checking whether the extension is `schema`, check whether the file path ends with `.schema`.
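
The behaviour described above can be reproduced with the standard library alone: `Path::extension()` returns `None` when the file name starts with a dot and contains no other dots. The sketch below assumes the staging file is literally named `.schema`; the paths and the helper `is_schema_file` are hypothetical and only illustrate the suffix check, not the code in this PR.

```rust
use std::path::Path;

/// Hypothetical helper: true when the file name ends with `.schema`.
fn is_schema_file(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .is_some_and(|name| name.ends_with(".schema"))
}

fn main() {
    // Assumed file name: a leading-dot name with no other dots has no extension.
    let path = Path::new("staging/teststream/.schema");
    assert_eq!(path.extension(), None);

    // The suffix check still recognises it as a schema file.
    assert!(is_schema_file(path));

    // Other staging files are not matched.
    assert!(!is_schema_file(Path::new("staging/teststream/data.parquet")));
}
```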
* feat: log context api

request -
```
{
    "stream": "teststream",
    "contextWindow": "1m",
    "pTimestamp": "2026-06-18T07:39:59.995Z",
    "pageSize": 500,
    "message": "Application started",
    "conditions": {
        "operator": "and",
        "groups": [
            {
                "operator": "or",
                "conditionConfig": [
                    {
                        "column": "level",
                        "operator": "=",
                        "value": "warn",
                        "type": "text"
                    }
                ]
            }
        ]
    }
}
```

response -
```
{
    "scope": "contextWindow",
    "contextStartTime": "2026-06-18T07:38:00Z",
    "contextEndTime": "2026-06-18T07:40:00Z",
    "limit": 500,
    "anchorIndex": 10,
    "duplicateAnchorCount": 15,
    "anchoredDuplicate": "first",
    "records": [
        {
            "app_meta": "okcequedfmkqlgzheaidrcce",
            "device_id": 126.0,
            "host": "172.162.1.120",
            "level": "warn",
            "location": "uqwetjbuvjameflh",
            "message": "Application is failing",
            "meta-containerimage": "ghcr.io/parseablehq/quest",
            "meta-containername": "log-generator",
            "meta-host": "10.116.0.3",
            "meta-namespace": "go-apasdp",
            "meta-podlabels": "app=go-app,pod-template-hash=6c87bc9cc9",
            "meta-source": "quest-test",
            "os": "Windows",
            "p_src_ip": "127.0.0.1",
            "p_timestamp": "2026-06-18T07:39:59.995",
            "p_user_agent": "Grafana k6/1.6.1",
            "request_body": "vlywlgkpmciorkiklfruxcfnzaspahyscsazpmnqgquqrtahrzhmtojwvackzcqngscesuadnupwpdsryfrvlifembjotnftzuwx",
            "session_id": "pqr",
            "source_time": "2026-06-18T07:39:59.991",
            "status_code": 500.0,
            "user_id": 98513.0,
            "uuid": "169fa593-fa27-4625-8576-1faab8b9cc71",
            "version": "1.2.0"
        }
    ],
    "queries": {
        "previous": {
            "query": "SELECT * FROM (SELECT * FROM \"teststream\" WHERE ((\"p_timestamp\" >= TIMESTAMP '2026-06-18 07:38:00.000' AND \"p_timestamp\" < TIMESTAMP '2026-06-18 07:40:00.000') AND ((\"level\" = 'warn'))) AND (\"p_timestamp\" > TIMESTAMP '2026-06-18 07:39:59.995' OR (\"p_timestamp\" = TIMESTAMP '2026-06-18 07:39:59.995' AND \"message\" < 'Application is failing')) ORDER BY \"p_timestamp\" ASC, \"message\" DESC LIMIT 500) AS log_context_seek_page ORDER BY \"p_timestamp\" DESC, \"message\" ASC",
            "startTime": "2026-06-18T07:38:00Z",
            "endTime": "2026-06-18T07:40:00Z",
            "sendNull": false
        },
        "next": {
            "query": "SELECT * FROM \"teststream\" WHERE ((\"p_timestamp\" >= TIMESTAMP '2026-06-18 07:38:00.000' AND \"p_timestamp\" < TIMESTAMP '2026-06-18 07:40:00.000') AND ((\"level\" = 'warn'))) AND (\"p_timestamp\" < TIMESTAMP '2026-06-18 07:39:59.662' OR (\"p_timestamp\" = TIMESTAMP '2026-06-18 07:39:59.662' AND \"message\" > 'Logging a request')) ORDER BY \"p_timestamp\" DESC, \"message\" ASC LIMIT 500",
            "startTime": "2026-06-18T07:38:00Z",
            "endTime": "2026-06-18T07:40:00Z",
            "sendNull": false
        }
    }
}
```

* query string to have limit and offset

* clippy fix
* update: remove historic sync from hottier

- Removed historic hottier sync task
- Removed env vars `P_HISTORIC_PER_TICK_CAP` and `P_HOT_TIER_HISTORIC_SYNC_MINUTES`
- Resolved a TOCTOU in hottier tasks addition

* remove
* add optional ingestion quota to tenant metadata

* clippy fix
The context API request now supports sending either of:
1. context window (1m, 5m, 10m etc)
2. context start and end time

Added validations:
1. the p_timestamp of the reference log must be within the context window
2. the window and start/end time must not both be present in the request
3. start <= end time
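
The three validations above can be sketched roughly as follows. The `ContextRequest` struct, its field names, the use of epoch milliseconds, the rejection of requests with neither option, and the choice to centre the window on the anchor are all assumptions made for illustration; the real request types and window placement may differ.

```rust
/// Hypothetical request shape; timestamps are epoch milliseconds for simplicity.
struct ContextRequest {
    anchor_ms: i64,               // p_timestamp of the reference log
    window_ms: Option<i64>,       // context window, e.g. 60_000 for "1m"
    range_ms: Option<(i64, i64)>, // explicit (start, end)
}

/// Returns the resolved (start, end) range or a validation error.
fn resolve_context(req: &ContextRequest) -> Result<(i64, i64), String> {
    let (start, end) = match (req.window_ms, req.range_ms) {
        // Validation 2: window and start/end must not both be present.
        (Some(_), Some(_)) => return Err("send a window or start/end, not both".into()),
        // Assumption: at least one of the two must be provided.
        (None, None) => return Err("a window or start/end is required".into()),
        // Assumption: the window is centred on the anchor.
        (Some(w), None) => (req.anchor_ms - w, req.anchor_ms + w),
        (None, Some(range)) => range,
    };
    // Validation 3: start <= end.
    if start > end {
        return Err("start must not be after end".into());
    }
    // Validation 1: the reference log must fall inside the range.
    if req.anchor_ms < start || req.anchor_ms > end {
        return Err("p_timestamp is outside the context range".into());
    }
    Ok((start, end))
}

fn main() {
    let req = ContextRequest { anchor_ms: 1_000, window_ms: Some(60), range_ms: None };
    println!("{:?}", resolve_context(&req));
}
```

Matching on the pair of options keeps the "either, but not both" rule in one place, so the later checks only ever see a single resolved range.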
This patch re-enables masking with a few backend changes
- Add mask_url() using URL-based parsing
- Apply consistent masking across Slack, Webhook, and AlertManager types.
- Add strict unit tests enforcing that secrets never hit the serializer.
* fix windows ingest script

* use PSScriptRoot join path
The artifact download should pick up Parseable_OSS* binaries only,
so add a filter that downloads only Parseable_OSS_* files and
skips other intermediate files such as *.dockerbuild.