Skip to content

feat: add source sync observability#103

Open
jonathanhaaswriter wants to merge 4 commits into
mainfrom
feat/source-sync-observability
Open

feat: add source sync observability#103
jonathanhaaswriter wants to merge 4 commits into
mainfrom
feat/source-sync-observability

Conversation

@jonathanhaaswriter

Copy link
Copy Markdown
Contributor

Summary

  • add typed per-source sync status, sync-now, and backfill RPCs
  • aggregate Google Reports, BigQuery, Directory, OAuth, and ingestion queue state
  • surface source health and recovery controls in the connectors UI
  • add wake listeners for Directory and OAuth sync workers

Validation

  • npm run db:generate
  • npm run db:validate
  • npm run typecheck
  • npm run guardrails:migration
  • npm run test:api
  • npm run proto:check
  • go test ./...
  • make lint
  • npm run build:web

Note: make test-go-db could not run locally because Docker/Colima is not running (missing /Users/jonathan/.colima/default/docker.sock).

@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid encountered an error —— View job


Security Review

Droid is reviewing code and running a security check…

@jonathanhaaswriter jonathanhaaswriter force-pushed the feat/source-sync-observability branch from b5cde13 to 3afaf62 Compare June 15, 2026 01:46
@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated 4 candidates and approved all 4. The source-level observability work is valuable, but the PR still has real regressions around rate limiting, wake-triggered sync behavior and error reporting, and dialog availability on unhealthy Google Workspace connectors.


func (a *App) rateLimitSourceSync(ctx context.Context, header http.Header, peerAddr, integrationID, action string, auth compatAuth) error {
path := "/api/v1/integrations/" + url.PathEscape(integrationID) + "/" + action
return a.compatRateLimit(ctx, header, peerAddr, http.MethodPost, path, typedRateLimitSubjectBody(auth))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Restore rate limiting for source sync and backfill

rateLimitSourceSync() routes these RPCs through compatRateLimit(), but the synthetic /api/v1/integrations/{id}/source-sync and /source-backfill paths still do not match any case in compatRateLimitPolicy(). That means RunIntegrationSourceSync and BackfillIntegrationSource currently bypass the limiter entirely, so an OWNER or ADMIN session can spam pg_notify wakeups and repeated cursor rewrites without ever hitting the intended throttle.

return
}
log.Printf("google-workspace-directory-sync: starting (interval=%s)", *interval)
go runWakeListener(ctx, cfg.DatabaseURL, sync)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] -once mode still drops manual Directory sync wakeups

The new Directory sync path depends on pg_notify, but the -once branch returns before runWakeListener() is ever started. In cron-style deployments that run this worker with -once, pressing Sync now will enqueue a notification that nobody is listening for, so the requested run does not execute until the next scheduled sweep.

if err != nil {
return err
}
return s.syncIntegration(ctx, integ)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Wake-triggered Directory failures never update last_error

Tick() persists failures through recordError(), but WakeIntegration() just returns s.syncIntegration(...) and leaves the caller to log the error. If an operator-triggered Directory sync fails, google_workspace_directory_sync_cursors.last_error stays stale, so GetIntegrationSyncStatus can continue showing the previous healthy heartbeat even though the requested run failed.

size="sm"
variant="outline"
onClick={() => void handleSync(integration)}
onClick={() => setSyncStatusIntegration(integration)}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Keep the Sources dialog available on unhealthy Google connectors

This button now opens IntegrationSyncStatusDialog, but it is still gated by supportsForceSync(), which hides it for any Google Workspace integration that is not CONNECTED. GetIntegrationSyncStatus itself works for existing integrations regardless of connection state, so operators lose the new cursor and queue observability exactly when a connector is in ERROR or DISABLED and they need that data to recover it.

@jonathanhaaswriter jonathanhaaswriter force-pushed the feat/source-sync-observability branch from 3afaf62 to 894894f Compare June 15, 2026 02:10
@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated 3 candidates and approved all 3. The new source observability surface still has real correctness gaps: per-source Sync ignores the selected stream, queued backfills look successful before workers run, and failed Directory or OAuth sweeps can appear current even when the underlying data is stale.

