Commit c0393ae

jychp and claude authored
feat(gcp): add BigQuery resource ingestion (#2433)
### Type of change

- [x] New feature (non-breaking change that adds functionality)

### Summary

Add BigQuery support to the GCP intel module, covering four resource types:

- **Datasets** (`GCPBigQueryDataset`) — with `Database` ontology label
- **Tables** (`GCPBigQueryTable`) — TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL (enriched via `tables.get` for numBytes, numRows, description, etc.)
- **Routines** (`GCPBigQueryRoutine`) — stored procedures, UDFs, table-valued functions
- **Connections** (`GCPBigQueryConnection`) — external data source connections (Cloud SQL, AWS, Azure, GCP Service Account, Spark, etc.)

**Graph structure:**

```
(GCPProject)-[:RESOURCE]->(GCPBigQueryDataset)-[:HAS_TABLE]->(GCPBigQueryTable)
(GCPProject)-[:RESOURCE]->(GCPBigQueryDataset)-[:HAS_ROUTINE]->(GCPBigQueryRoutine)
(GCPProject)-[:RESOURCE]->(GCPBigQueryConnection)
(GCPBigQueryTable)-[:USES_CONNECTION]->(GCPBigQueryConnection)
(GCPBigQueryRoutine)-[:USES_CONNECTION]->(GCPBigQueryConnection)
(GCPBigQueryConnection)-[:CONNECTS_TO]->(GCPCloudSQLInstance)
(GCPBigQueryConnection)-[:CONNECTS_WITH]->(AWSRole)
(GCPBigQueryConnection)-[:CONNECTS_WITH]->(EntraServicePrincipal)
(GCPBigQueryConnection)-[:CONNECTS_WITH]->(GCPServiceAccount)
```

**Cross-cloud relationships:**

BigQuery connections that reference external identity providers are linked to existing Cartography nodes:

- `aws.accessRole.iamRoleId` → `AWSRole.id` (ARN)
- `azure.federatedApplicationClientId` → `EntraServicePrincipal.id`
- `cloudResource.serviceAccountId` → `GCPServiceAccount.email`

**Implementation details:**

- Uses BigQuery v2 discovery API (`build_client("bigquery", "v2")`) — consistent with all other GCP modules
- Connections use a separate API service (`bigqueryconnection.googleapis.com` v1)
- The BigQuery Connection API does not support a wildcard location — locations are discovered from datasets + default multi-region locations (us, eu), then queried individually
- Connection IDs are normalized from the short dot-separated format (`project_number.location.name`) to the full resource name (`projects/.../locations/.../connections/...`) for consistent relationship matching
- Tables are enriched via per-table `tables.get` calls for fields not available in `tables.list` (numBytes, numRows, description, friendlyName, externalDataConfiguration)
- Returns `None` on API disabled / permission denied (403/404) — preserves existing data (no cleanup on failure)
- Child resources (tables, routines) are aggregated across all datasets into a single `load()` call for performance
- Uses direct indexing for required API fields (fail-fast on missing data)
- Follows the flat-file pattern (like bigtable) with one file per resource type
- Ontology mapping uses `dataset_id` (required field) for the Database name instead of `friendly_name` (optional)

**Setup requirements** (documented in `docs/root/modules/gcp/config.md`):

- APIs: `bigquery.googleapis.com`, `bigqueryconnection.googleapis.com`
- IAM roles: `roles/bigquery.dataViewer`, `roles/bigquery.connectionUser`

### Related issues or links

- Fixes #2128
- Supersedes #2229

### How was this tested?

- Integration tests covering all 4 resource types with mock API responses
- Verifies all node properties, relationships (RESOURCE, HAS_TABLE, HAS_ROUTINE, USES_CONNECTION, CONNECTS_TO, CONNECTS_WITH), and Database ontology label
- Tested against live GCP project with BigQuery Omni (AWS cross-cloud) connections
- Linter passes (`make test_lint`)

### Checklist

#### General

- [x] I have read the [contributing guidelines](https://cartography-cncf.github.io/cartography/dev/developer-guide.html).
- [x] The linter passes locally (`make lint`).
- [x] I have added/updated tests that prove my fix is effective or my feature works.

#### Proof of functionality

- [x] New or updated unit/integration tests.

#### If you are adding or modifying a synced entity

- [x] Included Cartography sync logs from a real environment demonstrating successful synchronization of the new/modified entity.

#### If you are changing a node or relationship

- [x] Updated the [schema documentation](https://github.com/cartography-cncf/cartography/tree/master/docs/root/modules).

#### If you are implementing a new intel module

- [x] Used the NodeSchema [data model](https://cartography-cncf.github.io/cartography/dev/writing-intel-modules.html#defining-a-node).

### Notes for reviewers

This builds on the work started in #2229 but addresses all review feedback:

- Uses discovery API instead of `google.cloud.bigquery` client
- One model per file in `cartography/models/gcp/bigquery/`
- Standard error handling with `is_api_disabled_error()` + 403/404 handling
- No catch-all exceptions — let errors propagate
- No datetime conversion — passes native API values
- Aggregates child resources across parents for single `load()` call
- Integration tests (not unit tests) following the standard pattern
- Passes credentials through to `build_client()` instead of using ADC independently
- Required fields use direct indexing (`data["field"]`) for fail-fast semantics

---------

Signed-off-by: Jeremy Chapeau <jeremy@subimage.io>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
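
The connection ID normalization mentioned under the implementation details can be illustrated with a small sketch. This is not the code from this commit: `normalize_connection_id` is a hypothetical helper name, and the sketch assumes that the short form always has exactly three dot-separated parts and that only the connection name could itself contain a dot.

```python
def normalize_connection_id(connection_id: str) -> str:
    """Return the full resource name for a BigQuery connection ID.

    Hypothetical helper for illustration. Accepts either the short
    dot-separated form (project_number.location.name) or a value that is
    already a full resource name.
    """
    # Already in projects/.../locations/.../connections/... form: keep as is.
    if connection_id.startswith("projects/"):
        return connection_id

    # Assumption: split at most twice so a dot inside the name is preserved.
    parts = connection_id.split(".", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Unrecognized connection ID: {connection_id!r}")

    project, location, name = parts
    return f"projects/{project}/locations/{location}/connections/{name}"


if __name__ == "__main__":
    print(normalize_connection_id("123456.us.my_conn"))
```

Normalizing both sides to one form means that a table or routine referencing a connection by its short ID can still be matched to a `GCPBigQueryConnection` node keyed by its full resource name.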
1 parent 305cce0 commit c0393ae

16 files changed

Lines changed: 1934 additions & 1 deletion

File tree

cartography/intel/gcp/__init__.py

Lines changed: 63 additions & 1 deletion
@@ -15,6 +15,10 @@
 from cartography.config import Config
 from cartography.graph.job import GraphJob
 from cartography.intel.gcp import artifact_registry
+from cartography.intel.gcp import bigquery_connection
+from cartography.intel.gcp import bigquery_dataset
+from cartography.intel.gcp import bigquery_routine
+from cartography.intel.gcp import bigquery_table
 from cartography.intel.gcp import bigtable_app_profile
 from cartography.intel.gcp import bigtable_backup
 from cartography.intel.gcp import bigtable_cluster
@@ -65,7 +69,7 @@
 # and https://cloud.google.com/service-usage/docs/reference/rest/v1/services#ServiceConfig
 Services = namedtuple(
     "Services",
-    "compute storage gke dns iam kms bigtable cai aiplatform cloud_sql gcf secretsmanager artifact_registry cloud_run",
+    "compute storage gke dns iam kms bigtable cai aiplatform cloud_sql gcf secretsmanager artifact_registry cloud_run bigquery bigquery_connection",
 )
 service_names = Services(
     compute="compute.googleapis.com",
@@ -82,6 +86,8 @@
     secretsmanager="secretmanager.googleapis.com",
     artifact_registry="artifactregistry.googleapis.com",
     cloud_run="run.googleapis.com",
+    bigquery="bigquery.googleapis.com",
+    bigquery_connection="bigqueryconnection.googleapis.com",
 )
 
 
@@ -549,6 +555,62 @@ def _sync_project_resources(
             common_job_parameters,
         )
 
+    # Build the BigQuery v2 client once — used for datasets/tables/routines
+    # and also for location discovery when syncing connections.
+    bigquery_client = None
+    if service_names.bigquery in enabled_services:
+        bigquery_client = build_client(
+            "bigquery",
+            "v2",
+            credentials=credentials,
+        )
+
+    if service_names.bigquery_connection in enabled_services:
+        logger.info("Syncing GCP project %s for BigQuery connections.", project_id)
+        bigquery_conn_client = build_client(
+            "bigqueryconnection",
+            "v1",
+            credentials=credentials,
+        )
+        bigquery_connection.sync_bigquery_connections(
+            neo4j_session,
+            bigquery_conn_client,
+            project_id,
+            gcp_update_tag,
+            common_job_parameters,
+            bigquery_client=bigquery_client,
+        )
+
+    datasets_raw = None
+    if bigquery_client is not None:
+        logger.info("Syncing GCP project %s for BigQuery.", project_id)
+        datasets_raw = bigquery_dataset.sync_bigquery_datasets(
+            neo4j_session,
+            bigquery_client,
+            project_id,
+            gcp_update_tag,
+            common_job_parameters,
+        )
+
+    if bigquery_client is not None and datasets_raw is not None:
+        bigquery_table.sync_bigquery_tables(
+            neo4j_session,
+            bigquery_client,
+            datasets_raw,
+            project_id,
+            gcp_update_tag,
+            common_job_parameters,
+        )
+
+        bigquery_routine.sync_bigquery_routines(
+            neo4j_session,
+            bigquery_client,
+            datasets_raw,
+            project_id,
+            gcp_update_tag,
+            common_job_parameters,
+        )
+
     # Clean up project-level IAM resources (service accounts and project roles)
     # Only run cleanup if IAM sync succeeded to avoid deleting valid data
     # when sync was skipped due to permission issues.
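
The hunk above hands `datasets_raw` to the table and routine syncs, which per the commit message aggregate child resources across all datasets into a single `load()` call. A minimal sketch of that aggregation follows. It is not taken from the table or routine modules, which are not shown here: `collect_child_resources` and the `list_children` callable are hypothetical names, and tagging each child with `dataset_id` is an assumption about how the parent link might be carried.

```python
from typing import Callable, Iterable


def collect_child_resources(
    datasets: Iterable[dict],
    list_children: Callable[[str], list[dict]],
) -> list[dict]:
    """Gather the children of every dataset into one flat list.

    Hypothetical helper: `list_children` stands in for whatever per-dataset
    API call (tables or routines) the real module makes.
    """
    aggregated: list[dict] = []
    for dataset in datasets:
        # Direct indexing, so a malformed dataset fails fast with a KeyError.
        dataset_id = dataset["datasetReference"]["datasetId"]
        for child in list_children(dataset_id):
            # Assumption: record the parent so one load can link every child.
            aggregated.append({**child, "dataset_id": dataset_id})
    return aggregated


if __name__ == "__main__":
    fake_children = {"sales": [{"id": "t1"}, {"id": "t2"}], "hr": [{"id": "t3"}]}
    datasets = [
        {"datasetReference": {"datasetId": "sales"}},
        {"datasetReference": {"datasetId": "hr"}},
    ]
    rows = collect_child_resources(datasets, lambda ds: fake_children[ds])
    print(len(rows))
```

Collecting everything first trades some memory for fewer write transactions, since the graph load then happens once per resource type instead of once per dataset.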
Lines changed: 216 additions & 0 deletions
@@ -0,0 +1,216 @@
+import logging
+
+import neo4j
+from googleapiclient.discovery import Resource
+from googleapiclient.errors import HttpError
+
+from cartography.client.core.tx import load
+from cartography.graph.job import GraphJob
+from cartography.intel.gcp.util import gcp_api_execute_with_retry
+from cartography.intel.gcp.util import is_api_disabled_error
+from cartography.models.gcp.bigquery.connection import GCPBigQueryConnectionSchema
+from cartography.util import timeit
+
+logger = logging.getLogger(__name__)
+
+
+def _get_locations(bigquery_client: Resource, project_id: str) -> list[str]:
+    """
+    Discover BigQuery locations for a project using the BigQuery v2 API.
+
+    The BigQuery Connection API does not expose a locations.list endpoint, and
+    BigQuery v2 does not have a dedicated locations endpoint either. Instead,
+    we list the project's datasets (datasets.list) to discover which locations
+    the project uses, and supplement them with the standard multi-region
+    locations (us, eu). The defaults ensure we don't miss connections that
+    live in a multi-region location where the project has no datasets; other
+    locations without datasets are not queried.
+
+    Returns a deduplicated list of location IDs (e.g., ["us", "eu", "us-central1"]).
+    """
+    # Standard BigQuery multi-region locations.
+    # Connections can exist in either of these even without datasets.
+    # See https://cloud.google.com/bigquery/docs/locations
+    default_locations = {"us", "eu"}
+
+    # Discover additional locations from existing datasets
+    locations: set[str] = set(default_locations)
+    try:
+        request = bigquery_client.datasets().list(projectId=project_id, all=True)
+        while request is not None:
+            response = gcp_api_execute_with_retry(request)
+            for ds in response.get("datasets", []):
+                loc = ds.get("location")
+                if loc:
+                    locations.add(loc.lower())
+            request = bigquery_client.datasets().list_next(
+                previous_request=request,
+                previous_response=response,
+            )
+    except HttpError as e:
+        logger.debug(
+            "Could not list datasets to discover locations for project %s - %s. "
+            "Using default locations only.",
+            project_id,
+            e,
+        )
+
+    return list(locations)
+
+
+@timeit
+def get_bigquery_connections(
+    conn_client: Resource,
+    project_id: str,
+    bigquery_client: Resource | None = None,
+) -> list[dict] | None:
+    """
+    Gets BigQuery connections for a project across all locations.
+
+    The BigQuery Connection API does not support a wildcard location, so we
+    discover locations from the BigQuery v2 API (via dataset locations) plus
+    standard multi-region locations, then query each one individually.
+
+    Args:
+        conn_client: The bigqueryconnection v1 API client.
+        project_id: The GCP project ID.
+        bigquery_client: Optional BigQuery v2 API client for location discovery.
+            If not provided, only default locations (us, eu) are queried.
+
+    Returns:
+        list[dict]: Connections from every location that could be queried.
+            Locations that return API disabled, 403, or 404 are skipped.
+
+    Raises:
+        HttpError: For errors other than API disabled or permission denied
+    """
+    if bigquery_client is not None:
+        locations = _get_locations(bigquery_client, project_id)
+    else:
+        locations = ["us", "eu"]
+
+    connections: list[dict] = []
+    for location in locations:
+        parent = f"projects/{project_id}/locations/{location}"
+        try:
+            request = (
+                conn_client.projects()
+                .locations()
+                .connections()
+                .list(
+                    parent=parent,
+                )
+            )
+            while request is not None:
+                response = gcp_api_execute_with_retry(request)
+                connections.extend(response.get("connections", []))
+                request = (
+                    conn_client.projects()
+                    .locations()
+                    .connections()
+                    .list_next(
+                        previous_request=request,
+                        previous_response=response,
+                    )
+                )
+        except HttpError as e:
+            if is_api_disabled_error(e) or e.resp.status in (403, 404):
+                logger.warning(
+                    "Could not retrieve BigQuery connections for %s/%s - %s. "
+                    "Skipping location.",
+                    project_id,
+                    location,
+                    e,
+                )
+                continue
+            raise
+
+    return connections
+
+
+def transform_connections(connections_data: list[dict], project_id: str) -> list[dict]:
+    transformed: list[dict] = []
+    for conn in connections_data:
+        # Determine connection type from the oneOf fields in the API response
+        connection_type = None
+        for type_key in (
+            "cloudSql",
+            "aws",
+            "azure",
+            "cloudSpanner",
+            "cloudResource",
+            "spark",
+        ):
+            if type_key in conn:
+                connection_type = type_key
+                break
+
+        cloud_sql = conn.get("cloudSql", {}) or {}
+        aws = conn.get("aws", {}) or {}
+        azure = conn.get("azure", {}) or {}
+        cloud_resource = conn.get("cloudResource", {}) or {}
+        transformed.append(
+            {
+                "name": conn["name"],
+                "friendlyName": conn.get("friendlyName"),
+                "description": conn.get("description"),
+                "connection_type": connection_type,
+                "creationTime": conn.get("creationTime"),
+                "lastModifiedTime": conn.get("lastModifiedTime"),
+                "hasCredential": conn.get("hasCredential"),
+                "cloud_sql_instance_id": cloud_sql.get("instanceId"),
+                "aws_role_arn": aws.get("accessRole", {}).get("iamRoleId"),
+                "azure_app_client_id": azure.get("federatedApplicationClientId"),
+                "service_account_id": cloud_resource.get("serviceAccountId"),
+                "project_id": project_id,
+            },
+        )
+    return transformed
+
+
+@timeit
+def load_bigquery_connections(
+    neo4j_session: neo4j.Session,
+    data: list[dict],
+    project_id: str,
+    update_tag: int,
+) -> None:
+    load(
+        neo4j_session,
+        GCPBigQueryConnectionSchema(),
+        data,
+        lastupdated=update_tag,
+        PROJECT_ID=project_id,
+    )
+
+
+@timeit
+def cleanup_bigquery_connections(
+    neo4j_session: neo4j.Session,
+    common_job_parameters: dict,
+) -> None:
+    GraphJob.from_node_schema(
+        GCPBigQueryConnectionSchema(),
+        common_job_parameters,
+    ).run(neo4j_session)
+
+
+@timeit
+def sync_bigquery_connections(
+    neo4j_session: neo4j.Session,
+    client: Resource,
+    project_id: str,
+    update_tag: int,
+    common_job_parameters: dict,
+    bigquery_client: Resource | None = None,
+) -> None:
+    logger.info("Syncing BigQuery connections for project %s.", project_id)
+    connections_raw = get_bigquery_connections(client, project_id, bigquery_client)
+
+    if connections_raw is not None:
+        connections = transform_connections(connections_raw, project_id)
+        load_bigquery_connections(neo4j_session, connections, project_id, update_tag)
+
+        cleanup_job_params = common_job_parameters.copy()
+        cleanup_job_params["PROJECT_ID"] = project_id
+        cleanup_bigquery_connections(neo4j_session, cleanup_job_params)
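
The location discovery in `_get_locations` boils down to a set union of the default multi-regions and the lowercased dataset locations. The standalone sketch below shows that step without any API client, so it can be run and tested in isolation. `merge_locations` and `DEFAULT_LOCATIONS` are hypothetical names, and returning a sorted list is a choice made here for reproducible output; the committed function returns `list(locations)`, whose order is not guaranteed.

```python
DEFAULT_LOCATIONS = frozenset({"us", "eu"})


def merge_locations(datasets: list[dict]) -> list[str]:
    """Combine default multi-region locations with dataset locations.

    Hypothetical helper mirroring the set logic in the diff above. Each
    dataset dict is assumed to look like an item from datasets.list, with an
    optional "location" key.
    """
    locations = set(DEFAULT_LOCATIONS)
    for dataset in datasets:
        location = dataset.get("location")
        if location:
            # The API reports multi-regions in upper case (e.g. "US"), so
            # lowercase before adding to avoid duplicate entries.
            locations.add(location.lower())
    # Sorted only to make the result deterministic for callers and tests.
    return sorted(locations)


if __name__ == "__main__":
    print(merge_locations([{"location": "US"}, {"location": "us-central1"}, {}]))
```

A deterministic order is not required for correctness, since every location is queried either way, but it can make sync logs easier to compare between runs.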
