From 19e55744ce03c110f9ad94fff9f5da3db3941642 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 14:43:03 +0900 Subject: [PATCH 01/20] feat: define the driver interface and domain types --- internal/driver/backend.go | 80 +++++++++++++++++++++++++++ internal/driver/backend_test.go | 98 +++++++++++++++++++++++++++++++++ internal/driver/driver.go | 37 +++++++++++++ internal/driver/metric.go | 43 +++++++++++++++ internal/driver/statement.go | 30 ++++++++++ 5 files changed, 288 insertions(+) create mode 100644 internal/driver/backend.go create mode 100644 internal/driver/backend_test.go create mode 100644 internal/driver/driver.go create mode 100644 internal/driver/metric.go create mode 100644 internal/driver/statement.go diff --git a/internal/driver/backend.go b/internal/driver/backend.go new file mode 100644 index 0000000..ad09e44 --- /dev/null +++ b/internal/driver/backend.go @@ -0,0 +1,80 @@ +package driver + +import "time" + +// State is a backend's connection state, normalized across drivers. +type State uint8 + +const ( + StateUnknown State = iota + StateActive + StateIdle + StateIdleInTx + StateIdleInTxAborted +) + +func (s State) String() string { + switch s { + case StateUnknown: + return "unknown" + case StateActive: + return "active" + case StateIdle: + return "idle" + case StateIdleInTx: + return "idle-tx" + case StateIdleInTxAborted: + return "idle-tx-aborted" + } + + return "unknown" +} + +// Backend is one server-side connection in an Activity snapshot. +type Backend struct { + PID int64 + User string + DB string + State State + + // Empty when the backend is not waiting. + WaitType string + WaitEvent string + + Query string + + // PIDs blocking this backend; the ▲/⊘ markers are derived across rows. + BlockedBy []int64 + + // now() - query_start and now() - xact_start; nil when the timestamp is NULL. + QueryAge *time.Duration + XactAge *time.Duration + + // Non-client backend, hidden by default. + BackgroundWorker bool +} + +// Duration returns the DURATION-column value: transaction age for +// idle-in-transaction backends, query age otherwise. +func (b Backend) Duration() (time.Duration, bool) { + switch b.State { + case StateIdleInTx, StateIdleInTxAborted: + if b.XactAge != nil { + return *b.XactAge, true + } + case StateUnknown, StateActive, StateIdle: + if b.QueryAge != nil { + return *b.QueryAge, true + } + } + + return 0, false +} + +func (b Backend) Wait() string { + if b.WaitType == "" { + return "" + } + + return b.WaitType + ":" + b.WaitEvent +} diff --git a/internal/driver/backend_test.go b/internal/driver/backend_test.go new file mode 100644 index 0000000..9463bfa --- /dev/null +++ b/internal/driver/backend_test.go @@ -0,0 +1,98 @@ +package driver_test + +import ( + "testing" + "time" + + "github.com/mickamy/dbtop/internal/driver" +) + +func TestStateString(t *testing.T) { + t.Parallel() + + tests := []struct { + state driver.State + want string + }{ + {driver.StateActive, "active"}, + {driver.StateIdle, "idle"}, + {driver.StateIdleInTx, "idle-tx"}, + {driver.StateIdleInTxAborted, "idle-tx-aborted"}, + {driver.StateUnknown, "unknown"}, + } + + for _, tt := range tests { + if got := tt.state.String(); got != tt.want { + t.Errorf("State(%d).String() = %q, want %q", tt.state, got, tt.want) + } + } +} + +func TestBackendDuration(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + backend driver.Backend + want time.Duration + wantOK bool + }{ + { + name: "active uses query age", + backend: driver.Backend{State: driver.StateActive, QueryAge: new(3 * time.Second), XactAge: new(time.Minute)}, + want: 3 * time.Second, + wantOK: true, + }, + { + name: "idle-tx uses xact age", + backend: driver.Backend{State: driver.StateIdleInTx, QueryAge: new(time.Second), XactAge: new(90 * time.Second)}, + want: 90 * time.Second, + wantOK: true, + }, + { + name: "idle-tx-aborted uses xact age", + backend: driver.Backend{State: driver.StateIdleInTxAborted, XactAge: new(30 * time.Second)}, + want: 30 * time.Second, + wantOK: true, + }, + { + name: "active without query age is undefined", + backend: driver.Backend{State: driver.StateActive}, + wantOK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, ok := tt.backend.Duration() + if ok != tt.wantOK || got != tt.want { + t.Errorf("Duration() = (%v, %v), want (%v, %v)", got, ok, tt.want, tt.wantOK) + } + }) + } +} + +func TestBackendWait(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + backend driver.Backend + want string + }{ + {"lock wait", driver.Backend{WaitType: "Lock", WaitEvent: "tuple"}, "Lock:tuple"}, + {"not waiting", driver.Backend{}, ""}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if got := tt.backend.Wait(); got != tt.want { + t.Errorf("Wait() = %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/driver/driver.go b/internal/driver/driver.go new file mode 100644 index 0000000..f63ecd3 --- /dev/null +++ b/internal/driver/driver.go @@ -0,0 +1,37 @@ +package driver + +import "context" + +// Driver abstracts a database backend behind the screens the TUI renders. The +// TUI is driver-independent: it only consumes []Backend, MetricSample, and +// []Statement. +type Driver interface { + Activity(ctx context.Context) ([]Backend, error) + Metrics(ctx context.Context) (MetricSample, error) + Statements(ctx context.Context) ([]Statement, error) + ResetStatements(ctx context.Context) error + + // Cancel gracefully stops the running query (pg_cancel_backend / KILL QUERY). + Cancel(ctx context.Context, pid int64) error + + // Terminate forcibly closes the connection (pg_terminate_backend / KILL CONNECTION). + Terminate(ctx context.Context, pid int64) error + + Capabilities(ctx context.Context) (Caps, error) + Close() error +} + +// Caps reports the privileges and features available to the connected user, +// used to degrade gracefully and to render the startup banner. +type Caps struct { + Superuser bool + + // Can read other backends' full query text (pg_monitor / PROCESS). + Monitor bool + + // Can cancel/terminate backends (pg_signal_backend / CONNECTION_ADMIN). + Kill bool + + // Digest source available (pg_stat_statements / performance_schema digest). + Statements bool +} diff --git a/internal/driver/metric.go b/internal/driver/metric.go new file mode 100644 index 0000000..2332182 --- /dev/null +++ b/internal/driver/metric.go @@ -0,0 +1,43 @@ +package driver + +import "time" + +// MetricSample is one polling sample of server-wide health counters. Drivers +// return raw cumulative counters and gauges; per-second rates are derived +// downstream from the delta between successive samples. +type MetricSample struct { + At time.Time + + MaxConnections int + Conns ConnCounts + + // Monotonic counters; rate = delta / dt. + Commits int64 + Rollbacks int64 + BlocksHit int64 + BlocksRead int64 + TuplesInserted int64 + TuplesUpdated int64 + TuplesDeleted int64 + TuplesReturned int64 + TuplesFetched int64 + TempFiles int64 + TempBytes int64 + + // Point-in-time value; use as-is, not as a rate. + WaitingLocks int + + Replicas []ReplicaLag +} + +type ConnCounts struct { + Total int + Active int + Idle int + IdleInTx int +} + +type ReplicaLag struct { + Client string + Lag time.Duration +} diff --git a/internal/driver/statement.go b/internal/driver/statement.go new file mode 100644 index 0000000..e6ac9e6 --- /dev/null +++ b/internal/driver/statement.go @@ -0,0 +1,30 @@ +package driver + +import "time" + +// Statement is one normalized query in the cumulative digest ranking +// (pg_stat_statements / events_statements_summary_by_digest). +type Statement struct { + // queryid (PostgreSQL) or digest (MySQL). + ID string + Query string + + Calls int64 + Total time.Duration // total execution time; default sort key + Mean time.Duration // mean execution time per call + Rows int64 // total rows; per-call is Rows / Calls + + // Full scan without an index (MySQL SUM_NO_INDEX_USED); always false on PostgreSQL. + NoIndex bool + + Min time.Duration + Max time.Duration + Stddev time.Duration + + // PostgreSQL-only. + SharedBlocksHit int64 + SharedBlocksRead int64 + + // MySQL-only. + RowsExamined int64 +} From 0838c3948c1de61bcfef970c7dd43924e6d1975f Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 14:51:13 +0900 Subject: [PATCH 02/20] feat(postgres): add connection setup with a PG 14+ version guard --- go.mod | 10 ++++ go.sum | 26 ++++++++++ internal/driver/postgres/export_test.go | 3 ++ internal/driver/postgres/postgres.go | 63 +++++++++++++++++++++++ internal/driver/postgres/postgres_test.go | 32 ++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 go.sum create mode 100644 internal/driver/postgres/export_test.go create mode 100644 internal/driver/postgres/postgres.go create mode 100644 internal/driver/postgres/postgres_test.go diff --git a/go.mod b/go.mod index 166d873..80f42f2 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,13 @@ module github.com/mickamy/dbtop go 1.26.3 + +require github.com/jackc/pgx/v5 v5.9.2 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f5b2410 --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/driver/postgres/export_test.go b/internal/driver/postgres/export_test.go new file mode 100644 index 0000000..4f5ef09 --- /dev/null +++ b/internal/driver/postgres/export_test.go @@ -0,0 +1,3 @@ +package postgres + +var GuardVersion = guardVersion diff --git a/internal/driver/postgres/postgres.go b/internal/driver/postgres/postgres.go new file mode 100644 index 0000000..d23eb67 --- /dev/null +++ b/internal/driver/postgres/postgres.go @@ -0,0 +1,63 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// PostgreSQL 14.0 (server_version_num); 14+ gives both total_exec_time and pg_blocking_pids. +const minServerVersionNum = 140000 + +// Small pool: a poll and a kill may run at once, nothing more. +const maxConns = 3 + +type Driver struct { + pool *pgxpool.Pool +} + +// Open connects to dsn and verifies the server is PostgreSQL 14+. The caller +// must Close the returned Driver. +func Open(ctx context.Context, dsn string) (*Driver, error) { + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, fmt.Errorf("parse dsn: %w", err) + } + + cfg.MaxConns = maxConns + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("connect: %w", err) + } + + var version int + if err := pool.QueryRow(ctx, "SELECT current_setting('server_version_num')::int").Scan(&version); err != nil { + pool.Close() + + return nil, fmt.Errorf("read server version: %w", err) + } + + if err := guardVersion(version); err != nil { + pool.Close() + + return nil, err + } + + return &Driver{pool: pool}, nil +} + +func (d *Driver) Close() error { + d.pool.Close() + + return nil +} + +func guardVersion(num int) error { + if num < minServerVersionNum { + return fmt.Errorf("dbtop requires PostgreSQL 14+ (server_version_num=%d)", num) + } + + return nil +} diff --git a/internal/driver/postgres/postgres_test.go b/internal/driver/postgres/postgres_test.go new file mode 100644 index 0000000..f6bcaee --- /dev/null +++ b/internal/driver/postgres/postgres_test.go @@ -0,0 +1,32 @@ +package postgres_test + +import ( + "testing" + + "github.com/mickamy/dbtop/internal/driver/postgres" +) + +func TestGuardVersion(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + num int + wantErr bool + }{ + {"pg 13 rejected", 130010, true}, + {"pg 14.0 accepted", 140000, false}, + {"pg 16.2 accepted", 160002, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := postgres.GuardVersion(tt.num) + if (err != nil) != tt.wantErr { + t.Errorf("GuardVersion(%d) error = %v, wantErr %v", tt.num, err, tt.wantErr) + } + }) + } +} From 8d62dc77d7fb1b88f033631d811446459eccba02 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:22:21 +0900 Subject: [PATCH 03/20] feat(postgres): implement the Activity snapshot --- internal/driver/postgres/activity.go | 83 +++++++++++++++++++++++ internal/driver/postgres/activity_test.go | 34 ++++++++++ internal/driver/postgres/export_test.go | 5 +- internal/durations/durations.go | 7 ++ internal/durations/durations_test.go | 31 +++++++++ internal/ptr/ptr.go | 18 +++++ internal/ptr/ptr_test.go | 34 ++++++++++ 7 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 internal/driver/postgres/activity.go create mode 100644 internal/driver/postgres/activity_test.go create mode 100644 internal/durations/durations.go create mode 100644 internal/durations/durations_test.go create mode 100644 internal/ptr/ptr.go create mode 100644 internal/ptr/ptr_test.go diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go new file mode 100644 index 0000000..42a376d --- /dev/null +++ b/internal/driver/postgres/activity.go @@ -0,0 +1,83 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/durations" + "github.com/mickamy/dbtop/internal/ptr" +) + +const activityQuery = ` +SELECT a.pid, a.usename, a.datname, a.state, + a.wait_event_type, a.wait_event, a.query, a.backend_type, + pg_blocking_pids(a.pid) AS blocked_by, + EXTRACT(EPOCH FROM now() - a.query_start)::float8 AS query_age, + EXTRACT(EPOCH FROM now() - a.xact_start)::float8 AS xact_age +FROM pg_stat_activity a +WHERE a.pid <> pg_backend_pid() +ORDER BY query_age DESC NULLS LAST +` + +func (d *Driver) Activity(ctx context.Context) ([]driver.Backend, error) { + rows, err := d.pool.Query(ctx, activityQuery) + if err != nil { + return nil, fmt.Errorf("query activity: %w", err) + } + defer rows.Close() + + var backends []driver.Backend + + for rows.Next() { + var ( + pid int64 + user, db, state *string + waitType, waitEvent *string + query, backendType *string + blockedBy []int64 + queryAge, xactAge *float64 + ) + + if err := rows.Scan( + &pid, &user, &db, &state, &waitType, &waitEvent, &query, &backendType, &blockedBy, &queryAge, &xactAge, + ); err != nil { + return nil, fmt.Errorf("scan activity: %w", err) + } + + backends = append(backends, driver.Backend{ + PID: pid, + User: ptr.OrZero(user), + DB: ptr.OrZero(db), + State: parseState(ptr.OrZero(state)), + WaitType: ptr.OrZero(waitType), + WaitEvent: ptr.OrZero(waitEvent), + Query: ptr.OrZero(query), + BlockedBy: blockedBy, + QueryAge: ptr.Map(queryAge, durations.FromSeconds), + XactAge: ptr.Map(xactAge, durations.FromSeconds), + BackgroundWorker: ptr.OrZero(backendType) != "client backend", + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate activity: %w", err) + } + + return backends, nil +} + +func parseState(s string) driver.State { + switch s { + case "active": + return driver.StateActive + case "idle": + return driver.StateIdle + case "idle in transaction": + return driver.StateIdleInTx + case "idle in transaction (aborted)": + return driver.StateIdleInTxAborted + default: + return driver.StateUnknown + } +} diff --git a/internal/driver/postgres/activity_test.go b/internal/driver/postgres/activity_test.go new file mode 100644 index 0000000..599fed2 --- /dev/null +++ b/internal/driver/postgres/activity_test.go @@ -0,0 +1,34 @@ +package postgres_test + +import ( + "testing" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/driver/postgres" +) + +func TestParseState(t *testing.T) { + t.Parallel() + + tests := []struct { + in string + want driver.State + }{ + {"active", driver.StateActive}, + {"idle", driver.StateIdle}, + {"idle in transaction", driver.StateIdleInTx}, + {"idle in transaction (aborted)", driver.StateIdleInTxAborted}, + {"", driver.StateUnknown}, + {"fastpath function call", driver.StateUnknown}, + } + + for _, tt := range tests { + t.Run(tt.in, func(t *testing.T) { + t.Parallel() + + if got := postgres.ParseState(tt.in); got != tt.want { + t.Errorf("ParseState(%q) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} diff --git a/internal/driver/postgres/export_test.go b/internal/driver/postgres/export_test.go index 4f5ef09..f4bc785 100644 --- a/internal/driver/postgres/export_test.go +++ b/internal/driver/postgres/export_test.go @@ -1,3 +1,6 @@ package postgres -var GuardVersion = guardVersion +var ( + GuardVersion = guardVersion + ParseState = parseState +) diff --git a/internal/durations/durations.go b/internal/durations/durations.go new file mode 100644 index 0000000..ff4c7ed --- /dev/null +++ b/internal/durations/durations.go @@ -0,0 +1,7 @@ +package durations + +import "time" + +func FromSeconds(s float64) time.Duration { + return time.Duration(s * float64(time.Second)) +} diff --git a/internal/durations/durations_test.go b/internal/durations/durations_test.go new file mode 100644 index 0000000..d74f197 --- /dev/null +++ b/internal/durations/durations_test.go @@ -0,0 +1,31 @@ +package durations_test + +import ( + "testing" + "time" + + "github.com/mickamy/dbtop/internal/durations" +) + +func TestFromSeconds(t *testing.T) { + t.Parallel() + + tests := []struct { + in float64 + want time.Duration + }{ + {1.5, 1500 * time.Millisecond}, + {0, 0}, + {90, 90 * time.Second}, + } + + for _, tt := range tests { + t.Run(tt.want.String(), func(t *testing.T) { + t.Parallel() + + if got := durations.FromSeconds(tt.in); got != tt.want { + t.Errorf("FromSeconds(%v) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} diff --git a/internal/ptr/ptr.go b/internal/ptr/ptr.go new file mode 100644 index 0000000..096078b --- /dev/null +++ b/internal/ptr/ptr.go @@ -0,0 +1,18 @@ +package ptr + +func OrZero[T any](p *T) T { + if p == nil { + var zero T + return zero + } + + return *p +} + +func Map[T, U any](p *T, f func(T) U) *U { + if p == nil { + return nil + } + + return new(f(*p)) +} diff --git a/internal/ptr/ptr_test.go b/internal/ptr/ptr_test.go new file mode 100644 index 0000000..a957117 --- /dev/null +++ b/internal/ptr/ptr_test.go @@ -0,0 +1,34 @@ +package ptr_test + +import ( + "testing" + + "github.com/mickamy/dbtop/internal/ptr" +) + +func TestOrZero(t *testing.T) { + t.Parallel() + + if got := ptr.OrZero[string](nil); got != "" { + t.Errorf("OrZero(nil) = %q, want empty", got) + } + + if got := ptr.OrZero(new("hello")); got != "hello" { + t.Errorf("OrZero(*\"hello\") = %q, want \"hello\"", got) + } +} + +func TestMap(t *testing.T) { + t.Parallel() + + double := func(n int) int { return n * 2 } + + if got := ptr.Map[int, int](nil, double); got != nil { + t.Errorf("Map(nil) = %v, want nil", got) + } + + got := ptr.Map(new(21), double) + if got == nil || *got != 42 { + t.Errorf("Map(*21) = %v, want 42", got) + } +} From 861209314f4262fef4c54381ed9af9973d034ba0 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:24:46 +0900 Subject: [PATCH 04/20] feat(postgres): implement Cancel and Terminate --- internal/driver/postgres/control.go | 33 +++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 internal/driver/postgres/control.go diff --git a/internal/driver/postgres/control.go b/internal/driver/postgres/control.go new file mode 100644 index 0000000..3df97b6 --- /dev/null +++ b/internal/driver/postgres/control.go @@ -0,0 +1,33 @@ +package postgres + +import ( + "context" + "fmt" +) + +const ( + cancelQuery = `SELECT pg_cancel_backend($1)` + terminateQuery = `SELECT pg_terminate_backend($1)` +) + +func (d *Driver) Cancel(ctx context.Context, pid int64) error { + return d.signalBackend(ctx, cancelQuery, pid) +} + +func (d *Driver) Terminate(ctx context.Context, pid int64) error { + return d.signalBackend(ctx, terminateQuery, pid) +} + +func (d *Driver) signalBackend(ctx context.Context, query string, pid int64) error { + var sent bool + if err := d.pool.QueryRow(ctx, query, pid).Scan(&sent); err != nil { + return fmt.Errorf("signal backend %d: %w", pid, err) + } + + // false means there was no such backend to signal. + if !sent { + return fmt.Errorf("backend %d not found", pid) + } + + return nil +} From 9683256bebfa93b28e7db9d4f2cf2c836fe5ca62 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:28:17 +0900 Subject: [PATCH 05/20] feat(postgres): implement Capabilities probing --- internal/driver/driver.go | 8 +++--- internal/driver/postgres/capabilities.go | 32 ++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 internal/driver/postgres/capabilities.go diff --git a/internal/driver/driver.go b/internal/driver/driver.go index f63ecd3..41f5e52 100644 --- a/internal/driver/driver.go +++ b/internal/driver/driver.go @@ -17,13 +17,13 @@ type Driver interface { // Terminate forcibly closes the connection (pg_terminate_backend / KILL CONNECTION). Terminate(ctx context.Context, pid int64) error - Capabilities(ctx context.Context) (Caps, error) + Capabilities(ctx context.Context) (Capabilities, error) Close() error } -// Caps reports the privileges and features available to the connected user, -// used to degrade gracefully and to render the startup banner. -type Caps struct { +// Capabilities reports the privileges and features available to the connected +// user, used to degrade gracefully and to render the startup banner. +type Capabilities struct { Superuser bool // Can read other backends' full query text (pg_monitor / PROCESS). diff --git a/internal/driver/postgres/capabilities.go b/internal/driver/postgres/capabilities.go new file mode 100644 index 0000000..8263cf1 --- /dev/null +++ b/internal/driver/postgres/capabilities.go @@ -0,0 +1,32 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/mickamy/dbtop/internal/driver" +) + +const capabilitiesQuery = ` +SELECT + current_setting('is_superuser')::bool, + pg_has_role(current_user, 'pg_monitor', 'MEMBER'), + pg_has_role(current_user, 'pg_signal_backend', 'MEMBER'), + EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements') +` + +func (d *Driver) Capabilities(ctx context.Context) (driver.Capabilities, error) { + var capabilities driver.Capabilities + + err := d.pool.QueryRow(ctx, capabilitiesQuery).Scan( + &capabilities.Superuser, + &capabilities.Monitor, + &capabilities.Kill, + &capabilities.Statements, + ) + if err != nil { + return driver.Capabilities{}, fmt.Errorf("query capabilities: %w", err) + } + + return capabilities, nil +} From 45b24ccaabfbde178a7874d7fcb12c2b3b0c2fb2 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:34:34 +0900 Subject: [PATCH 06/20] feat(postgres): implement Statements and ResetStatements --- internal/driver/postgres/statements.go | 105 +++++++++++++++++++++++++ internal/durations/durations.go | 4 + internal/durations/durations_test.go | 23 ++++++ 3 files changed, 132 insertions(+) create mode 100644 internal/driver/postgres/statements.go diff --git a/internal/driver/postgres/statements.go b/internal/driver/postgres/statements.go new file mode 100644 index 0000000..37ce4e6 --- /dev/null +++ b/internal/driver/postgres/statements.go @@ -0,0 +1,105 @@ +package postgres + +import ( + "context" + "fmt" + "strconv" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/durations" + "github.com/mickamy/dbtop/internal/ptr" +) + +const statementsQuery = ` +SELECT + queryid, + query, + calls, + total_exec_time, + mean_exec_time, + min_exec_time, + max_exec_time, + stddev_exec_time, + rows, + shared_blks_hit, + shared_blks_read +FROM pg_stat_statements +ORDER BY total_exec_time DESC +LIMIT 100 +` + +const resetStatementsQuery = `SELECT pg_stat_statements_reset()` + +func (d *Driver) Statements(ctx context.Context) ([]driver.Statement, error) { + rows, err := d.pool.Query(ctx, statementsQuery) + if err != nil { + return nil, fmt.Errorf("query statements: %w", err) + } + defer rows.Close() + + var statements []driver.Statement + + for rows.Next() { + var ( + queryID *int64 + query *string + calls int64 + total, mean float64 + minTime, maxTime, stddevTime float64 + rowCount int64 + blocksHit, blocksRead int64 + ) + + if err := rows.Scan( + &queryID, + &query, + &calls, + &total, + &mean, + &minTime, + &maxTime, + &stddevTime, + &rowCount, + &blocksHit, + &blocksRead, + ); err != nil { + return nil, fmt.Errorf("scan statement: %w", err) + } + + statements = append(statements, driver.Statement{ + ID: queryIDString(queryID), + Query: ptr.OrZero(query), + Calls: calls, + Total: durations.FromMillis(total), + Mean: durations.FromMillis(mean), + Rows: rowCount, + Min: durations.FromMillis(minTime), + Max: durations.FromMillis(maxTime), + Stddev: durations.FromMillis(stddevTime), + SharedBlocksHit: blocksHit, + SharedBlocksRead: blocksRead, + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate statements: %w", err) + } + + return statements, nil +} + +func (d *Driver) ResetStatements(ctx context.Context) error { + if _, err := d.pool.Exec(ctx, resetStatementsQuery); err != nil { + return fmt.Errorf("reset statements: %w", err) + } + + return nil +} + +func queryIDString(id *int64) string { + if id == nil { + return "" + } + + return strconv.FormatInt(*id, 10) +} diff --git a/internal/durations/durations.go b/internal/durations/durations.go index ff4c7ed..641569f 100644 --- a/internal/durations/durations.go +++ b/internal/durations/durations.go @@ -5,3 +5,7 @@ import "time" func FromSeconds(s float64) time.Duration { return time.Duration(s * float64(time.Second)) } + +func FromMillis(ms float64) time.Duration { + return time.Duration(ms * float64(time.Millisecond)) +} diff --git a/internal/durations/durations_test.go b/internal/durations/durations_test.go index d74f197..90240c6 100644 --- a/internal/durations/durations_test.go +++ b/internal/durations/durations_test.go @@ -29,3 +29,26 @@ func TestFromSeconds(t *testing.T) { }) } } + +func TestFromMillis(t *testing.T) { + t.Parallel() + + tests := []struct { + in float64 + want time.Duration + }{ + {0.26, 260 * time.Microsecond}, + {0, 0}, + {1500, 1500 * time.Millisecond}, + } + + for _, tt := range tests { + t.Run(tt.want.String(), func(t *testing.T) { + t.Parallel() + + if got := durations.FromMillis(tt.in); got != tt.want { + t.Errorf("FromMillis(%v) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} From 8bc52539fa36bc252a69a7e57daee47fb01430e9 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:38:51 +0900 Subject: [PATCH 07/20] feat(postgres): implement Metrics and assert the Driver interface --- internal/driver/postgres/metrics.go | 111 +++++++++++++++++++++++++++ internal/driver/postgres/postgres.go | 4 + 2 files changed, 115 insertions(+) create mode 100644 internal/driver/postgres/metrics.go diff --git a/internal/driver/postgres/metrics.go b/internal/driver/postgres/metrics.go new file mode 100644 index 0000000..5ed1e3b --- /dev/null +++ b/internal/driver/postgres/metrics.go @@ -0,0 +1,111 @@ +package postgres + +import ( + "context" + "fmt" + "time" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/durations" + "github.com/mickamy/dbtop/internal/ptr" +) + +const metricsQuery = ` +SELECT + current_setting('max_connections')::int, + (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend'), + (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend' AND state = 'active'), + (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend' AND state = 'idle'), + (SELECT count(*) FROM pg_stat_activity + WHERE backend_type = 'client backend' + AND state IN ('idle in transaction', 'idle in transaction (aborted)')), + (SELECT count(*) FROM pg_locks WHERE NOT granted), + d.xact_commit, + d.xact_rollback, + d.blks_hit, + d.blks_read, + d.tup_inserted, + d.tup_updated, + d.tup_deleted, + d.tup_returned, + d.tup_fetched, + d.temp_files, + d.temp_bytes +FROM pg_stat_database d +WHERE d.datname = current_database() +` + +const replicaLagQuery = ` +SELECT + coalesce(nullif(client_addr::text, ''), application_name), + EXTRACT(EPOCH FROM replay_lag)::float8 +FROM pg_stat_replication +` + +func (d *Driver) Metrics(ctx context.Context) (driver.MetricSample, error) { + sample := driver.MetricSample{At: time.Now()} + + err := d.pool.QueryRow(ctx, metricsQuery).Scan( + &sample.MaxConnections, + &sample.Conns.Total, + &sample.Conns.Active, + &sample.Conns.Idle, + &sample.Conns.IdleInTx, + &sample.WaitingLocks, + &sample.Commits, + &sample.Rollbacks, + &sample.BlocksHit, + &sample.BlocksRead, + &sample.TuplesInserted, + &sample.TuplesUpdated, + &sample.TuplesDeleted, + &sample.TuplesReturned, + &sample.TuplesFetched, + &sample.TempFiles, + &sample.TempBytes, + ) + if err != nil { + return driver.MetricSample{}, fmt.Errorf("query metrics: %w", err) + } + + replicas, err := d.replicaLag(ctx) + if err != nil { + return driver.MetricSample{}, err + } + + sample.Replicas = replicas + + return sample, nil +} + +func (d *Driver) replicaLag(ctx context.Context) ([]driver.ReplicaLag, error) { + rows, err := d.pool.Query(ctx, replicaLagQuery) + if err != nil { + return nil, fmt.Errorf("query replica lag: %w", err) + } + defer rows.Close() + + var replicas []driver.ReplicaLag + + for rows.Next() { + var ( + client *string + lagSec *float64 + ) + + if err := rows.Scan(&client, &lagSec); err != nil { + return nil, fmt.Errorf("scan replica lag: %w", err) + } + + replicas = append(replicas, driver.ReplicaLag{ + Client: ptr.OrZero(client), + Lag: durations.FromSeconds(ptr.OrZero(lagSec)), + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate replica lag: %w", err) + } + + return replicas, nil +} diff --git a/internal/driver/postgres/postgres.go b/internal/driver/postgres/postgres.go index d23eb67..e8481a2 100644 --- a/internal/driver/postgres/postgres.go +++ b/internal/driver/postgres/postgres.go @@ -5,8 +5,12 @@ import ( "fmt" "github.com/jackc/pgx/v5/pgxpool" + + "github.com/mickamy/dbtop/internal/driver" ) +var _ driver.Driver = (*Driver)(nil) + // PostgreSQL 14.0 (server_version_num); 14+ gives both total_exec_time and pg_blocking_pids. const minServerVersionNum = 140000 From 179a8bdf927f04929108edc3c82fad6297eef807 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:45:18 +0900 Subject: [PATCH 08/20] fix(postgres): sort activity by the displayed duration --- internal/driver/postgres/activity.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go index 42a376d..f9c4bcc 100644 --- a/internal/driver/postgres/activity.go +++ b/internal/driver/postgres/activity.go @@ -17,7 +17,11 @@ SELECT a.pid, a.usename, a.datname, a.state, EXTRACT(EPOCH FROM now() - a.xact_start)::float8 AS xact_age FROM pg_stat_activity a WHERE a.pid <> pg_backend_pid() -ORDER BY query_age DESC NULLS LAST +ORDER BY CASE + WHEN a.state IN ('idle in transaction', 'idle in transaction (aborted)') + THEN EXTRACT(EPOCH FROM now() - a.xact_start) + ELSE EXTRACT(EPOCH FROM now() - a.query_start) + END DESC NULLS LAST ` func (d *Driver) Activity(ctx context.Context) ([]driver.Backend, error) { From 39906704684204201dfa2fe94e3a11a5e35dded0 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:46:29 +0900 Subject: [PATCH 09/20] perf(postgres): aggregate activity connection counts in one scan --- internal/driver/postgres/metrics.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/driver/postgres/metrics.go b/internal/driver/postgres/metrics.go index 5ed1e3b..bedf068 100644 --- a/internal/driver/postgres/metrics.go +++ b/internal/driver/postgres/metrics.go @@ -13,12 +13,10 @@ import ( const metricsQuery = ` SELECT current_setting('max_connections')::int, - (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend'), - (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend' AND state = 'active'), - (SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend' AND state = 'idle'), - (SELECT count(*) FROM pg_stat_activity - WHERE backend_type = 'client backend' - AND state IN ('idle in transaction', 'idle in transaction (aborted)')), + c.total, + c.active, + c.idle, + c.idle_tx, (SELECT count(*) FROM pg_locks WHERE NOT granted), d.xact_commit, d.xact_rollback, @@ -32,6 +30,15 @@ SELECT d.temp_files, d.temp_bytes FROM pg_stat_database d +CROSS JOIN ( + SELECT + count(*) AS total, + count(*) FILTER (WHERE state = 'active') AS active, + count(*) FILTER (WHERE state = 'idle') AS idle, + count(*) FILTER (WHERE state IN ('idle in transaction', 'idle in transaction (aborted)')) AS idle_tx + FROM pg_stat_activity + WHERE backend_type = 'client backend' +) c WHERE d.datname = current_database() ` From e8b27084e180f99b493b739e3e82b6aa646222b1 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:49:32 +0900 Subject: [PATCH 10/20] fix(postgres): exclude dbtop's own connections from metrics counts --- internal/driver/postgres/metrics.go | 3 ++- internal/driver/postgres/postgres.go | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/driver/postgres/metrics.go b/internal/driver/postgres/metrics.go index bedf068..8a3f27f 100644 --- a/internal/driver/postgres/metrics.go +++ b/internal/driver/postgres/metrics.go @@ -38,6 +38,7 @@ CROSS JOIN ( count(*) FILTER (WHERE state IN ('idle in transaction', 'idle in transaction (aborted)')) AS idle_tx FROM pg_stat_activity WHERE backend_type = 'client backend' + AND application_name <> $1 ) c WHERE d.datname = current_database() ` @@ -52,7 +53,7 @@ FROM pg_stat_replication func (d *Driver) Metrics(ctx context.Context) (driver.MetricSample, error) { sample := driver.MetricSample{At: time.Now()} - err := d.pool.QueryRow(ctx, metricsQuery).Scan( + err := d.pool.QueryRow(ctx, metricsQuery, appName).Scan( &sample.MaxConnections, &sample.Conns.Total, &sample.Conns.Active, diff --git a/internal/driver/postgres/postgres.go b/internal/driver/postgres/postgres.go index e8481a2..73cde6b 100644 --- a/internal/driver/postgres/postgres.go +++ b/internal/driver/postgres/postgres.go @@ -17,6 +17,9 @@ const minServerVersionNum = 140000 // Small pool: a poll and a kill may run at once, nothing more. const maxConns = 3 +// appName tags dbtop's own connections so they can be excluded from metrics. +const appName = "dbtop" + type Driver struct { pool *pgxpool.Pool } @@ -30,6 +33,7 @@ func Open(ctx context.Context, dsn string) (*Driver, error) { } cfg.MaxConns = maxConns + cfg.ConnConfig.RuntimeParams["application_name"] = appName pool, err := pgxpool.NewWithConfig(ctx, cfg) if err != nil { From ff6e7b6279b24a3ff3f698afad365619d83e1d77 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:51:25 +0900 Subject: [PATCH 11/20] test(postgres): add integration tests behind a build tag --- internal/driver/postgres/integration_test.go | 96 ++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 internal/driver/postgres/integration_test.go diff --git a/internal/driver/postgres/integration_test.go b/internal/driver/postgres/integration_test.go new file mode 100644 index 0000000..61888f0 --- /dev/null +++ b/internal/driver/postgres/integration_test.go @@ -0,0 +1,96 @@ +//go:build integration + +package postgres_test + +import ( + "os" + "testing" + + "github.com/mickamy/dbtop/internal/driver/postgres" +) + +// Run with: DBTOP_TEST_DSN=postgres://... go test -tags integration ./internal/driver/postgres/ +func openTestDriver(t *testing.T) *postgres.Driver { + t.Helper() + + dsn := os.Getenv("DBTOP_TEST_DSN") + if dsn == "" { + t.Skip("set DBTOP_TEST_DSN to run integration tests") + } + + d, err := postgres.Open(t.Context(), dsn) + if err != nil { + t.Fatalf("Open: %v", err) + } + + t.Cleanup(func() { + _ = d.Close() + }) + + return d +} + +func TestIntegrationActivity(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + + backends, err := d.Activity(t.Context()) + if err != nil { + t.Fatalf("Activity: %v", err) + } + + for _, b := range backends { + if b.PID == 0 { + t.Errorf("backend has zero PID: %+v", b) + } + } +} + +func TestIntegrationMetrics(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + + m, err := d.Metrics(t.Context()) + if err != nil { + t.Fatalf("Metrics: %v", err) + } + + if m.MaxConnections <= 0 { + t.Errorf("MaxConnections = %d, want > 0", m.MaxConnections) + } + if m.At.IsZero() { + t.Error("At is zero") + } +} + +func TestIntegrationCapabilities(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + + // Smoke test: the probe must succeed; values depend on the connected role. + if _, err := d.Capabilities(t.Context()); err != nil { + t.Fatalf("Capabilities: %v", err) + } +} + +func TestIntegrationStatements(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + ctx := t.Context() + + caps, err := d.Capabilities(ctx) + if err != nil { + t.Fatalf("Capabilities: %v", err) + } + if !caps.Statements { + t.Skip("pg_stat_statements not available") + } + + if _, err := d.Statements(ctx); err != nil { + t.Fatalf("Statements: %v", err) + } +} From 08a301572b65256704973cbda72ab6e8ba9779f5 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:53:22 +0900 Subject: [PATCH 12/20] fix(driver): return no duration for idle backends --- internal/driver/backend.go | 13 +++++++------ internal/driver/backend_test.go | 5 +++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/internal/driver/backend.go b/internal/driver/backend.go index ad09e44..a66156b 100644 --- a/internal/driver/backend.go +++ b/internal/driver/backend.go @@ -54,18 +54,19 @@ type Backend struct { BackgroundWorker bool } -// Duration returns the DURATION-column value: transaction age for -// idle-in-transaction backends, query age otherwise. +// Duration returns the value shown in the DURATION column. Idle and unknown +// backends have none: their query_start is stale from the last query. func (b Backend) Duration() (time.Duration, bool) { switch b.State { + case StateActive: + if b.QueryAge != nil { + return *b.QueryAge, true + } case StateIdleInTx, StateIdleInTxAborted: if b.XactAge != nil { return *b.XactAge, true } - case StateUnknown, StateActive, StateIdle: - if b.QueryAge != nil { - return *b.QueryAge, true - } + case StateUnknown, StateIdle: } return 0, false diff --git a/internal/driver/backend_test.go b/internal/driver/backend_test.go index 9463bfa..cd5bddd 100644 --- a/internal/driver/backend_test.go +++ b/internal/driver/backend_test.go @@ -60,6 +60,11 @@ func TestBackendDuration(t *testing.T) { backend: driver.Backend{State: driver.StateActive}, wantOK: false, }, + { + name: "idle ignores stale query age", + backend: driver.Backend{State: driver.StateIdle, QueryAge: new(2 * time.Hour)}, + wantOK: false, + }, } for _, tt := range tests { From b1673f59e0119c81f0b0226e3c3aa08622597361 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 15:54:23 +0900 Subject: [PATCH 13/20] refactor(driver): rename BackgroundWorker to SystemBackend --- internal/driver/backend.go | 4 ++-- internal/driver/postgres/activity.go | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/internal/driver/backend.go b/internal/driver/backend.go index a66156b..06be905 100644 --- a/internal/driver/backend.go +++ b/internal/driver/backend.go @@ -50,8 +50,8 @@ type Backend struct { QueryAge *time.Duration XactAge *time.Duration - // Non-client backend, hidden by default. - BackgroundWorker bool + // System-internal (non-client) backend, hidden by default. + SystemBackend bool } // Duration returns the value shown in the DURATION column. Idle and unknown diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go index f9c4bcc..7d6478b 100644 --- a/internal/driver/postgres/activity.go +++ b/internal/driver/postgres/activity.go @@ -50,17 +50,17 @@ func (d *Driver) Activity(ctx context.Context) ([]driver.Backend, error) { } backends = append(backends, driver.Backend{ - PID: pid, - User: ptr.OrZero(user), - DB: ptr.OrZero(db), - State: parseState(ptr.OrZero(state)), - WaitType: ptr.OrZero(waitType), - WaitEvent: ptr.OrZero(waitEvent), - Query: ptr.OrZero(query), - BlockedBy: blockedBy, - QueryAge: ptr.Map(queryAge, durations.FromSeconds), - XactAge: ptr.Map(xactAge, durations.FromSeconds), - BackgroundWorker: ptr.OrZero(backendType) != "client backend", + PID: pid, + User: ptr.OrZero(user), + DB: ptr.OrZero(db), + State: parseState(ptr.OrZero(state)), + WaitType: ptr.OrZero(waitType), + WaitEvent: ptr.OrZero(waitEvent), + Query: ptr.OrZero(query), + BlockedBy: blockedBy, + QueryAge: ptr.Map(queryAge, durations.FromSeconds), + XactAge: ptr.Map(xactAge, durations.FromSeconds), + SystemBackend: ptr.OrZero(backendType) != "client backend", }) } From d0d1f09c12bd00b3ed604b9cd1456ad19b26a927 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:01:57 +0900 Subject: [PATCH 14/20] fix(postgres): sort idle backends last in activity --- internal/driver/postgres/activity.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go index 7d6478b..5c26861 100644 --- a/internal/driver/postgres/activity.go +++ b/internal/driver/postgres/activity.go @@ -18,9 +18,11 @@ SELECT a.pid, a.usename, a.datname, a.state, FROM pg_stat_activity a WHERE a.pid <> pg_backend_pid() ORDER BY CASE + WHEN a.state = 'active' + THEN EXTRACT(EPOCH FROM now() - a.query_start) WHEN a.state IN ('idle in transaction', 'idle in transaction (aborted)') THEN EXTRACT(EPOCH FROM now() - a.xact_start) - ELSE EXTRACT(EPOCH FROM now() - a.query_start) + ELSE NULL END DESC NULLS LAST ` From cb8add8e2307a68f642d869a24d979737ed1d14f Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:02:38 +0900 Subject: [PATCH 15/20] fix(postgres): cast pg_blocking_pids to bigint[] for portable scanning --- internal/driver/postgres/activity.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go index 5c26861..2c26ede 100644 --- a/internal/driver/postgres/activity.go +++ b/internal/driver/postgres/activity.go @@ -12,7 +12,7 @@ import ( const activityQuery = ` SELECT a.pid, a.usename, a.datname, a.state, a.wait_event_type, a.wait_event, a.query, a.backend_type, - pg_blocking_pids(a.pid) AS blocked_by, + pg_blocking_pids(a.pid)::bigint[] AS blocked_by, EXTRACT(EPOCH FROM now() - a.query_start)::float8 AS query_age, EXTRACT(EPOCH FROM now() - a.xact_start)::float8 AS xact_age FROM pg_stat_activity a From c8089dd93cb1e0ca7921f15afe476c528b882882 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:06:57 +0900 Subject: [PATCH 16/20] ci: bump codecov-action to v5 --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ff7484f..c419d96 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -61,7 +61,7 @@ jobs: - name: Upload coverage to Codecov if: always() - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: files: ./cover.out token: ${{ secrets.CODECOV_TOKEN }} From a5b5c25975ca082941f118eab71ec567f59e2be8 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:09:08 +0900 Subject: [PATCH 17/20] fix(postgres): scope statements to the current database --- internal/driver/postgres/statements.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/driver/postgres/statements.go b/internal/driver/postgres/statements.go index 37ce4e6..16573d7 100644 --- a/internal/driver/postgres/statements.go +++ b/internal/driver/postgres/statements.go @@ -24,6 +24,7 @@ SELECT shared_blks_hit, shared_blks_read FROM pg_stat_statements +WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database()) ORDER BY total_exec_time DESC LIMIT 100 ` From 6cbdab8f461ae54635742edd946b7472faf56786 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:10:08 +0900 Subject: [PATCH 18/20] fix(postgres): exclude dbtop's own connections from activity --- internal/driver/postgres/activity.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go index 2c26ede..ca1c9e6 100644 --- a/internal/driver/postgres/activity.go +++ b/internal/driver/postgres/activity.go @@ -17,6 +17,7 @@ SELECT a.pid, a.usename, a.datname, a.state, EXTRACT(EPOCH FROM now() - a.xact_start)::float8 AS xact_age FROM pg_stat_activity a WHERE a.pid <> pg_backend_pid() + AND a.application_name <> $1 ORDER BY CASE WHEN a.state = 'active' THEN EXTRACT(EPOCH FROM now() - a.query_start) @@ -27,7 +28,7 @@ ORDER BY CASE ` func (d *Driver) Activity(ctx context.Context) ([]driver.Backend, error) { - rows, err := d.pool.Query(ctx, activityQuery) + rows, err := d.pool.Query(ctx, activityQuery, appName) if err != nil { return nil, fmt.Errorf("query activity: %w", err) } From 75f27d58ede3227fe08d4d1d03cac3d88aa0e900 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:10:43 +0900 Subject: [PATCH 19/20] fix(postgres): grant monitor and kill to superusers --- internal/driver/postgres/capabilities.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/driver/postgres/capabilities.go b/internal/driver/postgres/capabilities.go index 8263cf1..e97637d 100644 --- a/internal/driver/postgres/capabilities.go +++ b/internal/driver/postgres/capabilities.go @@ -28,5 +28,11 @@ func (d *Driver) Capabilities(ctx context.Context) (driver.Capabilities, error) return driver.Capabilities{}, fmt.Errorf("query capabilities: %w", err) } + // A superuser can always monitor and kill, regardless of role membership. + if capabilities.Superuser { + capabilities.Monitor = true + capabilities.Kill = true + } + return capabilities, nil } From 816bec6f888dfac54984ab7198a899aef7671064 Mon Sep 17 00:00:00 2001 From: Tetsuro Mikami Date: Wed, 27 May 2026 16:18:23 +0900 Subject: [PATCH 20/20] fix(postgres): make the dbtop connection filter NULL-safe --- internal/driver/postgres/activity.go | 2 +- internal/driver/postgres/metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go index ca1c9e6..6018ca8 100644 --- a/internal/driver/postgres/activity.go +++ b/internal/driver/postgres/activity.go @@ -17,7 +17,7 @@ SELECT a.pid, a.usename, a.datname, a.state, EXTRACT(EPOCH FROM now() - a.xact_start)::float8 AS xact_age FROM pg_stat_activity a WHERE a.pid <> pg_backend_pid() - AND a.application_name <> $1 + AND coalesce(a.application_name, '') <> $1 ORDER BY CASE WHEN a.state = 'active' THEN EXTRACT(EPOCH FROM now() - a.query_start) diff --git a/internal/driver/postgres/metrics.go b/internal/driver/postgres/metrics.go index 8a3f27f..f920514 100644 --- a/internal/driver/postgres/metrics.go +++ b/internal/driver/postgres/metrics.go @@ -38,7 +38,7 @@ CROSS JOIN ( count(*) FILTER (WHERE state IN ('idle in transaction', 'idle in transaction (aborted)')) AS idle_tx FROM pg_stat_activity WHERE backend_type = 'client backend' - AND application_name <> $1 + AND coalesce(application_name, '') <> $1 ) c WHERE d.datname = current_database() `