return nil, connect.NewError(connect.CodeFailedPrecondition, errors.New("integration is not connected"))
}
kind := normalizeSourceKind(sourceKind)
channels, err := syncWakeChannelsForSource(kind, integ.BigQueryEnabled)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Honor streamName when queueing a source sync

The new Sources dialog sends both sourceKind and streamName for per-row Sync actions, but runIntegrationSourceSync() only uses sourceKind to pick a wake channel and then notifies workers with just the integration id. Both the Google Reports and BigQuery wake paths rescan every default application for that integration, so clicking Sync on a single row now triggers a full kind-wide sweep instead of the selected stream and can enqueue much more work than the operator requested.

ON CONFLICT (integration_id, application) DO UPDATE SET
last_event_time = EXCLUDED.last_event_time,
last_unique_qualifier = '',
last_polled_at = NOW(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Do not mark a backfill successful before the worker runs

Both backfill branches rewrite the cursor row with last_polled_at = NOW() and clear last_error before sending the wake-up. applyCursorState() treats any non-zero attempt with no error as a healthy success, so the Sources dialog flips to a fresh successful run as soon as an operator clicks Queue backfill, even if no worker is listening yet or the actual backfill later fails.

Comment thread internal/bootstrap/sync_status.go Outdated
WHERE integration_id = $1
`, integrationID).Scan(&syncedAt, &userCount, &lastErr)
if err == nil {
applyCursorState(state, syncedAt, syncedAt, lastErr, userCount, now)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Stop failed Directory and OAuth runs from looking current

googleDirectorySyncState() and googleOAuthSyncState() both pass last_synced_at as both the cursor timestamp and the attempt timestamp, but the sync workers also update that same column inside recordError(). After any failed Directory or OAuth run, applyCursorState() therefore reports a near-zero lag and fresh cursorTime even though the identities or grants are still as old as the last successful sweep, which defeats the new observability view right when operators need to diagnose stale data.

@jonathanhaaswriter jonathanhaaswriter force-pushed the feat/source-sync-observability branch from 894894f to 2584fe6 Compare June 15, 2026 02:41
@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated all 3 review candidates for PR #103 and wrote /home/runner/work/_temp/droid-prompts/review_validated.json.

Approved and posted 3 batched review comments:

  • connector freshness still advances after single-stream Reports or BigQuery runs
  • -once workers can still drop later wake notifications while draining
  • source sync still accepts stream names that Directory and OAuth workers cannot honor

return p.pollIntegrationApplications(ctx, integ, p.applications)
}

func (p *Poller) pollIntegrationApplications(ctx context.Context, integ integrationRow, applications []string) error {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Don't refresh connector freshness on single-stream runs

WakeIntegration() can now run a single Reports or BigQuery stream, but this helper still updates integration_connections.last_sync_at unconditionally at the end. That makes the connector card look freshly synced after a one-stream run, and even after a per-stream failure that only records last_error, although the other sources were never polled.

}

func drainWakeNotifications(ctx context.Context, conn *pgx.Conn, worker *googleworkspacedirectorysync.Sync) {
listenCtx, stopListening := context.WithTimeout(ctx, onceDrainWindow)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Keep draining wake notifications until once-mode work finishes

In -once mode this helper stops WaitForNotification after onceDrainWindow, then waits only for goroutines that were already launched. A second manual Sync or Backfill request that arrives a few seconds later while the first wake-triggered run is still executing stays unread on the LISTEN connection and is lost when the process exits. The OAuth and BigQuery entrypoints have the same copied helper.

return kind
}

func validateSourceStream(kind, stream string, bigQueryEnabled bool) error {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Reject stream names that a source worker cannot honor

validateSourceStream() only rejects bad stream names for Reports and BigQuery. Directory and OAuth sources still accept any non-empty streamName, but their workers only receive the integration id and can only run the fixed users or grants sweep. That means the RPC can acknowledge and audit a stream name it can never honor.

@jonathanhaaswriter jonathanhaaswriter force-pushed the feat/source-sync-observability branch from 2584fe6 to 655fe79 Compare June 15, 2026 03:03
@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated 4 candidates and approved all 4. The source-level observability work is valuable, but the PR still has actionable correctness gaps around dialog state races, durable backfill queuing, wake-listener error handling, and BigQuery queue attribution.

setError("");
try {
const result = await fetchIntegrationSyncStatus(integrationId);
setStatus(result.data);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Guard sync-status refreshes against connector switches

IntegrationSyncStatusDialog stays mounted while integration changes, but refresh() always commits the result of fetchIntegrationSyncStatus(integrationId). If connector A's request resolves after the user has already reopened the dialog for connector B, setStatus(result.data) overwrites B's state with A's source rows. The Sync and Backfill buttons then act on B's current integrationId while showing A's streams, which can queue work on the wrong connector. The BigQuery and OAuth dialogs in this file already use a cancelled guard for the same race.

(integration_id, application, last_event_time, last_unique_qualifier, last_polled_at, last_error)
VALUES ($1, $2, $3, '', NOW(), $4)
ON CONFLICT (integration_id, application) DO UPDATE SET
last_event_time = EXCLUDED.last_event_time,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Make queued backfills survive overlapping sweeps

BackfillIntegrationSource rewrites the live cursor row and immediately returns success, but the Reports and BigQuery workers later overwrite that same row unconditionally in their touch*Cursor helpers. If a scheduled sweep has already loaded the old cursor when this branch runs, that in-flight worker can clear the queued-backfill marker and restore its newer position before the wake-triggered retry starts. The API still replies that the backfill was queued, but the requested historical replay is silently dropped.

}
waitCtx, stopWaiting := context.WithTimeout(ctx, notificationPollInterval)
notification, err := conn.WaitForNotification(waitCtx)
stopWaiting()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Check the wait error before canceling the timeout context

drainWakeNotifications() calls stopWaiting() before it inspects waitCtx.Err(). That cancel makes waitCtx.Err() non-nil for every WaitForNotification failure, so a real LISTEN disconnect is indistinguishable from the normal 500 ms timeout and gets silently ignored. In -once mode the worker can therefore lose later manual sync or backfill notifications after a transient Postgres drop without ever surfacing or reconnecting the listener. The BigQuery and OAuth binaries copy the same helper.

Comment thread internal/bootstrap/sync_status.go Outdated
func (a *App) googleBigQuerySyncStates(ctx context.Context, integrationID string, queues map[string]queueCounts, now time.Time) ([]*aperiov1.IntegrationSourceSyncState, error) {
states := make(map[string]*aperiov1.IntegrationSourceSyncState, len(googleworkspacepoller.DefaultApplications))
for _, recordType := range googleworkspacepoller.DefaultApplications {
queueSource := "google.reports." + recordType

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Use a distinct queue source for BigQuery stream state

googleBigQuerySyncStates() maps every BigQuery row to google.reports.<recordType>, which is also the ingestion source written by the Reports poller. When both ingestion paths are enabled, the BigQuery and Reports rows read the same queue counters, so a queued or failed Reports job shows up as BigQuery backlog and vice versa. That makes the new per-source queue status incorrect for any tenant running both paths.

@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated 1 candidate and approved 1.

Sync all still has a real ordering bug: it can wake OAuth before Directory refreshes identities, which makes Shadow IT look current while omitting grants until a later OAuth run.

Comment thread internal/bootstrap/sync_status.go Outdated
channels := []string{
GoogleWorkspaceSyncWakeChannel,
GoogleWorkspaceDirectorySyncWakeChannel,
GoogleWorkspaceOAuthSyncWakeChannel,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Don't wake OAuth before Directory refreshes identities

In the all case this channel list notifies the Directory and OAuth workers at the same time, but the OAuth sweep builds its user list from saas_identities and explicitly treats an empty table as a successful zero-grant heartbeat. On a newly connected tenant, or any tenant whose identities are stale, the OAuth worker can run first, persist last_error = NULL with zero counts, and no second OAuth wake is sent after Directory finishes. Sync all can therefore report Shadow IT as current while omitting every grant until the next scheduled OAuth sweep or a separate manual OAuth sync.

@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated all 2 review candidates for PR #103 and wrote /home/runner/work/_temp/droid-prompts/review_validated.json.

Approved and posted 2 batched review comments:

  • Reports per-stream setup failures can leave queued or stale-success state behind
  • BigQuery setup failures can leave queued backfills stuck forever

Overall assessment: the source-level observability work is useful, but the PR still hides real wake-triggered setup failures for Reports and BigQuery streams.

return p.pollIntegrationApplications(ctx, integ, p.applications, true)
}

func (p *Poller) pollIntegrationApplications(ctx context.Context, integ integrationRow, applications []string, refreshConnector bool) error {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Record per-stream Reports failures when setup aborts early

pollIntegrationApplications() now powers per-stream Sync and Backfill, but the early returns from OAuth-client resolution, refresh-token decryption, and token exchange still exit before any application-specific recordError() call runs. When a backfill has already rewritten a stream cursor to backfill queued..., one of those setup failures leaves that row stuck in the queued state forever, and ordinary manual Syncs keep showing the previous healthy cursor instead of the real credential error.

return nil
}

func (p *BigQueryPoller) recordBigQueryErrors(ctx context.Context, integrationID string, recordTypes []string, err error) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Preserve BigQuery backfill failures when setup never reaches polling

recordBigQueryErrors() handles cfg.validate(), subjectToken(), and accessToken() failures, but it always passes an empty expected cursor into recordBigQueryError(). If a backfill already marked that stream with backfill queued..., the guarded WHERE clause rejects the update because the queued row no longer matches the zero cursor, so the worker logs the real WIF or config error while the sync-status row keeps showing queued forever.

@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown

Droid finished @jonathanhaaswriter's task —— View job


Security Review

Validated all 3 review candidates for PR #103 and wrote /home/runner/work/_temp/droid-prompts/review_validated.json.

Approved and posted 3 batched review comments:

  • once-mode wake draining can abort active sync work after a listener error
  • Sync all stays clickable for the wrong connector while another sync is queueing
  • errored source rows can regress seen counts to 0

Overall assessment: the source-level observability work is useful, but the PR still has actionable correctness gaps in once-mode wake draining, cross-connector Sync all state, and errored source row-count reporting.

}
continue
}
log.Printf("google-workspace-directory-sync: -once wake drain failed: %v", err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Keep once-mode workers alive until wake-triggered syncs finish

drainWakeNotifications() returns immediately on any non-timeout WaitForNotification() error, even when active.Load() > 0 means a wake-triggered sync goroutine is still running. In -once mode that return lets main() exit, cancels the shared context via defer stop(), and can abort the directory sync an operator just queued after a transient LISTEN connection drop. The OAuth and BigQuery entrypoints copy the same helper.

size="sm"
onClick={() => void onSyncAll(integration)}
disabled={
syncingId === integration.id || !supportsForceSync(integration)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Disable Sync all while another connector is queueing

handleSync() bails out whenever syncingId is non-null, but this button is only disabled when syncingId === integration.id. If an operator opens connector B while connector A's request is still in flight, B's button still looks available even though clicking it silently returns before sending runIntegrationSourceSync(). That loses the second manual sync request with no toast or other feedback.

return
}
if strings.TrimSpace(lastErr) != "" {
state.Status = "error"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Preserve the last seen count for errored sources

applyCursorState() returns from the lastErr branch before it assigns RowsSeen, even though the directory, OAuth, and BigQuery cursors keep their last row-count columns on failures. The new Sources dialog always renders seen {source.rowsSeen}, so the first failed sweep regresses a source from its real last processed count to seen 0 right when an operator is debugging the failure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant