Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test-suite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ jobs:
AWS_ATHENA_WORKGROUP: pyathena
AWS_ATHENA_SPARK_WORKGROUP: pyathena-spark
AWS_ATHENA_MANAGED_WORKGROUP: pyathena-managed
# Registered S3 Tables catalog (s3tablescatalog/<table-bucket>) and namespace
# for the SQLAlchemy S3 Tables tests; the table bucket, namespace, and the
# AWS analytics-services integration are provisioned out of band.
AWS_ATHENA_S3_TABLES_CATALOG: s3tablescatalog/pyathena
AWS_ATHENA_S3_TABLES_NAMESPACE: pyathena_test

strategy:
fail-fast: false
Expand Down
48 changes: 48 additions & 0 deletions docs/sqlalchemy.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,54 @@ If you want to limit the column options to specific table names only, specify th
awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?partition=table1.column1%2Ctable1.column2&cluster=table2.column1%2Ctable2.column2&...
```

## Amazon S3 Tables

[Amazon S3 Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html) are Iceberg-backed
tables stored in a dedicated table bucket with their own data catalog. Athena registers each table bucket as a
catalog named `s3tablescatalog/<table-bucket>`, so an S3 Tables table is addressed by a three-part identifier
`catalog.namespace.table`.

To target an S3 Tables table, set the `schema` to the dotted `s3tablescatalog/<table-bucket>.<namespace>` form.
The dialect quotes the catalog, namespace, and table independently, and because S3 Tables use managed storage it
omits the `LOCATION` clause automatically (so `awsathena_location` is not required).

```python
table = Table(
"some_table",
MetaData(schema="s3tablescatalog/my-bucket.my_namespace"),
Column("id", types.Integer),
Column("dt", types.Date, awsathena_partition=True, awsathena_partition_transform="day"),
awsathena_tblproperties={"table_type": "ICEBERG"},
)
table.create(bind=conn)
```

which builds the following statement:

```sql
CREATE TABLE `s3tablescatalog/my-bucket`.my_namespace.some_table (
id INT,
dt DATE
)
PARTITIONED BY (
day(dt)
)
TBLPROPERTIES (
'table_type' = 'ICEBERG'
)
```

All Iceberg partition transforms (`year`, `month`, `day`, `hour`, `bucket`, `truncate`) are supported, the same as
for other Iceberg tables. CTAS (`CREATE TABLE ... AS SELECT`) is not modeled as a SQLAlchemy construct; issue it as
raw SQL via `text()`. Unlike the backtick-quoted DDL shown above, the raw statement must double-quote each part of
the three-part identifier (the Presto/Trino convention):

```python
conn.execute(text(
'CREATE TABLE "s3tablescatalog/my-bucket"."my_namespace"."some_table" '
"WITH (table_type = 'ICEBERG') AS SELECT 1 AS id"
))
```

## Temporal/Time-travel with Iceberg

Athena supports time-travel queries on Iceberg tables by either a version_id or a timestamp. The `FOR TIMESTAMP AS OF`
Expand Down
35 changes: 35 additions & 0 deletions pyathena/sqlalchemy/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@
_DialectArgDict = Mapping[str, Any]
CreateColumn = Any

# Prefix of the Athena data catalog name registered for an Amazon S3 Tables
# table bucket (e.g. ``s3tablescatalog/my-bucket``). S3 Tables are
# Iceberg-backed and use managed storage, so their CREATE TABLE statements must
# not include a LOCATION clause.
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrations-query-athena.html
S3_TABLES_CATALOG_PREFIX = "s3tablescatalog/"


class AthenaTypeCompiler(GenericTypeCompiler):
"""Type compiler for Amazon Athena SQL types.
Expand Down Expand Up @@ -305,6 +312,11 @@ class AthenaDDLCompiler(DDLCompiler):

- External table creation (EXTERNAL keyword for Hive-style tables)
- Iceberg table creation (managed tables with ACID support)
- Amazon S3 Tables (Iceberg-backed, managed storage): targeted by a
three-part ``catalog.namespace.table`` identifier whose catalog segment is
``s3tablescatalog/<table-bucket>``; the LOCATION clause is omitted since
storage is managed. Set the table's ``schema`` to
``s3tablescatalog/<table-bucket>.<namespace>``.
- File formats: PARQUET, ORC, TEXTFILE, JSON, AVRO, etc.
- Row formats with SerDe specifications
- Compression settings for various file formats
Expand Down Expand Up @@ -445,6 +457,26 @@ def _get_serde_properties_specification(
text.append(")")
return "\n".join(text)

@staticmethod
def _is_s3_tables(table: Table) -> bool:
    """Tell whether ``table`` lives in an Amazon S3 Tables catalog.

    Only the first dot-separated segment of the table's schema (the catalog)
    is inspected and compared against ``S3_TABLES_CATALOG_PREFIX``; a schema
    such as ``s3tablescatalog/my-bucket.namespace`` therefore matches. Tables
    in such catalogs use managed storage, so their DDL must omit the
    LOCATION clause.

    Args:
        table: The table being compiled.

    Returns:
        True when the schema is set and its catalog segment starts with the
        S3 Tables catalog prefix; False otherwise, including when the table
        has no schema.
    """
    # An unset/empty schema yields an empty catalog segment, which never
    # matches the (non-empty) prefix.
    catalog, _, _ = (table.schema or "").partition(".")
    return catalog.startswith(S3_TABLES_CATALOG_PREFIX)

def _get_table_location(
self, table: Table, dialect_opts: _DialectArgDict, connect_opts: Mapping[str, Any]
) -> str | None:
Expand All @@ -466,6 +498,9 @@ def _get_table_location(
def _get_table_location_specification(
self, table: Table, dialect_opts: _DialectArgDict, connect_opts: Mapping[str, Any]
) -> str:
if self._is_s3_tables(table):
# S3 Tables use managed storage; CREATE TABLE must not set a LOCATION.
return ""
location = self._get_table_location(table, dialect_opts, connect_opts)
text = []
if location:
Expand Down
40 changes: 38 additions & 2 deletions pyathena/sqlalchemy/preparer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,46 @@
from pyathena.sqlalchemy.constants import DDL_RESERVED_WORDS, SELECT_STATEMENT_RESERVED_WORDS

if TYPE_CHECKING:
from typing import Any

from sqlalchemy import Dialect


class AthenaDMLIdentifierPreparer(IdentifierPreparer):
class AthenaBaseIdentifierPreparer(IdentifierPreparer):
    """Base identifier preparer shared by Athena's DML and DDL preparers.

    Athena identifiers can be three-part (``catalog.namespace.table``), which is
    required to target S3 Tables catalogs such as ``s3tablescatalog/<bucket>``.
    SQLAlchemy's default :meth:`IdentifierPreparer.quote_schema` treats the whole
    schema string as a single identifier, so a dotted schema like
    ``s3tablescatalog/bucket.ns`` collapses into one quoted token
    (catalog and namespace merged) instead of two separately quoted tokens.

    See Also:
        :class:`AthenaDMLIdentifierPreparer`: Preparer for DML statements.
        :class:`AthenaDDLIdentifierPreparer`: Preparer for DDL statements.
    """

    def quote_schema(self, schema: str, force: Any = None) -> str:
        """Quote a possibly multi-part (``catalog.namespace``) schema name.

        Each dot-separated part is quoted independently so the catalog and
        namespace round-trip correctly in both DDL (backtick) and DML
        (double-quote) statements. Athena database and namespace names cannot
        contain a dot, so the separator is unambiguous.

        An explicit quoting preference carried by ``schema`` (a ``quoted_name``
        whose ``quote`` flag is ``True`` or ``False``, e.g. produced by
        ``Table(..., quote_schema=True)``) is applied to every part, so a
        schema without a dot is quoted exactly as the default implementation
        would quote it.

        Args:
            schema: The schema name, optionally qualified as ``catalog.namespace``.
            force: Unused; kept for SQLAlchemy API compatibility.

        Returns:
            The quoted, dot-joined schema identifier.
        """
        # ``str.split`` on a ``quoted_name`` returns plain ``str`` objects, so the
        # ``quote`` flag must be read before splitting or it is silently lost.
        quote_flag = getattr(schema, "quote", None)
        parts = schema.split(".")
        if quote_flag is not None:
            # Local import: only needed when an explicit quote flag is present.
            from sqlalchemy.sql.elements import quoted_name

            # Re-wrap each part so ``self.quote`` honors the caller's preference.
            parts = [quoted_name(part, quote_flag) for part in parts]
        return ".".join(self.quote(part) for part in parts)


class AthenaDMLIdentifierPreparer(AthenaBaseIdentifierPreparer):
"""Identifier preparer for Athena DML (SELECT, INSERT, etc.) statements.

This preparer handles quoting and escaping of identifiers in DML statements.
Expand All @@ -29,7 +65,7 @@ class AthenaDMLIdentifierPreparer(IdentifierPreparer):
reserved_words: set[str] = SELECT_STATEMENT_RESERVED_WORDS


class AthenaDDLIdentifierPreparer(IdentifierPreparer):
class AthenaDDLIdentifierPreparer(AthenaBaseIdentifierPreparer):
"""Identifier preparer for Athena DDL (CREATE, ALTER, DROP) statements.

This preparer handles quoting and escaping of identifiers in DDL statements.
Expand Down
24 changes: 24 additions & 0 deletions scripts/cloudformation/github_actions_oidc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,30 @@ Resources:
Resource: [
!Sub "arn:aws:s3:::*"
]
- PolicyName: s3tables-access
PolicyDocument:
Version: "2012-10-17"
Statement:
# Amazon S3 Tables data plane: create/query/drop namespaces and
# tables in the table bucket used by the SQLAlchemy S3 Tables tests.
- Effect: Allow
Action: [
"s3tables:*"
]
Resource: [
!Sub "arn:aws:s3tables:${AWS::Region}:${AWS::AccountId}:bucket/*"
]
# Resolve the federated `s3tablescatalog` Glue catalog and obtain
# Lake Formation data access when querying S3 Tables from Athena.
- Effect: Allow
Action: [
"glue:GetCatalog",
"glue:GetCatalogs",
"lakeformation:GetDataAccess"
]
Resource: [
"*"
]
SparkRole:
Type: AWS::IAM::Role
Properties:
Expand Down
7 changes: 7 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ def __init__(self):
)
self.default_work_group = os.getenv("AWS_ATHENA_DEFAULT_WORKGROUP", "primary")
self.managed_work_group = os.getenv("AWS_ATHENA_MANAGED_WORKGROUP")
# Optional Amazon S3 Tables configuration for the SQLAlchemy dialect tests.
# `s3tables_catalog` is the registered table-bucket catalog, e.g.
# "s3tablescatalog/<table-bucket>"; `s3tables_namespace` is a namespace that
# already exists in it. These are optional so the suite stays runnable
# without an S3 Tables bucket; the S3 Tables tests skip when they are unset.
self.s3tables_catalog = os.getenv("AWS_ATHENA_S3_TABLES_CATALOG")
self.s3tables_namespace = os.getenv("AWS_ATHENA_S3_TABLES_NAMESPACE")
self.schema = "pyathena_test_" + "".join(
random.choices(string.ascii_lowercase + string.digits, k=10)
)
Expand Down
118 changes: 118 additions & 0 deletions tests/pyathena/sqlalchemy/test_base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import re
import textwrap
import uuid
Expand Down Expand Up @@ -26,6 +27,25 @@
)
from tests.pyathena.conftest import ENV

# The Amazon S3 Tables tests run against a table-bucket catalog and namespace that
# must already exist. They are skipped when either AWS_ATHENA_S3_TABLES_CATALOG or
# AWS_ATHENA_S3_TABLES_NAMESPACE is unset or empty.
requires_s3_tables = pytest.mark.skipif(
    not (ENV.s3tables_catalog and ENV.s3tables_namespace),
    reason="AWS_ATHENA_S3_TABLES_CATALOG / AWS_ATHENA_S3_TABLES_NAMESPACE are not configured",
)


def unique_s3tables_table_name(base: str) -> str:
    """Build an S3 Tables table name that is unique to this call.

    The S3 Tables tests all write into one fixed namespace
    (``ENV.s3tables_namespace``) rather than the random per-process
    ``ENV.schema`` that isolates the other integration tests. Because the CI
    matrix runs ``tests/pyathena`` for several Python versions in parallel
    against the same account, a fixed table name would collide between those
    jobs. Appending a random suffix keeps concurrent runs apart.

    Args:
        base: Human-readable prefix for the table name.

    Returns:
        ``base`` followed by an underscore and the first eight hex digits of a
        random UUID4.
    """
    suffix = uuid.uuid4().hex[:8]
    return f"{base}_{suffix}"


class TestSQLAlchemyAthena:
@pytest.mark.parametrize(
Expand Down Expand Up @@ -2019,6 +2039,104 @@ def test_create_iceberg_table_with_multiple_partition_transform(self, engine):
tblproperties = actual.dialect_options["awsathena"]["tblproperties"]
assert tblproperties["table_type"] == "ICEBERG"

@requires_s3_tables
def test_create_s3tables_iceberg_table(self, engine):
# Integration test: checks the compiled CREATE TABLE text for an S3 Tables
# Iceberg table, then executes it and queries the resulting table.
# The `engine` fixture value is unpacked into an (engine, connection) pair.
engine, conn = engine
# S3 Tables are addressed by a three-part identifier
# (catalog.namespace.table) and use managed storage, so the emitted DDL
# quotes the catalog/namespace/table separately and omits LOCATION.
schema = f"{ENV.s3tables_catalog}.{ENV.s3tables_namespace}"
table_name = unique_s3tables_table_name("test_create_s3tables_iceberg_table")
table = Table(
table_name,
MetaData(schema=schema),
Column("col_1", types.String),
Column("col_2", types.Integer),
awsathena_tblproperties={"table_type": "ICEBERG"},
)
# Compile without executing so the exact DDL text can be compared below.
ddl = CreateTable(table).compile(bind=conn)

# Expected DDL: only the catalog segment is backtick-quoted, and no LOCATION
# clause appears between the column list and TBLPROPERTIES.
assert str(ddl) == textwrap.dedent(
f"""
CREATE TABLE `{ENV.s3tables_catalog}`.{ENV.s3tables_namespace}.{table_name} (
\tcol_1 STRING,
\tcol_2 INT
)
TBLPROPERTIES (
\t'table_type' = 'ICEBERG'
)
"""
)

# Execute the DDL; the `finally` below drops the table whether or not the
# verification query succeeds.
table.create(bind=conn)
try:
# NOTE: SQLAlchemy reflection (autoload_with) of an S3 Tables table is
# not covered here. The dialect's introspection path passes the dotted
# schema straight through as the database name and does not split the
# catalog from the namespace, so reflecting a three-part identifier is
# a separate, currently-unsupported concern. Verify creation by querying
# the table directly with its double-quoted three-part identifier.
fqtn = f'"{ENV.s3tables_catalog}"."{ENV.s3tables_namespace}"."{table_name}"'
# The result is not inspected; the statement only has to execute without error.
conn.execute(text(f"SELECT col_1, col_2 FROM {fqtn} LIMIT 0"))
finally:
# Best-effort cleanup: any exception raised by the drop is suppressed.
with contextlib.suppress(Exception):
table.drop(bind=conn)

@requires_s3_tables
def test_create_s3tables_iceberg_table_with_partition_transform(self, engine):
# Integration test: checks the compiled CREATE TABLE text for an S3 Tables
# Iceberg table with a `day` partition transform, then executes the DDL.
# The `engine` fixture value is unpacked into an (engine, connection) pair.
engine, conn = engine
# Dotted `catalog.namespace` schema, the same form the sibling S3 Tables test uses.
schema = f"{ENV.s3tables_catalog}.{ENV.s3tables_namespace}"
table_name = unique_s3tables_table_name("test_create_s3tables_partition_transform")
# `dt` is flagged as a partition column with the `day` transform.
table = Table(
table_name,
MetaData(schema=schema),
Column("col_1", types.String),
Column("dt", types.Date, awsathena_partition=True, awsathena_partition_transform="day"),
awsathena_tblproperties={"table_type": "ICEBERG"},
)
# Compile without executing so the exact DDL text can be compared below.
ddl = CreateTable(table).compile(bind=conn)

# Expected DDL: `dt` stays in the column list and also appears as `day(dt)`
# in PARTITIONED BY; there is no LOCATION clause.
assert str(ddl) == textwrap.dedent(
f"""
CREATE TABLE `{ENV.s3tables_catalog}`.{ENV.s3tables_namespace}.{table_name} (
\tcol_1 STRING,
\tdt DATE
)
PARTITIONED BY (
\tday(dt)
)
TBLPROPERTIES (
\t'table_type' = 'ICEBERG'
)
"""
)

# Execute the DDL, then drop the table right away. No query is issued in
# between, and any exception raised by the drop is suppressed (best-effort cleanup).
table.create(bind=conn)
with contextlib.suppress(Exception):
table.drop(bind=conn)

@requires_s3_tables
def test_create_s3tables_table_as_select(self, engine):
    """Create an S3 Tables table via raw-SQL CTAS and read its single row back."""
    engine, conn = engine
    # There is no SQLAlchemy construct for CTAS here, so every statement is raw SQL.
    # Each part of the three-part identifier is double-quoted (Presto/Trino convention).
    table_name = unique_s3tables_table_name("test_create_s3tables_table_as_select")
    fqtn = f'"{ENV.s3tables_catalog}"."{ENV.s3tables_namespace}"."{table_name}"'
    drop_sql = f"DROP TABLE IF EXISTS {fqtn}"
    create_sql = (
        f"CREATE TABLE {fqtn} "
        "WITH (table_type = 'ICEBERG') AS "
        "SELECT 1 AS id, 'a' AS name"
    )

    # Start from a clean slate before creating the table.
    conn.execute(text(drop_sql))
    try:
        conn.execute(text(create_sql))
        result = conn.execute(text(f"SELECT id, name FROM {fqtn}"))
        assert result.fetchall() == [(1, "a")]
    finally:
        # Best-effort cleanup: a failing drop is suppressed.
        with contextlib.suppress(Exception):
            conn.execute(text(drop_sql))

def test_insert_from_select_cte_follows_insert_one(self, engine):
engine, conn = engine
metadata = MetaData(schema=ENV.schema)
Expand Down
Loading
Loading