Assessment: Implement L1 Filtering and Post-Processing#895
Conversation
…icate detection - Added L1 pipeline orchestrator to run topic relevance and duplicate detection filters in series. - Introduced duplicate detection logic to filter out vague submissions and check for duplicates against a corpus. - Created topic relevance filter to assess the relevance of submissions based on user-defined criteria. - Integrated L1 results handling in the assessment service, allowing for conditional processing based on L1 outcomes. - Updated export functionality to include L1 results in the output, enhancing the assessment reporting capabilities. - Added Celery task for executing the L1 pipeline and managing assessment run statuses.
📝 WalkthroughWalkthroughThis PR introduces a complete L1 (topic relevance and duplicate detection) prefilter pipeline for assessment runs, combined with post-processing utilities for export filtering/sorting, async task-based orchestration, and per-item attachment type detection. The changes span data models, CRUD operations, prefilter modules, export integration, API endpoints, and comprehensive test coverage. ChangesL1 Assessment Pipeline & Post-Processing
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…plicate detection
…ed type detection and improved utility functions
…ctor batch submission to Celery task
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (7)
backend/app/crud/assessment/batch.py (1)
306-316: 💤 Low valueDocstring is missing documentation for new parameters.
The
preloaded_rowsandrow_indicesparameters are not documented in the docstring.📝 Suggested docstring update
Args: session: Database session run: The AssessmentRun to process dataset: The dataset to read rows from config_blob: Resolved configuration blob assessment_input: Assessment input config (prompt_template, text_columns, etc.) organization_id: Organization ID project_id: Project ID + preloaded_rows: Pre-filtered rows to use instead of loading from dataset (e.g., post-L1) + row_indices: Original row indices for custom_id/key mapping when using filtered rows🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/crud/assessment/batch.py` around lines 306 - 316, The docstring for the function in backend/app/crud/assessment/batch.py is missing entries for the new parameters preloaded_rows and row_indices; update the Args section to document both: describe preloaded_rows as an optional sequence/list of pre-fetched dataset rows provided to avoid re-reading from the dataset (include expected element type, e.g., dict/Row), and describe row_indices as an optional sequence/list of integer indices specifying which dataset rows to process from the dataset/preloaded_rows; keep wording consistent with the existing entries (session, run, dataset, config_blob, assessment_input, organization_id, project_id) and ensure the new parameter descriptions include expected types and behavior when omitted.backend/app/services/assessment/utils/attachments.py (1)
100-114: ⚖️ Poor tradeoffConsider adding HEIC/HEIF magic byte detection.
The
_IMAGE_MIME_BY_EXTdictionary includes.heicand.heifextensions, but_image_mime_from_magicdoesn't detect these formats via magic bytes. HEIC/HEIF files start with aftypbox containing brand identifiers likeheic,heix,mif1.This means HEIC URLs without extensions will fail magic-byte detection and fall back to Content-Type probing, which may not always be reliable.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/attachments.py` around lines 100 - 114, The _image_mime_from_magic function is missing HEIC/HEIF detection; extend it to detect the ISO BMFF 'ftyp' box by checking blob length (>=12), that blob[4:8] == b'ftyp', and that the brand field (e.g., blob[8:12] and/or subsequent bytes) matches known HEIC/HEIF brands like b'heic', b'heix', b'hevc', b'hevx', b'mif1' (or similar), and return the appropriate HEIF mime (e.g., "image/heif") when matched; this aligns magic detection with the _IMAGE_MIME_BY_EXT entries .heic/.heif so files without extensions are recognized.backend/app/core/config.py (1)
177-179: ⚡ Quick winTenant-specific store id as a global default is risky.
fileSearchStores/inquilabcorpus-782mxjcwisazlooks like a corpus belonging to a specific tenant. Shipping it as the built-in default means any deployment that forgets to set this env var silently runs duplicate detection against that corpus. Consider leaving the default empty and failing fast (or gating L1) when it's unset.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/core/config.py` around lines 177 - 179, The constant ASSESSMENT_L1_DUPLICATE_STORE_NAME currently hardcodes a tenant-specific store id; change its default to an empty string (or None) and add a startup/config validation that fails fast or disables/gates L1 duplicate detection if ASSESSMENT_L1_DUPLICATE_STORE_NAME is unset. Update the definition of ASSESSMENT_L1_DUPLICATE_STORE_NAME in config.py to be empty and add a clear validation check in the app initialization path (e.g., config validation or startup/init_assessment logic) that raises an error or disables L1 when the value is empty.backend/app/services/assessment/tasks.py (1)
13-13: 💤 Low valueImporting private function
_load_dataset_rowsfrom another module.The function
_load_dataset_rowsis prefixed with underscore, indicating it's intended as a module-private implementation detail. Importing it across module boundaries couples this code to an internal API that could change without notice.Consider either removing the underscore prefix in the batch module to make it public, or adding a public wrapper function for cross-module use.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/tasks.py` at line 13, The tasks.py module is importing the private function _load_dataset_rows from app.crud.assessment.batch; update usage to rely on a public API instead: add a public wrapper (e.g., load_dataset_rows) or rename the function in app.crud.assessment.batch to remove the leading underscore, then change the import in backend/app/services/assessment/tasks.py to import load_dataset_rows (and keep submit_assessment_batch as-is), ensuring callers reference the new public symbol rather than the module-private _load_dataset_rows.backend/app/services/assessment/utils/export.py (1)
674-696: 💤 Low valuePost-processing config not passed in multi-run zip export.
When exporting multiple runs as a zip file,
serialize_export_rowsis called withoutpost_processing_config(line 684), meaning computed columns, filtering, and sorting won't be applied. This may be intentional since each run could have different configs, but if users expect post-processing to apply consistently, this could be surprising.Consider whether post-processing should be applied per-run in the zip export path, or document this limitation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/export.py` around lines 674 - 696, The zip export omits post-processing because serialize_export_rows is called without post_processing_config for each run; update the multi-run export loop (the code iterating over runs_with_rows) to pass the appropriate post_processing_config into serialize_export_rows so computed columns/filtering/sorting are applied per-run (e.g., serialize_export_rows(rows, export_format, post_processing_config=post_processing_config)), or if per-run configs are needed, derive a per-run post_processing_config and pass that instead; ensure the variable name you pass matches the surrounding export function's parameter or local (post_processing_config) and keep export_format and file naming logic unchanged.backend/app/services/assessment/utils/post_processing.py (1)
144-158: ⚡ Quick winLoop variable
descnot bound in closure.The static analysis warning is valid:
descis captured by reference from the enclosing loop. While this works correctly here becausesorted()evaluates keys immediately before the next iteration, it's fragile and can break if the code is refactored (e.g., ifsort_keywere stored for later use).Bind
descas a default argument like_colalready is:♻️ Proposed fix
- def sort_key(row: dict[str, Any], _col: str = col) -> tuple: + def sort_key(row: dict[str, Any], _col: str = col, _desc: bool = desc) -> tuple: val = row.get(_col) if val is None: return (1, 0, "") try: - return (0, -float(val) if desc else float(val), "") + return (0, -float(val) if _desc else float(val), "") except (TypeError, ValueError): s = str(val).lower() return ( (0, 0, s) - if not desc - else (0, 0, "".join(chr(0x10FFFF - ord(c)) for c in s)) + if not _desc + else (0, 0, "".join(chr(0x10FFFF - ord(c)) for c in s)) )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/post_processing.py` around lines 144 - 158, The closure sort_key captures the loop variable desc by reference which is fragile; update sort_key signature to bind desc as a default parameter (similar to _col) so it captures the current boolean value at definition time (e.g., def sort_key(row: dict[str, Any], _col: str = col, _desc: bool = desc): ...) and then use _desc inside the function when negating or inverting string order; keep the sorted(result, key=sort_key) call unchanged.backend/app/services/assessment/l1/pipeline.py (1)
121-135: Consider Gemini rate limits withASSESSMENT_L1_CONCURRENT_WORKERS.Both filters fan out one Gemini request per row at
workersconcurrency with no client-side throttling or retry/backoff. On large datasets this can trip provider rate limits, surfacing as per-row errors (ACCEPT/ERROR defaults) that silently degrade L1 quality. Worth bounding concurrency relative to the model's quota or adding backoff on 429s.Also applies to: 166-179
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/l1/pipeline.py` around lines 121 - 135, The concurrent ThreadPoolExecutor fan-out calling run_topic_relevance (and the similar block that submits run_answer_reasoning) can exceed Gemini rate limits; change the concurrency model to use a bounded semaphore or token bucket so actual in-flight Gemini requests are limited (e.g., cap at a safe value derived from ASSESSMENT_L1_CONCURRENT_WORKERS and model quota) and instrument request-level retries with exponential backoff and jitter for 429/5xx responses inside the Gemini call site used by run_topic_relevance (and the other submitted function), or wrap the client calls in a retry helper/ decorator that catches rate-limit responses and retries with backoff and a max attempts counter; update both executor submission sites to acquire the semaphore/token before submitting or perform semaphore acquisition inside the worker functions to ensure global throttling.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 235-257: The task run_assessment_run is missing the gevent timeout
decorator used elsewhere (e.g., run_tts_result_processing); update
run_assessment_run to include
`@gevent_timeout`(settings.CELERY_TASK_SOFT_TIME_LIMIT, ...) above the task
definition (or provide a code comment justification in the function explaining
why gevent-based timeout/diagnostics are intentionally omitted) so timeout
behavior and logging are consistent with other tasks using gevent_timeout.
In `@backend/app/crud/assessment/batch.py`:
- Around line 174-178: Validate that row_indices, when provided, has the same
length as rows before the loop that iterates rows (the block initializing
type_cache and the for i, row in enumerate(rows) loop), and raise a clear
ValueError (or similar) if lengths differ; add the same check to the
build_google_jsonl function where row_indices is also accepted to prevent
IndexError or unused indices. Specifically, check if row_indices is not None and
len(row_indices) != len(rows) and fail early with a descriptive message
referencing rows and row_indices so callers must supply matching-length lists.
In `@backend/app/services/assessment/l1/duplicate_detection.py`:
- Around line 96-108: The generate_content call in duplicate_detection.py
(gemini_client.models.generate_content) lacks an http timeout and can hang when
file-search is slow; update this call to pass
http_options=types.HttpOptions(timeout=...) (same timeout value used in the
other fixes) so that _call_file_search and _check_vague have bounded HTTP
timeouts when dispatching concurrent file-search requests.
In `@backend/app/services/assessment/l1/topic_relevance.py`:
- Line 90: The response_schema currently uses lowercase OpenAPI type strings
(see output_schema) which the google-genai SDK expects in its internal
enum/casing (e.g., "OBJECT", "BOOLEAN"); update the construction of
output_schema used for response_schema so type values use the SDK's Type
constants (e.g., genai.types.Type.OBJECT/BOOLEAN) or the exact uppercase strings
the SDK requires, ensuring nested properties and arrays are converted likewise;
locate where output_schema is defined and replace lowercase
"object"/"boolean"/"array"/"string"/"integer" with the corresponding genai Type
constants or uppercase enum names so google-genai validation/structured output
will apply correctly.
- Around line 84-93: Add a per-request Gemini timeout and guard ThreadPool
futures: introduce ASSESSMENT_L1_REQUEST_TIMEOUT_MS in core/config.py (default
value in milliseconds), then pass an HttpOptions(timeout=...) to
gemini_client.models.generate_content calls in topic_relevance.py and both call
sites in duplicate_detection.py (convert ms -> seconds if HttpOptions expects
seconds), and update pipeline.py to call fut.result(timeout=...) using the same
timeout (converted to seconds) so ThreadPoolExecutor futures won't hang
indefinitely; reference the gemini_client.models.generate_content call sites,
types.HttpOptions (or HttpOptions), the ASSESSMENT_L1_REQUEST_TIMEOUT_MS config
symbol, and the fut.result(timeout=...) change in pipeline.py.
In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 180-209: The _probe_url_type function currently calls requests.get
on user-provided URLs without SSRF protections; before making the request (and
before using _drive_file_id result) validate the URL: enforce allowed schemes
(e.g., require https), resolve the hostname to IP and block
localhost/private/reserved ranges (IPv4/IPv6), and check against an allowlist of
approved hosts or domains; if validation fails log and return None. Also disable
or tightly limit redirects (set allow_redirects=False or a small max), and
ensure the validation is re-run on redirected locations if redirects are
allowed.
---
Nitpick comments:
In `@backend/app/core/config.py`:
- Around line 177-179: The constant ASSESSMENT_L1_DUPLICATE_STORE_NAME currently
hardcodes a tenant-specific store id; change its default to an empty string (or
None) and add a startup/config validation that fails fast or disables/gates L1
duplicate detection if ASSESSMENT_L1_DUPLICATE_STORE_NAME is unset. Update the
definition of ASSESSMENT_L1_DUPLICATE_STORE_NAME in config.py to be empty and
add a clear validation check in the app initialization path (e.g., config
validation or startup/init_assessment logic) that raises an error or disables L1
when the value is empty.
In `@backend/app/crud/assessment/batch.py`:
- Around line 306-316: The docstring for the function in
backend/app/crud/assessment/batch.py is missing entries for the new parameters
preloaded_rows and row_indices; update the Args section to document both:
describe preloaded_rows as an optional sequence/list of pre-fetched dataset rows
provided to avoid re-reading from the dataset (include expected element type,
e.g., dict/Row), and describe row_indices as an optional sequence/list of
integer indices specifying which dataset rows to process from the
dataset/preloaded_rows; keep wording consistent with the existing entries
(session, run, dataset, config_blob, assessment_input, organization_id,
project_id) and ensure the new parameter descriptions include expected types and
behavior when omitted.
In `@backend/app/services/assessment/l1/pipeline.py`:
- Around line 121-135: The concurrent ThreadPoolExecutor fan-out calling
run_topic_relevance (and the similar block that submits run_answer_reasoning)
can exceed Gemini rate limits; change the concurrency model to use a bounded
semaphore or token bucket so actual in-flight Gemini requests are limited (e.g.,
cap at a safe value derived from ASSESSMENT_L1_CONCURRENT_WORKERS and model
quota) and instrument request-level retries with exponential backoff and jitter
for 429/5xx responses inside the Gemini call site used by run_topic_relevance
(and the other submitted function), or wrap the client calls in a retry helper/
decorator that catches rate-limit responses and retries with backoff and a max
attempts counter; update both executor submission sites to acquire the
semaphore/token before submitting or perform semaphore acquisition inside the
worker functions to ensure global throttling.
In `@backend/app/services/assessment/tasks.py`:
- Line 13: The tasks.py module is importing the private function
_load_dataset_rows from app.crud.assessment.batch; update usage to rely on a
public API instead: add a public wrapper (e.g., load_dataset_rows) or rename the
function in app.crud.assessment.batch to remove the leading underscore, then
change the import in backend/app/services/assessment/tasks.py to import
load_dataset_rows (and keep submit_assessment_batch as-is), ensuring callers
reference the new public symbol rather than the module-private
_load_dataset_rows.
In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 100-114: The _image_mime_from_magic function is missing HEIC/HEIF
detection; extend it to detect the ISO BMFF 'ftyp' box by checking blob length
(>=12), that blob[4:8] == b'ftyp', and that the brand field (e.g., blob[8:12]
and/or subsequent bytes) matches known HEIC/HEIF brands like b'heic', b'heix',
b'hevc', b'hevx', b'mif1' (or similar), and return the appropriate HEIF mime
(e.g., "image/heif") when matched; this aligns magic detection with the
_IMAGE_MIME_BY_EXT entries .heic/.heif so files without extensions are
recognized.
In `@backend/app/services/assessment/utils/export.py`:
- Around line 674-696: The zip export omits post-processing because
serialize_export_rows is called without post_processing_config for each run;
update the multi-run export loop (the code iterating over runs_with_rows) to
pass the appropriate post_processing_config into serialize_export_rows so
computed columns/filtering/sorting are applied per-run (e.g.,
serialize_export_rows(rows, export_format,
post_processing_config=post_processing_config)), or if per-run configs are
needed, derive a per-run post_processing_config and pass that instead; ensure
the variable name you pass matches the surrounding export function's parameter
or local (post_processing_config) and keep export_format and file naming logic
unchanged.
In `@backend/app/services/assessment/utils/post_processing.py`:
- Around line 144-158: The closure sort_key captures the loop variable desc by
reference which is fragile; update sort_key signature to bind desc as a default
parameter (similar to _col) so it captures the current boolean value at
definition time (e.g., def sort_key(row: dict[str, Any], _col: str = col, _desc:
bool = desc): ...) and then use _desc inside the function when negating or
inverting string order; keep the sorted(result, key=sort_key) call unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 133027e3-00b7-48eb-985b-ce88b25541f8
📒 Files selected for processing (25)
backend/app/alembic/versions/064_add_l1_columns_to_assessment_run.pybackend/app/api/docs/assessment/update_post_processing.mdbackend/app/api/routes/assessment/runs.pybackend/app/celery/tasks/job_execution.pybackend/app/core/config.pybackend/app/crud/assessment/__init__.pybackend/app/crud/assessment/batch.pybackend/app/crud/assessment/core.pybackend/app/crud/assessment/cron.pybackend/app/models/assessment.pybackend/app/services/assessment/l1/__init__.pybackend/app/services/assessment/l1/duplicate_detection.pybackend/app/services/assessment/l1/pipeline.pybackend/app/services/assessment/l1/topic_relevance.pybackend/app/services/assessment/service.pybackend/app/services/assessment/tasks.pybackend/app/services/assessment/utils/attachments.pybackend/app/services/assessment/utils/export.pybackend/app/services/assessment/utils/post_processing.pybackend/app/tests/assessment/test_batch.pybackend/app/tests/assessment/test_cron.pybackend/app/tests/assessment/test_crud.pybackend/app/tests/assessment/test_export.pybackend/app/tests/assessment/test_service.pybackend/app/tests/assessment/test_topic_relevance.py
| # Memoize per-item type probes across all rows in this build. | ||
| type_cache: dict[str, str] = {} | ||
|
|
||
| for idx, row in enumerate(rows): | ||
| for i, row in enumerate(rows): | ||
| idx = row_indices[i] if row_indices is not None else i |
There was a problem hiding this comment.
Missing length validation between rows and row_indices.
When row_indices is provided, the code assumes len(row_indices) == len(rows). If they differ, row_indices[i] could raise IndexError (if row_indices is shorter) or some indices could be silently unused (if longer).
Consider adding a validation check:
🛡️ Proposed validation
def build_openai_jsonl(
rows: list[dict[str, str]],
text_columns: list[str],
attachments: list[AssessmentAttachment],
prompt_template: str | None,
openai_params: dict,
row_indices: list[int] | None = None,
) -> list[dict[str, Any]]:
+ if row_indices is not None and len(row_indices) != len(rows):
+ raise ValueError(
+ f"row_indices length ({len(row_indices)}) must match rows length ({len(rows)})"
+ )
jsonl_data = []Apply the same pattern to build_google_jsonl.
Also applies to: 237-238
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/crud/assessment/batch.py` around lines 174 - 178, Validate that
row_indices, when provided, has the same length as rows before the loop that
iterates rows (the block initializing type_cache and the for i, row in
enumerate(rows) loop), and raise a clear ValueError (or similar) if lengths
differ; add the same check to the build_google_jsonl function where row_indices
is also accepted to prevent IndexError or unused indices. Specifically, check if
row_indices is not None and len(row_indices) != len(rows) and fail early with a
descriptive message referencing rows and row_indices so callers must supply
matching-length lists.
| config=types.GenerateContentConfig( | ||
| system_instruction=user_prompt.strip(), | ||
| response_mime_type="application/json", | ||
| response_schema=output_schema, |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
google-genai response_schema accept plain dict OpenAPI lowercase type
💡 Result:
When using a plain dictionary for the response_schema in the google-genai SDK, the type field must be written in uppercase (e.g., 'OBJECT', 'STRING', 'INTEGER') [1][2]. While standard JSON Schema and OpenAPI specifications typically use lowercase types (e.g., 'object', 'string', 'integer'), the Google Gen AI SDK’s internal schema representation expects uppercase values [1][2]. If you provide a schema dictionary with lowercase types, it may lead to validation errors or unexpected behavior as the SDK expects the specific enum values defined by its internal Type class [3][1]. To ensure correct behavior, follow these practices: - Use uppercase strings for the type field in your schema dictionary (e.g., 'type': 'OBJECT', 'type': 'STRING') [1][2]. - Alternatively, you can use Pydantic models in the response_schema parameter, which allows the SDK to handle the schema definition and type mapping automatically, avoiding manual dictionary formatting issues [4][2]. - If you must use a raw dictionary, you can reference the genai.types.Type constants if using the Python SDK, or simply ensure your dictionary structure aligns with the required uppercase string values [3][1][2]. As of the current SDK implementation, providing a plain dictionary with lowercase types will not be correctly interpreted as a valid schema, as the SDK specifically expects uppercase type identifiers [1][2].
Citations:
- 1: https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/control-generated-output
- 2: https://googleapis.github.io/python-genai/
- 3: https://medium.com/google-cloud/structured-output-with-gemini-models-begging-borrowing-and-json-ing-f70ffd60eae6
- 4: https://deepwiki.com/googleapis/python-genai/3.5-structured-outputs-and-response-schemas
Fix response_schema dict type values for google-genai
output_schema is a plain dict using lowercase OpenAPI types (e.g., "object", "boolean"). The google-genai SDK’s response_schema expects the schema type values in the SDK’s internal casing/enums (e.g., "OBJECT", "BOOLEAN"). Update the schema (or build it with genai.types / Type constants) so the type strings match what the SDK expects; otherwise validation/structured output may fail or not apply correctly.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/assessment/l1/topic_relevance.py` at line 90, The
response_schema currently uses lowercase OpenAPI type strings (see
output_schema) which the google-genai SDK expects in its internal enum/casing
(e.g., "OBJECT", "BOOLEAN"); update the construction of output_schema used for
response_schema so type values use the SDK's Type constants (e.g.,
genai.types.Type.OBJECT/BOOLEAN) or the exact uppercase strings the SDK
requires, ensuring nested properties and arrays are converted likewise; locate
where output_schema is defined and replace lowercase
"object"/"boolean"/"array"/"string"/"integer" with the corresponding genai Type
constants or uppercase enum names so google-genai validation/structured output
will apply correctly.
…ipeline orchestrator
| exc_info=True, | ||
| ) | ||
|
|
||
| from app.crud.assessment.core import update_assessment_run_l1_stats |
There was a problem hiding this comment.
do we need it here or can be moved to the top
| # Assessment | ||
| ASSESSMENT_L1_GEMINI_MODEL: str = "gemini-3.1-flash-lite" | ||
| ASSESSMENT_L1_CONCURRENT_WORKERS: int = 8 | ||
| ASSESSMENT_L1_DUPLICATE_STORE_NAME: str = ( |
There was a problem hiding this comment.
is this sensitive information? why we need this?
There was a problem hiding this comment.
can we use some sort of org level variables so this becomes scalable so in future another org want to use assessment they can update value and two org can run assessment via kaapi
There was a problem hiding this comment.
The Gemini file search ID itself is not sensitive information, since access is still protected via the API key. I do agree that ideally this should be managed at the org level. However, right now only Inquilab is using this setup, and I had built it specifically around Inquilab’s use case and workflow, so that’s why it currently exists in this form. But in future it would be dynamic to org level.
|
|
||
| """ | ||
|
|
||
| import json |
There was a problem hiding this comment.
this is unused, can you check other such unused imports
… detection - Added a new prefilter pipeline orchestrator that runs topic relevance and duplicate detection filters in series. - Created `run_topic_relevance` and `run_duplicate_detection` functions to handle respective filtering logic. - Updated assessment service to utilize prefilter configuration instead of L1 configuration. - Modified assessment tasks to reflect the new prefilter processing status and error handling. - Adjusted utility functions and export logic to accommodate prefilter results. - Enhanced tests to cover the new prefilter functionality and ensure proper integration.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/assessment/tasks.py (1)
45-52:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRun left in dangling state when parent assessment is missing.
When the parent
Assessmentis not found, the run remains in its current status (likelypending) without being marked as failed. This leaves an orphaned run that won't be picked up by cron polling or cleaned up.Proposed fix
assessment = session.get(Assessment, run.assessment_id) if assessment is None: logger.error( "[execute_assessment_run] parent assessment %s not found for run %s", run.assessment_id, run_id, ) + update_assessment_run_status( + session=session, + run=run, + status="failed", + error_message="Parent assessment not found.", + ) return🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/tasks.py` around lines 45 - 52, When the parent Assessment is missing in execute_assessment_run, the run (variable run) must be transitioned out of the dangling state: set run.status to a failure state (e.g., RunStatus.FAILED or equivalent), populate run.error or run.finished_at with a brief message/timestamp indicating "parent assessment not found", persist the change via session.add()/session.commit() (or session.flush()/session.commit() as per existing patterns), and then log the error; update the block that currently returns after logger.error to perform these updates on run and commit before returning.
🧹 Nitpick comments (6)
backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py (1)
25-25: 💤 Low valueMinor typo in column comment.
The comment says "prefilter filter results" which has a redundant "filter".
Suggested fix
- comment="S3 URL of stored prefilter filter results JSON", + comment="S3 URL of stored prefilter results JSON",🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py` at line 25, Fix the typo in the column comment inside the migration file 064_add_prefilter_columns_to_assessment_run.py by changing the comment string from "S3 URL of stored prefilter filter results JSON" to "S3 URL of stored prefilter results JSON" where the column is defined (the place that currently has comment="S3 URL of stored prefilter filter results JSON"); ensure you update the same comment text in both the upgrade and any corresponding downgrade/column definition blocks if duplicated.backend/app/tests/assessment/test_batch.py (1)
476-481: ⚡ Quick winPreserve the validator’s URL-return contract in these probe tests.
These patches replace
validate_callback_urlwith a bare mock, sodetect_item_typecan pass a mock object intorequests.getand the assertions still succeed. The redirect case also only checks call counts, so it would miss a regression where the revalidated URL is ignored. Please make the validator mock return the input URL (for example viaside_effect=lambda url: url) and assert the validated URL(s) passed torequests.get.Also applies to: 495-500, 507-512, 529-545, 559-564
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_batch.py` around lines 476 - 481, The tests patching app.services.assessment.utils.attachments.validate_callback_url must preserve its URL-return contract: change those bare mocks to return the input URL (e.g., use side_effect=lambda url: url) so detect_item_type receives a real URL string rather than a Mock; then update assertions to verify the validated URL(s) were passed into requests.get (inspect mock_get.call_args / call_args_list) instead of only checking call counts. Apply this change to the mocks around validate_callback_url and requests.get in the probe tests (the blocks referencing validate_callback_url and mock_get at the shown locations) so redirects and revalidation actually use the validated URL.backend/app/tests/assessment/test_crud.py (2)
321-329: ⚡ Quick winVerify
flag_modifiedin theinput=Nonepath.This branch is the one most likely to miss JSON dirty tracking, but the test patches
flag_modifiedwithout asserting it was called. Checkingflag_modified(run, "input")here would lock down the persistence contract for the newly initialized blob.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_crud.py` around lines 321 - 329, The test test_none_input_handled currently patches flag_modified but doesn't assert it was invoked; update the test so after calling update_run_post_processing_config(session=session, run=run, config=None) you assert flag_modified was called with the run object and the "input" attribute (e.g., assert flag_modified.called and/or assert flag_modified.call_args showing (run, "input")), keeping the existing patch of flag_modified and verifying the persistence contract for update_run_post_processing_config.
227-246: ⚡ Quick winAssert the new prefilter counters in
build_run_stats.This fixture only proves the extra fields do not crash when they are
None. It will still pass ifbuild_run_statssilently dropsprefilter_total_rows,prefilter_total_passed, orprefilter_total_rejected. Please add a case with concrete values and assert they are exposed onstats[0].🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_crud.py` around lines 227 - 246, The test currently only covers None prefilter counters; add a second run in test_build_run_stats with concrete integer values for prefilter_total_rows, prefilter_total_passed, and prefilter_total_rejected so build_run_stats must preserve them; then assert on stats entry (e.g., stats[1] or the matching run object) that .prefilter_total_rows, .prefilter_total_passed, and .prefilter_total_rejected equal the expected integers to ensure build_run_stats exposes these fields unchanged. Ensure you reference the same SimpleNamespace fixture and the build_run_stats function when adding the new case.backend/app/tests/assessment/test_pipeline.py (2)
98-123: ⚡ Quick winCover original row-index preservation in the duplicate-detection path.
This case only exercises a surviving row at index
0, so it cannot catch a regression where the pipeline renumbers passed rows before callingrun_duplicate_detection. Since the downstream batch/export flow depends on original row indices, please add a case where an earlier row is filtered out and assert the duplicate-detection call still receives the original index.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_pipeline.py` around lines 98 - 123, Add a second scenario in test_duplicate_detection_runs_on_passed_rows where an earlier row is filtered out so the surviving row is not at index 0 (e.g., three rows where the first is irrelevant and the second survives); call run_prefilter_pipeline the same way and assert dup_mock (from _patches) was called with the original row identifier (e.g., "row_1" or matching the original index) and that results preserves the original index in duplicate_detection, ensuring run_duplicate_detection receives original row indices.
25-59: ⚡ Quick winAdd type hints to
_patches.
tr_side,dup_return, and the return value are currently untyped in changed Python code.As per coding guidelines,
**/*.py: Always add type hints to all function parameters and return values in Python code.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_pipeline.py` around lines 25 - 59, Add explicit type hints to the _patches function: annotate parameters tr_side and dup_return as Optional[Any] (or a more specific Union if you know the exact types used as side_effect/return), and annotate the return type as Tuple[MagicMock, MagicMock]; update the function signature (def _patches(stack: ExitStack, *, tr_side: Optional[Any] = None, dup_return: Optional[Any] = None) -> Tuple[MagicMock, MagicMock]:) and add the necessary imports (from typing import Optional, Any, Tuple and ensure MagicMock is imported from unittest.mock) so the function and its return are fully typed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@backend/app/services/assessment/tasks.py`:
- Around line 45-52: When the parent Assessment is missing in
execute_assessment_run, the run (variable run) must be transitioned out of the
dangling state: set run.status to a failure state (e.g., RunStatus.FAILED or
equivalent), populate run.error or run.finished_at with a brief
message/timestamp indicating "parent assessment not found", persist the change
via session.add()/session.commit() (or session.flush()/session.commit() as per
existing patterns), and then log the error; update the block that currently
returns after logger.error to perform these updates on run and commit before
returning.
---
Nitpick comments:
In `@backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py`:
- Line 25: Fix the typo in the column comment inside the migration file
064_add_prefilter_columns_to_assessment_run.py by changing the comment string
from "S3 URL of stored prefilter filter results JSON" to "S3 URL of stored
prefilter results JSON" where the column is defined (the place that currently
has comment="S3 URL of stored prefilter filter results JSON"); ensure you update
the same comment text in both the upgrade and any corresponding downgrade/column
definition blocks if duplicated.
In `@backend/app/tests/assessment/test_batch.py`:
- Around line 476-481: The tests patching
app.services.assessment.utils.attachments.validate_callback_url must preserve
its URL-return contract: change those bare mocks to return the input URL (e.g.,
use side_effect=lambda url: url) so detect_item_type receives a real URL string
rather than a Mock; then update assertions to verify the validated URL(s) were
passed into requests.get (inspect mock_get.call_args / call_args_list) instead
of only checking call counts. Apply this change to the mocks around
validate_callback_url and requests.get in the probe tests (the blocks
referencing validate_callback_url and mock_get at the shown locations) so
redirects and revalidation actually use the validated URL.
In `@backend/app/tests/assessment/test_crud.py`:
- Around line 321-329: The test test_none_input_handled currently patches
flag_modified but doesn't assert it was invoked; update the test so after
calling update_run_post_processing_config(session=session, run=run, config=None)
you assert flag_modified was called with the run object and the "input"
attribute (e.g., assert flag_modified.called and/or assert
flag_modified.call_args showing (run, "input")), keeping the existing patch of
flag_modified and verifying the persistence contract for
update_run_post_processing_config.
- Around line 227-246: The test currently only covers None prefilter counters;
add a second run in test_build_run_stats with concrete integer values for
prefilter_total_rows, prefilter_total_passed, and prefilter_total_rejected so
build_run_stats must preserve them; then assert on stats entry (e.g., stats[1]
or the matching run object) that .prefilter_total_rows, .prefilter_total_passed,
and .prefilter_total_rejected equal the expected integers to ensure
build_run_stats exposes these fields unchanged. Ensure you reference the same
SimpleNamespace fixture and the build_run_stats function when adding the new
case.
In `@backend/app/tests/assessment/test_pipeline.py`:
- Around line 98-123: Add a second scenario in
test_duplicate_detection_runs_on_passed_rows where an earlier row is filtered
out so the surviving row is not at index 0 (e.g., three rows where the first is
irrelevant and the second survives); call run_prefilter_pipeline the same way
and assert dup_mock (from _patches) was called with the original row identifier
(e.g., "row_1" or matching the original index) and that results preserves the
original index in duplicate_detection, ensuring run_duplicate_detection receives
original row indices.
- Around line 25-59: Add explicit type hints to the _patches function: annotate
parameters tr_side and dup_return as Optional[Any] (or a more specific Union if
you know the exact types used as side_effect/return), and annotate the return
type as Tuple[MagicMock, MagicMock]; update the function signature (def
_patches(stack: ExitStack, *, tr_side: Optional[Any] = None, dup_return:
Optional[Any] = None) -> Tuple[MagicMock, MagicMock]:) and add the necessary
imports (from typing import Optional, Any, Tuple and ensure MagicMock is
imported from unittest.mock) so the function and its return are fully typed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9fd400ef-931b-4af1-b578-6e595abd868b
📒 Files selected for processing (22)
backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.pybackend/app/api/routes/assessment/runs.pybackend/app/celery/tasks/job_execution.pybackend/app/core/config.pybackend/app/crud/assessment/__init__.pybackend/app/crud/assessment/batch.pybackend/app/crud/assessment/core.pybackend/app/models/assessment.pybackend/app/services/assessment/prefilter/__init__.pybackend/app/services/assessment/prefilter/duplicate_detection.pybackend/app/services/assessment/prefilter/pipeline.pybackend/app/services/assessment/prefilter/topic_relevance.pybackend/app/services/assessment/service.pybackend/app/services/assessment/tasks.pybackend/app/services/assessment/utils/attachments.pybackend/app/services/assessment/utils/export.pybackend/app/tests/assessment/test_batch.pybackend/app/tests/assessment/test_crud.pybackend/app/tests/assessment/test_duplicate_detection.pybackend/app/tests/assessment/test_pipeline.pybackend/app/tests/assessment/test_post_processing.pybackend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/services/assessment/prefilter/init.py
🚧 Files skipped from review as they are similar to previous changes (6)
- backend/app/celery/tasks/job_execution.py
- backend/app/api/routes/assessment/runs.py
- backend/app/tests/assessment/test_topic_relevance.py
- backend/app/services/assessment/service.py
- backend/app/crud/assessment/batch.py
- backend/app/services/assessment/utils/attachments.py
Target issue is #904
Summary
What changed
PATCH /runs/{id}/post-processing.Checklist
Before submitting a pull request, please ensure that you mark these task.
fastapi run --reload app/main.pyordocker compose upin the repository root and test.