Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Add prefilter pipeline columns to assessment_run

Revision ID: 064
Revises: 063
Create Date: 2026-05-27 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

# Alembic revision identifiers; they mirror the "Revision ID" / "Revises"
# header in the module docstring above.
revision = "064"
down_revision = "063"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the four nullable prefilter columns to ``assessment_run``.

    Columns are created in the order listed below: the object-store URL
    first, followed by the total / passed / rejected row counters.
    """
    # (column name, column type, column comment), in creation order.
    prefilter_columns = (
        (
            "prefilter_object_store_url",
            sa.String(),
            "S3 URL of stored prefilter filter results JSON",
        ),
        (
            "prefilter_total_rows",
            sa.Integer(),
            "Total rows fed into prefilter pipeline",
        ),
        (
            "prefilter_total_passed",
            sa.Integer(),
            "Rows that passed topic relevance and went to L2",
        ),
        (
            "prefilter_total_rejected",
            sa.Integer(),
            "Rows rejected by topic relevance, stopped at prefilter",
        ),
    )

    for column_name, column_type, column_comment in prefilter_columns:
        op.add_column(
            "assessment_run",
            sa.Column(
                column_name,
                column_type,
                nullable=True,
                comment=column_comment,
            ),
        )


def downgrade() -> None:
    """Drop the prefilter columns from ``assessment_run``.

    The columns are removed in the reverse of the order in which
    ``upgrade`` adds them.
    """
    columns_to_drop = (
        "prefilter_total_rejected",
        "prefilter_total_passed",
        "prefilter_total_rows",
        "prefilter_object_store_url",
    )
    for column_name in columns_to_drop:
        op.drop_column("assessment_run", column_name)
15 changes: 15 additions & 0 deletions backend/app/api/docs/assessment/update_post_processing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Save post-processing config for a single assessment run.

The config is stored inside the run's `input` JSON blob (under the key
`post_processing_config`). It is applied at export/preview time; applying it
never re-runs the LLM, so it can be edited after the run completes.

The config has three optional sections:

- `computed_columns`: derived columns from formulas, e.g.
`{"name": "Total_Score", "formula": "@Novelty_score + @Usefulness_score"}`.
Formulas reference columns with `@` and support `+ - * /` and parentheses.
- `filter`: row filters combined with AND logic.
- `sort`: sort rules applied in priority order.

Pass `null` (or an empty body) to clear post-processing for the run.
42 changes: 40 additions & 2 deletions backend/app/api/routes/assessment/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from typing import Any, Literal

from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.api.deps import AuthContextDep, SessionDep
Expand All @@ -12,6 +12,7 @@
get_assessment_by_id,
get_assessment_run_by_id as get_run_by_id,
list_assessment_runs as list_runs,
update_run_post_processing_config,
)
from app.models.assessment import (
Assessment,
Expand All @@ -33,6 +34,7 @@
load_export_rows_for_run,
sort_export_rows,
)
from app.services.assessment.utils.post_processing import apply_post_processing
from app.utils import APIResponse, load_description

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,6 +67,10 @@ def _build_run_public(
total_items=run.total_items,
error_message=run.error_message,
input=run.input,
prefilter_total_rows=run.prefilter_total_rows,
prefilter_total_passed=run.prefilter_total_passed,
prefilter_total_rejected=run.prefilter_total_rejected,
post_processing_config=(run.input or {}).get("post_processing_config"),
inserted_at=run.inserted_at,
updated_at=run.updated_at,
)
Expand Down Expand Up @@ -212,12 +218,44 @@ def export_assessment_run_results(
)
)

post_processing_config = (run.input or {}).get("post_processing_config") or None
base_label = assessment.experiment_name if assessment else f"run_{run.id}"

if export_format != "json":
return build_export_response(
export_rows=export_rows,
export_format=export_format,
base_name=f"{base_label}_run_{run.id}_results",
post_processing_config=post_processing_config,
)

return APIResponse.success_response(data=build_json_export_rows(export_rows))
rows = build_json_export_rows(export_rows)
rows = apply_post_processing(rows, post_processing_config)
return APIResponse.success_response(data=rows)


@router.patch(
    "/runs/{run_id}/post-processing",
    description=load_description("assessment/update_post_processing.md"),
    response_model=APIResponse[AssessmentRunPublic],
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def update_post_processing(
    run_id: int,
    session: SessionDep,
    auth_context: AuthContextDep,
    config: dict[str, Any] | None = Body(default=None),
) -> APIResponse[AssessmentRunPublic]:
    """Persist the post-processing config for one assessment run.

    The run is looked up by ``run_id`` scoped to the caller's organization
    and project. ``config`` is the request body and defaults to ``None``
    when no body is sent; it is handed unchanged to
    ``update_run_post_processing_config``.

    Raises:
        HTTPException: 404 when no matching run is found.
    """
    organization_id = auth_context.organization_.id
    project_id = auth_context.project_.id

    existing_run = get_run_by_id(
        session=session,
        run_id=run_id,
        organization_id=organization_id,
        project_id=project_id,
    )
    if existing_run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    updated_run = update_run_post_processing_config(
        session=session,
        run=existing_run,
        config=config,
    )
    run_public = _build_run_public(session, updated_run)
    return APIResponse.success_response(data=run_public)
23 changes: 23 additions & 0 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,29 @@ def run_tts_batch_submission(
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.ASSESSMENT_RUN_SOFT_TIME_LIMIT, "run_assessment_run")
def run_assessment_run(
    self,
    run_id: int,
    organization_id: int,
    project_id: int,
    trace_id: str,
    **kwargs,
):
    """Celery task that executes one assessment run.

    Sets the trace id, then delegates to ``execute_assessment_run`` through
    ``_run_with_otel_parent`` and returns whatever that call returns.
    Extra keyword arguments are accepted but not used here.
    """
    # Kept as a function-scope import, matching the original task body.
    from app.services.assessment.tasks import execute_assessment_run

    def _execute():
        # Deferred call: invoked by _run_with_otel_parent, not here.
        return execute_assessment_run(
            run_id=run_id,
            organization_id=organization_id,
            project_id=project_id,
        )

    _set_trace(trace_id)
    return _run_with_otel_parent(self, _execute)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_tts_result_processing")
def run_tts_result_processing(
Expand Down
14 changes: 14 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ def AWS_S3_BUCKET(self) -> str:
DOC_TRANSFORMATION_PENDING_THRESHOLD_MINUTES: int = 30
PENDING_JOB_QUERY_TIMEOUT_MS: int = 1000

# Assessment
# NOTE(review): default model identifier for prefilter Gemini calls — confirm
# this model name is valid for the deployed Gemini API version.
ASSESSMENT_PREFILTER_GEMINI_MODEL: str = "gemini-3.1-flash-lite"
ASSESSMENT_PREFILTER_CONCURRENT_WORKERS: int = 8
# NOTE(review): this default looks like an environment-specific resource name —
# confirm it is meant to be hard-coded rather than supplied per deployment.
ASSESSMENT_PREFILTER_DUPLICATE_STORE_NAME: str = (
"fileSearchStores/inquilabcorpus-782mxjcwisaz"
)
# Soft timeout for the full assessment run task (prefilter pipeline + batch
# submission). Larger than the default task limit because prefilter runs many
# concurrent LLM calls over the whole dataset. Seconds. Default 2 hours.
# Passed to the gevent_timeout decorator on the run_assessment_run Celery task.
ASSESSMENT_RUN_SOFT_TIME_LIMIT: int = 7200
# Timeout for prefilter Gemini calls to prevent pipeline stalls from slow/hung requests
# (default: 2 minutes, in ms)
ASSESSMENT_PREFILTER_REQUEST_TIMEOUT_MS: int = 120000

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
Expand Down
4 changes: 4 additions & 0 deletions backend/app/crud/assessment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
list_assessment_runs,
list_assessments,
recompute_assessment_status,
update_assessment_run_prefilter_stats,
update_assessment_run_status,
update_run_post_processing_config,
)
from app.crud.assessment.dataset import (
create_assessment_dataset,
Expand Down Expand Up @@ -42,5 +44,7 @@
"list_assessment_datasets",
"list_assessments",
"recompute_assessment_status",
"update_assessment_run_prefilter_stats",
"update_assessment_run_status",
"update_run_post_processing_config",
]
90 changes: 23 additions & 67 deletions backend/app/crud/assessment/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@
normalize_llm_text,
)
from app.services.assessment.utils.attachments import (
build_gemini_attachment_parts,
resolve_attachment_values,
resolve_image_mime_and_payload,
split_attachment_urls,
split_data_url,
to_direct_attachment_url,
)
from app.services.llm.providers.registry import LLMProvider

Expand Down Expand Up @@ -161,6 +158,7 @@ def build_openai_jsonl(
attachments: list[AssessmentAttachment],
prompt_template: str | None,
openai_params: dict,
row_indices: list[int] | None = None,
) -> list[dict[str, Any]]:
"""Build OpenAI batch JSONL data from dataset rows.

