diff --git a/.github/workflows/test-suite.yaml b/.github/workflows/test-suite.yaml index a14d2087..8aff8e19 100644 --- a/.github/workflows/test-suite.yaml +++ b/.github/workflows/test-suite.yaml @@ -18,6 +18,11 @@ jobs: AWS_ATHENA_WORKGROUP: pyathena AWS_ATHENA_SPARK_WORKGROUP: pyathena-spark AWS_ATHENA_MANAGED_WORKGROUP: pyathena-managed + # Registered S3 Tables catalog (s3tablescatalog/<bucket-name>) and namespace + # for the SQLAlchemy S3 Tables tests; the table bucket, namespace, and the + # AWS analytics-services integration are provisioned out of band. + AWS_ATHENA_S3_TABLES_CATALOG: s3tablescatalog/pyathena + AWS_ATHENA_S3_TABLES_NAMESPACE: pyathena_test strategy: fail-fast: false diff --git a/docs/sqlalchemy.md b/docs/sqlalchemy.md index 390dfaf6..4cb0a6b7 100644 --- a/docs/sqlalchemy.md +++ b/docs/sqlalchemy.md @@ -363,6 +363,54 @@ If you want to limit the column options to specific table names only, specify th awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?partition=table1.column1%2Ctable1.column2&cluster=table2.column1%2Ctable2.column2&... ``` +## Amazon S3 Tables + +[Amazon S3 Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html) are Iceberg-backed +tables stored in a dedicated table bucket with their own data catalog. Athena registers each table bucket as a +catalog named `s3tablescatalog/<bucket-name>`, so an S3 Tables table is addressed by a three-part identifier +`catalog.namespace.table`. + +To target an S3 Tables table, set the `schema` to the dotted `s3tablescatalog/<bucket-name>.<namespace>` form. +The dialect quotes the catalog, namespace, and table independently, and because S3 Tables use managed storage it +omits the `LOCATION` clause automatically (so `awsathena_location` is not required). 
+ +```python +table = Table( + "some_table", + MetaData(schema="s3tablescatalog/my-bucket.my_namespace"), + Column("id", types.Integer), + Column("dt", types.Date, awsathena_partition=True, awsathena_partition_transform="day"), + awsathena_tblproperties={"table_type": "ICEBERG"}, +) +table.create(bind=conn) +``` + +which builds the following statement: + +```sql +CREATE TABLE `s3tablescatalog/my-bucket`.my_namespace.some_table ( + id INT, + dt DATE +) +PARTITIONED BY ( + day(dt) +) +TBLPROPERTIES ( + 'table_type' = 'ICEBERG' +) +``` + +All Iceberg partition transforms (`year`, `month`, `day`, `hour`, `bucket`, `truncate`) are supported, the same as +for other Iceberg tables. CTAS (`CREATE TABLE ... AS SELECT`) is not modeled as a SQLAlchemy construct; issue it as +raw SQL via `text()`, using the double-quoted three-part identifier: + +```python +conn.execute(text( + 'CREATE TABLE "s3tablescatalog/my-bucket"."my_namespace"."some_table" ' + "WITH (table_type = 'ICEBERG') AS SELECT 1 AS id" +)) +``` + ## Temporal/Time-travel with Iceberg Athena supports time-travel queries on Iceberg tables by either a version_id or a timestamp. The `FOR TIMESTAMP AS OF` diff --git a/pyathena/sqlalchemy/compiler.py b/pyathena/sqlalchemy/compiler.py index f45ab310..ba83efb0 100644 --- a/pyathena/sqlalchemy/compiler.py +++ b/pyathena/sqlalchemy/compiler.py @@ -39,6 +39,13 @@ _DialectArgDict = Mapping[str, Any] CreateColumn = Any +# Prefix of the Athena data catalog name registered for an Amazon S3 Tables +# table bucket (e.g. ``s3tablescatalog/my-bucket``). S3 Tables are +# Iceberg-backed and use managed storage, so their CREATE TABLE statements must +# not include a LOCATION clause. +# https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrations-query-athena.html +S3_TABLES_CATALOG_PREFIX = "s3tablescatalog/" + class AthenaTypeCompiler(GenericTypeCompiler): """Type compiler for Amazon Athena SQL types. 
@@ -305,6 +312,11 @@ class AthenaDDLCompiler(DDLCompiler): - External table creation (EXTERNAL keyword for Hive-style tables) - Iceberg table creation (managed tables with ACID support) + - Amazon S3 Tables (Iceberg-backed, managed storage): targeted by a + three-part ``catalog.namespace.table`` identifier whose catalog segment is + ``s3tablescatalog/<bucket-name>``; the LOCATION clause is omitted since + storage is managed. Set the table's ``schema`` to + ``s3tablescatalog/<bucket-name>.<namespace>``. - File formats: PARQUET, ORC, TEXTFILE, JSON, AVRO, etc. - Row formats with SerDe specifications - Compression settings for various file formats @@ -445,6 +457,26 @@ def _get_serde_properties_specification( text.append(")") return "\n".join(text) + @staticmethod + def _is_s3_tables(table: Table) -> bool: + """Return whether the table targets an Amazon S3 Tables catalog. + + S3 Tables are addressed by a three-part identifier whose catalog segment + is ``s3tablescatalog/<bucket-name>`` (e.g. + ``s3tablescatalog/my-bucket.namespace.table``). Such tables use managed + storage, so their DDL must omit the LOCATION clause. + + Args: + table: The table being compiled. + + Returns: + True if the table's schema catalog segment is an S3 Tables catalog. + """ + schema = table.schema + if not schema: + return False + return schema.split(".", 1)[0].startswith(S3_TABLES_CATALOG_PREFIX) + def _get_table_location( self, table: Table, dialect_opts: _DialectArgDict, connect_opts: Mapping[str, Any] ) -> str | None: @@ -466,6 +498,9 @@ def _get_table_location( def _get_table_location_specification( self, table: Table, dialect_opts: _DialectArgDict, connect_opts: Mapping[str, Any] ) -> str: + if self._is_s3_tables(table): + # S3 Tables use managed storage; CREATE TABLE must not set a LOCATION. 
+ return "" location = self._get_table_location(table, dialect_opts, connect_opts) text = [] if location: diff --git a/pyathena/sqlalchemy/preparer.py b/pyathena/sqlalchemy/preparer.py index f884ff6a..5815c9e3 100644 --- a/pyathena/sqlalchemy/preparer.py +++ b/pyathena/sqlalchemy/preparer.py @@ -7,10 +7,46 @@ from pyathena.sqlalchemy.constants import DDL_RESERVED_WORDS, SELECT_STATEMENT_RESERVED_WORDS if TYPE_CHECKING: + from typing import Any + from sqlalchemy import Dialect -class AthenaDMLIdentifierPreparer(IdentifierPreparer): +class AthenaBaseIdentifierPreparer(IdentifierPreparer): + """Base identifier preparer shared by Athena's DML and DDL preparers. + + Athena identifiers can be three-part (``catalog.namespace.table``), which is + required to target S3 Tables catalogs such as ``s3tablescatalog/<bucket-name>``. + SQLAlchemy's default :meth:`IdentifierPreparer.quote_schema` treats the whole + schema string as a single identifier, so a dotted schema like + ``s3tablescatalog/bucket.ns`` collapses into one quoted token + (catalog and namespace merged) instead of two separately quoted tokens. + + See Also: + :class:`AthenaDMLIdentifierPreparer`: Preparer for DML statements. + :class:`AthenaDDLIdentifierPreparer`: Preparer for DDL statements. + """ + + def quote_schema(self, schema: str, force: Any = None) -> str: + """Quote a possibly multi-part (``catalog.namespace``) schema name. + + Each dot-separated part is quoted independently so the catalog and + namespace round-trip correctly in both DDL (backtick) and DML + (double-quote) statements. Athena database and namespace names cannot + contain a dot, so the separator is unambiguous; a schema without a dot + is quoted exactly as before. + + Args: + schema: The schema name, optionally qualified as ``catalog.namespace``. + force: Unused; kept for SQLAlchemy API compatibility. + + Returns: + The quoted, dot-joined schema identifier. 
+ """ + return ".".join(self.quote(part) for part in schema.split(".")) + + +class AthenaDMLIdentifierPreparer(AthenaBaseIdentifierPreparer): """Identifier preparer for Athena DML (SELECT, INSERT, etc.) statements. This preparer handles quoting and escaping of identifiers in DML statements. @@ -29,7 +65,7 @@ class AthenaDMLIdentifierPreparer(IdentifierPreparer): reserved_words: set[str] = SELECT_STATEMENT_RESERVED_WORDS -class AthenaDDLIdentifierPreparer(IdentifierPreparer): +class AthenaDDLIdentifierPreparer(AthenaBaseIdentifierPreparer): """Identifier preparer for Athena DDL (CREATE, ALTER, DROP) statements. This preparer handles quoting and escaping of identifiers in DDL statements. diff --git a/scripts/cloudformation/github_actions_oidc.yaml b/scripts/cloudformation/github_actions_oidc.yaml index 68cec963..832d600d 100644 --- a/scripts/cloudformation/github_actions_oidc.yaml +++ b/scripts/cloudformation/github_actions_oidc.yaml @@ -120,6 +120,30 @@ Resources: Resource: [ !Sub "arn:aws:s3:::*" ] + - PolicyName: s3tables-access + PolicyDocument: + Version: "2012-10-17" + Statement: + # Amazon S3 Tables data plane: create/query/drop namespaces and + # tables in the table bucket used by the SQLAlchemy S3 Tables tests. + - Effect: Allow + Action: [ + "s3tables:*" + ] + Resource: [ + !Sub "arn:aws:s3tables:${AWS::Region}:${AWS::AccountId}:bucket/*" + ] + # Resolve the federated `s3tablescatalog` Glue catalog and obtain + # Lake Formation data access when querying S3 Tables from Athena. 
+ - Effect: Allow + Action: [ + "glue:GetCatalog", + "glue:GetCatalogs", + "lakeformation:GetDataAccess" + ] + Resource: [ + "*" + ] SparkRole: Type: AWS::IAM::Role Properties: diff --git a/tests/__init__.py b/tests/__init__.py index bbe284c9..be0927b8 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -31,6 +31,13 @@ def __init__(self): ) self.default_work_group = os.getenv("AWS_ATHENA_DEFAULT_WORKGROUP", "primary") self.managed_work_group = os.getenv("AWS_ATHENA_MANAGED_WORKGROUP") + # Optional Amazon S3 Tables configuration for the SQLAlchemy dialect tests. + # `s3tables_catalog` is the registered table-bucket catalog, e.g. + # "s3tablescatalog/<bucket-name>"; `s3tables_namespace` is a namespace that + # already exists in it. These are optional so the suite stays runnable + # without an S3 Tables bucket; the S3 Tables tests skip when they are unset. + self.s3tables_catalog = os.getenv("AWS_ATHENA_S3_TABLES_CATALOG") + self.s3tables_namespace = os.getenv("AWS_ATHENA_S3_TABLES_NAMESPACE") self.schema = "pyathena_test_" + "".join( random.choices(string.ascii_lowercase + string.digits, k=10) ) diff --git a/tests/pyathena/sqlalchemy/test_base.py b/tests/pyathena/sqlalchemy/test_base.py index a42219ee..d29e7c11 100644 --- a/tests/pyathena/sqlalchemy/test_base.py +++ b/tests/pyathena/sqlalchemy/test_base.py @@ -1,3 +1,4 @@ +import contextlib import re import textwrap import uuid @@ -26,6 +27,25 @@ ) from tests.pyathena.conftest import ENV +# Amazon S3 Tables tests need a pre-provisioned table-bucket catalog and namespace. +# Skip them unless AWS_ATHENA_S3_TABLES_CATALOG / AWS_ATHENA_S3_TABLES_NAMESPACE are set. +requires_s3_tables = pytest.mark.skipif( + not ENV.s3tables_catalog or not ENV.s3tables_namespace, + reason="AWS_ATHENA_S3_TABLES_CATALOG / AWS_ATHENA_S3_TABLES_NAMESPACE are not configured", +) + + +def unique_s3tables_table_name(base: str) -> str: + """Return a per-run-unique S3 Tables table name. 
+ + Other integration tests isolate themselves with a random per-process + ``ENV.schema``, but the S3 Tables tests share one fixed namespace + (``ENV.s3tables_namespace``). The CI matrix runs ``tests/pyathena`` once per + Python version in parallel against the same account, so a fixed table name + would collide across those concurrent jobs; a random suffix keeps them apart. + """ + return f"{base}_{uuid.uuid4().hex[:8]}" + class TestSQLAlchemyAthena: @pytest.mark.parametrize( @@ -2019,6 +2039,104 @@ def test_create_iceberg_table_with_multiple_partition_transform(self, engine): tblproperties = actual.dialect_options["awsathena"]["tblproperties"] assert tblproperties["table_type"] == "ICEBERG" + @requires_s3_tables + def test_create_s3tables_iceberg_table(self, engine): + engine, conn = engine + # S3 Tables are addressed by a three-part identifier + # (catalog.namespace.table) and use managed storage, so the emitted DDL + # quotes the catalog/namespace/table separately and omits LOCATION. + schema = f"{ENV.s3tables_catalog}.{ENV.s3tables_namespace}" + table_name = unique_s3tables_table_name("test_create_s3tables_iceberg_table") + table = Table( + table_name, + MetaData(schema=schema), + Column("col_1", types.String), + Column("col_2", types.Integer), + awsathena_tblproperties={"table_type": "ICEBERG"}, + ) + ddl = CreateTable(table).compile(bind=conn) + + assert str(ddl) == textwrap.dedent( + f""" + CREATE TABLE `{ENV.s3tables_catalog}`.{ENV.s3tables_namespace}.{table_name} ( + \tcol_1 STRING, + \tcol_2 INT + ) + TBLPROPERTIES ( + \t'table_type' = 'ICEBERG' + ) + """ + ) + + table.create(bind=conn) + try: + # NOTE: SQLAlchemy reflection (autoload_with) of an S3 Tables table is + # not covered here. The dialect's introspection path passes the dotted + # schema straight through as the database name and does not split the + # catalog from the namespace, so reflecting a three-part identifier is + # a separate, currently-unsupported concern. 
Verify creation by querying + # the table directly with its double-quoted three-part identifier. + fqtn = f'"{ENV.s3tables_catalog}"."{ENV.s3tables_namespace}"."{table_name}"' + conn.execute(text(f"SELECT col_1, col_2 FROM {fqtn} LIMIT 0")) + finally: + with contextlib.suppress(Exception): + table.drop(bind=conn) + + @requires_s3_tables + def test_create_s3tables_iceberg_table_with_partition_transform(self, engine): + engine, conn = engine + schema = f"{ENV.s3tables_catalog}.{ENV.s3tables_namespace}" + table_name = unique_s3tables_table_name("test_create_s3tables_partition_transform") + table = Table( + table_name, + MetaData(schema=schema), + Column("col_1", types.String), + Column("dt", types.Date, awsathena_partition=True, awsathena_partition_transform="day"), + awsathena_tblproperties={"table_type": "ICEBERG"}, + ) + ddl = CreateTable(table).compile(bind=conn) + + assert str(ddl) == textwrap.dedent( + f""" + CREATE TABLE `{ENV.s3tables_catalog}`.{ENV.s3tables_namespace}.{table_name} ( + \tcol_1 STRING, + \tdt DATE + ) + PARTITIONED BY ( + \tday(dt) + ) + TBLPROPERTIES ( + \t'table_type' = 'ICEBERG' + ) + """ + ) + + table.create(bind=conn) + with contextlib.suppress(Exception): + table.drop(bind=conn) + + @requires_s3_tables + def test_create_s3tables_table_as_select(self, engine): + engine, conn = engine + # CTAS is not modeled as a SQLAlchemy construct; exercise it as raw SQL. + # The three-part identifier uses double quotes (Presto/Trino convention). 
+ table_name = unique_s3tables_table_name("test_create_s3tables_table_as_select") + fqtn = f'"{ENV.s3tables_catalog}"."{ENV.s3tables_namespace}"."{table_name}"' + conn.execute(text(f"DROP TABLE IF EXISTS {fqtn}")) + try: + conn.execute( + text( + f"CREATE TABLE {fqtn} " + "WITH (table_type = 'ICEBERG') AS " + "SELECT 1 AS id, 'a' AS name" + ) + ) + rows = conn.execute(text(f"SELECT id, name FROM {fqtn}")).fetchall() + assert rows == [(1, "a")] + finally: + with contextlib.suppress(Exception): + conn.execute(text(f"DROP TABLE IF EXISTS {fqtn}")) + def test_insert_from_select_cte_follows_insert_one(self, engine): engine, conn = engine metadata = MetaData(schema=ENV.schema) diff --git a/tests/pyathena/sqlalchemy/test_compiler.py b/tests/pyathena/sqlalchemy/test_compiler.py index 5fc20a65..685fba95 100644 --- a/tests/pyathena/sqlalchemy/test_compiler.py +++ b/tests/pyathena/sqlalchemy/test_compiler.py @@ -1,8 +1,9 @@ from unittest.mock import Mock import pytest -from sqlalchemy import Column, Integer, MetaData, String, Table, exc, func, select +from sqlalchemy import Column, Date, Integer, MetaData, String, Table, exc, func, select from sqlalchemy.sql import literal +from sqlalchemy.sql.ddl import CreateTable from pyathena.sqlalchemy.base import AthenaDialect from pyathena.sqlalchemy.compiler import AthenaTypeCompiler @@ -225,3 +226,60 @@ def test_visit_char_length_func_existing(self): sql_str = str(compiled) assert "length(" in sql_str + + +# Compile-only (no AWS) tests for S3 Tables / multi-part identifier support. +# Amazon S3 Tables are addressed by a three-part identifier whose catalog segment +# is ``s3tablescatalog/<bucket-name>`` and use managed storage, so their CREATE +# TABLE statements must omit the LOCATION clause. 
+ + +def test_create_table_s3tables_three_part_identifier_no_location(): + table = Table( + "tbl", + MetaData(schema="s3tablescatalog/bucket.ns"), + Column("id", Integer), + Column("name", String), + awsathena_tblproperties={"table_type": "ICEBERG"}, + ) + ddl = str(CreateTable(table).compile(dialect=AthenaDialect())) + assert "CREATE TABLE `s3tablescatalog/bucket`.ns.tbl (" in ddl + assert "CREATE EXTERNAL TABLE" not in ddl + assert "LOCATION" not in ddl + assert "'table_type' = 'ICEBERG'" in ddl + + +def test_select_s3tables_three_part_identifier(): + table = Table( + "tbl", + MetaData(schema="s3tablescatalog/bucket.ns"), + Column("id", Integer), + ) + dml = str(select(table).compile(dialect=AthenaDialect())) + assert 'FROM "s3tablescatalog/bucket".ns.tbl' in dml + + +def test_create_table_s3tables_partition_transform(): + table = Table( + "tbl", + MetaData(schema="s3tablescatalog/bucket.ns"), + Column("id", Integer), + Column("dt", Date, awsathena_partition=True, awsathena_partition_transform="day"), + awsathena_tblproperties={"table_type": "ICEBERG"}, + ) + ddl = str(CreateTable(table).compile(dialect=AthenaDialect())) + assert "PARTITIONED BY (" in ddl + assert "day(dt)" in ddl + assert "LOCATION" not in ddl + + +def test_create_iceberg_table_without_catalog_still_requires_location(): + # Regression: a non-S3-Tables Iceberg table must still demand a location. 
+ table = Table( + "tbl", + MetaData(schema="some_db"), + Column("id", Integer), + awsathena_tblproperties={"table_type": "ICEBERG"}, + ) + with pytest.raises(exc.CompileError, match="location of the table should be specified"): + CreateTable(table).compile(dialect=AthenaDialect()) diff --git a/tests/pyathena/sqlalchemy/test_preparer.py b/tests/pyathena/sqlalchemy/test_preparer.py new file mode 100644 index 00000000..ad1db7ca --- /dev/null +++ b/tests/pyathena/sqlalchemy/test_preparer.py @@ -0,0 +1,29 @@ +from pyathena.sqlalchemy.base import AthenaDialect +from pyathena.sqlalchemy.preparer import ( + AthenaDDLIdentifierPreparer, + AthenaDMLIdentifierPreparer, +) + +# Multi-part identifier quoting for S3 Tables. Amazon S3 Tables are addressed by a +# three-part identifier (catalog.namespace.table) whose catalog segment is +# ``s3tablescatalog/<bucket-name>``, so a dotted schema must be split and each +# part quoted independently. + + +def test_quote_schema_splits_catalog_and_namespace_ddl(): + # DDL preparer quotes with backticks; only the catalog needs quoting + # (it contains "/"), the namespace does not. + preparer = AthenaDDLIdentifierPreparer(AthenaDialect()) + assert preparer.quote_schema("s3tablescatalog/bucket.ns") == "`s3tablescatalog/bucket`.ns" + + +def test_quote_schema_splits_catalog_and_namespace_dml(): + # DML preparer quotes with double quotes (Presto/Trino convention). + preparer = AthenaDMLIdentifierPreparer(AthenaDialect()) + assert preparer.quote_schema("s3tablescatalog/bucket.ns") == '"s3tablescatalog/bucket".ns' + + +def test_quote_schema_single_token_unchanged(): + # A schema without a dot must be quoted exactly as before (no splitting). + preparer = AthenaDDLIdentifierPreparer(AthenaDialect()) + assert preparer.quote_schema("some_db") == "some_db"