Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions data_rentgen/consumer/extractors/batch_extraction_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ def __repr__(self):
@staticmethod
def _add(context: dict[tuple, T], new_item: T) -> T:
key = new_item.unique_key
if key in context:
old_item = context[key]
if old_item is new_item:
return old_item

merged_item = old_item.merge(new_item)
context[key] = merged_item
return merged_item

context[key] = new_item
return new_item
old_item = context.get(key)
if not old_item:
context[key] = new_item
return new_item

if old_item is new_item:
# optimization: the very same object is already stored, nothing to merge
return old_item

merged_item = old_item.merge(new_item)
context[key] = merged_item
return merged_item

def add_location(self, location: LocationDTO):
return self._add(self._locations, location)
Expand Down
1 change: 1 addition & 0 deletions data_rentgen/db/models/column_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,5 @@ class ColumnLineage(Base):
primaryjoin="ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint",
lazy="noload",
foreign_keys=[fingerprint],
order_by=(DatasetColumnRelation.source_column, DatasetColumnRelation.target_column),
)
1 change: 1 addition & 0 deletions data_rentgen/db/models/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Location(Base):
"Address",
lazy="noload",
back_populates="location",
order_by="Address.url",
)

search_vector: Mapped[str] = mapped_column(
Expand Down
9 changes: 8 additions & 1 deletion data_rentgen/db/repositories/column_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ async def create_bulk(self, items: list[ColumnLineageDTO]):
return

insert_statement = insert(ColumnLineage)
statement = insert_statement.on_conflict_do_nothing(
inserted_row = insert_statement.excluded
statement = insert_statement.on_conflict_do_update(
index_elements=[ColumnLineage.created_at, ColumnLineage.id],
set_={
# in case the job or run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"job_id": inserted_row.job_id,
"run_id": inserted_row.run_id,
},
)

await self._session.execute(
Expand Down
10 changes: 7 additions & 3 deletions data_rentgen/db/repositories/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@
.options(selectinload(Dataset.tag_values).selectinload(TagValue.tag))
)

get_one_query = select(Dataset).where(
Dataset.location_id == bindparam("location_id"),
func.lower(Dataset.name) == bindparam("name_lower"),
get_one_query = (
select(Dataset)
.where(
Dataset.location_id == bindparam("location_id"),
func.lower(Dataset.name) == bindparam("name_lower"),
)
.limit(1)
)

get_stats_query = (
Expand Down
10 changes: 7 additions & 3 deletions data_rentgen/db/repositories/dataset_symlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
),
)

get_one_query = select(DatasetSymlink).where(
DatasetSymlink.from_dataset_id == bindparam("from_dataset_id"),
DatasetSymlink.to_dataset_id == bindparam("to_dataset_id"),
get_one_query = (
select(DatasetSymlink)
.where(
DatasetSymlink.from_dataset_id == bindparam("from_dataset_id"),
DatasetSymlink.to_dataset_id == bindparam("to_dataset_id"),
)
.limit(1)
)


Expand Down
4 changes: 4 additions & 0 deletions data_rentgen/db/repositories/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
insert_statement = insert_statement.on_conflict_do_update(
index_elements=[Input.created_at, Input.id],
set_={
# in case the job or run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"job_id": inserted_row.job_id,
"run_id": inserted_row.run_id,
"num_bytes": func.greatest(inserted_row.num_bytes, Input.num_bytes),
"num_rows": func.greatest(inserted_row.num_rows, Input.num_rows),
"num_files": func.greatest(inserted_row.num_files, Input.num_files),
Expand Down
10 changes: 7 additions & 3 deletions data_rentgen/db/repositories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@
),
)

get_one_query = select(Job).where(
Job.location_id == bindparam("location_id"),
func.lower(Job.name) == bindparam("name_lower"),
get_one_query = (
select(Job)
.where(
Job.location_id == bindparam("location_id"),
func.lower(Job.name) == bindparam("name_lower"),
)
.limit(1)
)

get_list_query = (
Expand Down
10 changes: 7 additions & 3 deletions data_rentgen/db/repositories/job_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@
),
)

get_one_query = select(JobDependency).where(
JobDependency.from_job_id == bindparam("from_job_id"),
JobDependency.to_job_id == bindparam("to_job_id"),
get_one_query = (
select(JobDependency)
.where(
JobDependency.from_job_id == bindparam("from_job_id"),
JobDependency.to_job_id == bindparam("to_job_id"),
)
.limit(1)
)


Expand Down
8 changes: 6 additions & 2 deletions data_rentgen/db/repositories/job_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
JobType.type == any_(bindparam("types")),
)

get_one_query = select(JobType).where(
JobType.type == bindparam("type"),
get_one_query = (
select(JobType)
.where(
JobType.type == bindparam("type"),
)
.limit(1)
)

get_distinct_query = select(JobType.type).distinct(JobType.type).order_by(JobType.type)
Expand Down
4 changes: 2 additions & 2 deletions data_rentgen/db/repositories/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
get_one_query = (
select(Location)
.from_statement(
get_one_by_name_query.union(get_one_by_addresses_query),
get_one_by_name_query.union(get_one_by_addresses_query).limit(1),
)
.options(selectinload(Location.addresses))
)
Expand All @@ -51,7 +51,7 @@
{
"location_id": bindparam("location_id"),
"url": bindparam("url"),
}
},
)
.on_conflict_do_nothing(index_elements=["location_id", "url"])
)
Expand Down
3 changes: 3 additions & 0 deletions data_rentgen/db/repositories/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ async def create_or_update_bulk(self, operations: list[OperationDTO]) -> None:
await self._session.execute(
update_statement.values(
{
# in case the run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"run_id": bindparam("run_id"),
"name": func.coalesce(bindparam("name"), Operation.name),
"type": func.coalesce(bindparam("type"), Operation.type),
"status": func.greatest(bindparam("status"), Operation.status),
Expand Down
4 changes: 4 additions & 0 deletions data_rentgen/db/repositories/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
insert_statement = insert_statement.on_conflict_do_update(
index_elements=[Output.created_at, Output.id],
set_={
# in case the job or run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"job_id": inserted_row.job_id,
"run_id": inserted_row.run_id,
"type": inserted_row.type.op("|")(Output.type),
"num_bytes": func.greatest(inserted_row.num_bytes, Output.num_bytes),
"num_rows": func.greatest(inserted_row.num_rows, Output.num_rows),
Expand Down
4 changes: 3 additions & 1 deletion data_rentgen/db/repositories/tag_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
),
),
)
get_one_query = select(TagValue).where(TagValue.tag_id == bindparam("tag_id"), TagValue.value == bindparam("value"))
get_one_query = (
select(TagValue).where(TagValue.tag_id == bindparam("tag_id"), TagValue.value == bindparam("value")).limit(1)
)


class TagValueRepository(Repository[TagValue]):
Expand Down
18 changes: 6 additions & 12 deletions data_rentgen/dto/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from __future__ import annotations

from copy import copy
from dataclasses import dataclass, field

from data_rentgen.dto.job_type import JobTypeDTO
Expand All @@ -26,23 +25,18 @@ def unique_key(self) -> tuple:

def merge(self, new: JobDTO) -> JobDTO:
self.id = new.id or self.id
self.location = self.location.merge(new.location)
if new.parent_job and self.parent_job:
self.parent_job = self.parent_job.merge(new.parent_job)
self.location.merge(new.location)

# Workaround for https://github.com/OpenLineage/OpenLineage/issues/3846
if new.parent_job and self.parent_job and new.parent_job.unique_key == self.parent_job.unique_key:
self.parent_job.merge(new.parent_job)
else:
self.parent_job = new.parent_job or self.parent_job

if new.type and self.type:
self.type = self.type.merge(new.type)
self.type.merge(new.type)
else:
self.type = new.type or self.type

self.tag_values.update(new.tag_values)

if self.name == "unknown" and new.name != "unknown":
# Workaround for https://github.com/OpenLineage/OpenLineage/issues/3846
result = copy(self)
result.name = new.name
return result

return self
4 changes: 2 additions & 2 deletions data_rentgen/dto/job_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def unique_key(self) -> tuple:
return (self.from_job.unique_key, self.to_job.unique_key, self.type)

def merge(self, new: JobDependencyDTO) -> JobDependencyDTO:
self.from_job = self.from_job.merge(new.from_job)
self.to_job = self.to_job.merge(new.to_job)
self.from_job.merge(new.from_job)
self.to_job.merge(new.to_job)
self.id = new.id or self.id
return self
4 changes: 2 additions & 2 deletions data_rentgen/dto/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def unique_key(self) -> tuple:
return (self.id,)

def merge(self, new: OperationDTO) -> OperationDTO:
self.run = self.run.merge(new.run)
self.run.merge(new.run)
if self.sql_query and new.sql_query:
self.sql_query = self.sql_query.merge(new.sql_query)
self.sql_query.merge(new.sql_query)
else:
self.sql_query = new.sql_query or self.sql_query

Expand Down
6 changes: 3 additions & 3 deletions data_rentgen/dto/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ def generate_id(self) -> UUID:
return generate_incremental_uuid(self.created_at, ".".join(id_components))

def merge(self, new: OutputDTO) -> OutputDTO:
self.operation = self.operation.merge(new.operation)
self.dataset = self.dataset.merge(new.dataset)
self.operation.merge(new.operation)
self.dataset.merge(new.dataset)

if self.schema and new.schema:
self.schema = self.schema.merge(new.schema)
self.schema.merge(new.schema)
else:
self.schema = new.schema or self.schema

Expand Down
16 changes: 11 additions & 5 deletions data_rentgen/dto/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,28 @@ def unique_key(self) -> tuple:
return (self.id,)

def merge(self, new: RunDTO) -> RunDTO:
self.job = self.job.merge(new.job)
if new.job.unique_key == self.job.unique_key:
self.job.merge(new.job)
else:
# Workaround for https://github.com/OpenLineage/OpenLineage/issues/3846
self.job = new.job

if new.parent_run and self.parent_run:
self.parent_run = self.parent_run.merge(new.parent_run)
self.parent_run.merge(new.parent_run)
else:
self.parent_run = new.parent_run or self.parent_run

if new.user and self.user:
self.user = self.user.merge(new.user)
self.user.merge(new.user)
else:
self.user = new.user or self.user

existing_dependencies = {item.unique_key: item for item in self.job_dependencies}
merged_dependencies = []
for job_dependency in new.job_dependencies:
if job_dependency.unique_key in existing_dependencies:
merged_dependencies.append(existing_dependencies[job_dependency.unique_key].merge(job_dependency))
old_job_dependency = existing_dependencies.get(job_dependency.unique_key)
if old_job_dependency:
merged_dependencies.append(old_job_dependency.merge(job_dependency))
else:
merged_dependencies.append(job_dependency)
self.job_dependencies = merged_dependencies
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ exclude = ["docs", "tests"]

[project.optional-dependencies]
server = [
"fastapi~=0.136.3",
"fastapi>=0.136.3,<0.139.0",
# starlette 1.0.0 breaks cookies
"starlette<1.0",
"uvicorn~=0.49",
Expand All @@ -81,7 +81,7 @@ consumer = [
"cramjam~=2.11.0",
]
http2kafka = [
"fastapi~=0.136.3",
"fastapi>=0.136.3,<0.139.0",
# starlette 1.0.0 breaks cookies
"starlette<1.0",
"uvicorn~=0.49",
Expand Down
Loading
Loading