Expand All @@ -173,8 +171,11 @@ def build_openai_jsonl(
}
"""
jsonl_data = []
# Memoize per-item type probes across all rows in this build.
type_cache: dict[str, str] = {}

for idx, row in enumerate(rows):
for i, row in enumerate(rows):
idx = row_indices[i] if row_indices is not None else i
Comment on lines +174 to +178
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Missing length validation between rows and row_indices.

When row_indices is provided, the code assumes len(row_indices) == len(rows). If they differ, row_indices[i] could raise IndexError (if row_indices is shorter) or some indices could be silently unused (if longer).

Consider adding a validation check:

🛡️ Proposed validation
 def build_openai_jsonl(
     rows: list[dict[str, str]],
     text_columns: list[str],
     attachments: list[AssessmentAttachment],
     prompt_template: str | None,
     openai_params: dict,
     row_indices: list[int] | None = None,
 ) -> list[dict[str, Any]]:
+    if row_indices is not None and len(row_indices) != len(rows):
+        raise ValueError(
+            f"row_indices length ({len(row_indices)}) must match rows length ({len(rows)})"
+        )
     jsonl_data = []

Apply the same pattern to build_google_jsonl.

Also applies to: 237-238

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/assessment/batch.py` around lines 174 - 178, Validate that
row_indices, when provided, has the same length as rows before the loop that
iterates rows (the block initializing type_cache and the for i, row in
enumerate(rows) loop), and raise a clear ValueError (or similar) if lengths
differ; add the same check to the build_google_jsonl function where row_indices
is also accepted to prevent IndexError or unused indices. Specifically, check if
row_indices is not None and len(row_indices) != len(rows) and fail early with a
descriptive message referencing rows and row_indices so callers must supply
matching-length lists.

# Build input array
input_parts: list[dict[str, Any]] = []

Expand All @@ -186,7 +187,7 @@ def build_openai_jsonl(
# Attachments
for att in attachments:
cell_value = row.get(att.column, "")
input_parts.extend(resolve_attachment_values(cell_value, att))
input_parts.extend(resolve_attachment_values(cell_value, att, type_cache))

if not input_parts:
logger.warning("[build_openai_jsonl] Skipping empty row | idx=%s", idx)
Expand Down Expand Up @@ -219,6 +220,7 @@ def build_google_jsonl(
attachments: list[AssessmentAttachment],
prompt_template: str | None,
google_params: dict,
row_indices: list[int] | None = None,
) -> list[dict[str, Any]]:
"""Build Google (Gemini) batch JSONL data from dataset rows.

Expand All @@ -229,8 +231,11 @@ def build_google_jsonl(
}
"""
jsonl_data = []
# Memoize per-item type probes across all rows in this build.
type_cache: dict[str, str] = {}

for idx, row in enumerate(rows):
for i, row in enumerate(rows):
idx = row_indices[i] if row_indices is not None else i
parts: list[dict[str, Any]] = []

# Text prompt
Expand All @@ -240,64 +245,8 @@ def build_google_jsonl(

# Attachments (Gemini uses file_data for inline content)
for att in attachments:
cell_value = row.get(att.column, "").strip()
if not cell_value:
continue

cell_values = (
split_attachment_urls(cell_value)
if att.format == "url"
else [cell_value]
)

for item_value in cell_values:
normalized_value = (
to_direct_attachment_url(item_value, att.type)
if att.format == "url"
else item_value
)
if att.type == "image":
mime_type, payload = resolve_image_mime_and_payload(
normalized_value,
att.format,
)
if att.format == "url":
parts.append(
{
"fileData": {
"mimeType": mime_type,
"fileUri": normalized_value,
}
}
)
else:
parts.append(
{
"inlineData": {
"mimeType": mime_type,
"data": payload,
}
}
)
elif att.type == "pdf":
if att.format == "url":
parts.append(
{
"fileData": {
"mimeType": "application/pdf",
"fileUri": normalized_value,
}
}
)
else:
parts.append(
{
"inlineData": {
"mimeType": "application/pdf",
"data": split_data_url(normalized_value)[1],
}
}
)
cell_value = row.get(att.column, "")
parts.extend(build_gemini_attachment_parts(cell_value, att, type_cache))

if not parts:
logger.warning("[build_google_jsonl] Skipping empty row | idx=%s", idx)
Expand Down Expand Up @@ -349,6 +298,8 @@ def submit_assessment_batch(
assessment_input: dict[str, Any],
organization_id: int,
project_id: int,
preloaded_rows: list[dict[str, str]] | None = None,
row_indices: list[int] | None = None,
) -> BatchJob:
"""Build JSONL and submit a batch for one assessment run.

Expand All @@ -371,8 +322,11 @@ def submit_assessment_batch(
output_schema = assessment_input.get("output_schema")
attachments = [AssessmentAttachment(**a) for a in attachments_raw]

# Load dataset rows
rows = _load_dataset_rows(session, dataset)
# Use preloaded rows (post-prefilter filtered) if provided, else load from dataset.
if preloaded_rows is not None:
rows = preloaded_rows
else:
rows = _load_dataset_rows(session, dataset)
if not rows:
raise ValueError(f"Dataset {dataset.id} has no rows")

Expand Down Expand Up @@ -412,6 +366,7 @@ def submit_assessment_batch(
attachments=attachments,
prompt_template=prompt_template,
openai_params=mapped_params,
row_indices=row_indices,
)

# Get OpenAI client and submit
Expand Down Expand Up @@ -452,6 +407,7 @@ def submit_assessment_batch(
attachments=attachments,
prompt_template=prompt_template,
google_params=mapped_params,
row_indices=row_indices,
)

# Get Gemini client and submit
Expand Down
Loading
Loading