From 481b290beadfc530e4b0adef05dbbbb211af0a66 Mon Sep 17 00:00:00 2001 From: Song Zhendong <289505773@qq.com> Date: Fri, 26 Jun 2026 19:25:16 +0800 Subject: [PATCH 1/7] Add Node.js runtime metrics and align gRPC with Java agent.core - Refactor remote reporting to agent.core: single shared gRPC channel, ServiceManager/BootService lifecycle, Builder/Decorator chain - Add six instance_nodejs_* runtime meters via MeterReportService (1s/1s) - Rename env vars to SW_AGENT_NODEJS_RUNTIME_METRICS_* with deprecated aliases --- README.md | 21 +++ .../Protocol.ts => core/boot/BootService.ts} | 16 +- src/agent/core/boot/ServiceManager.ts | 133 ++++++++++++++ src/agent/core/meter/MeterSender.ts | 163 ++++++++++++++++++ .../core/meter/RuntimeMetricsCollector.ts | 49 ++++++ src/agent/core/meter/RuntimeSampler.ts | 65 +++++++ src/agent/core/remote/AgentIDDecorator.ts | 36 ++++ .../core/remote/AuthenticationDecorator.ts | 38 ++++ src/agent/core/remote/ChannelBuilder.ts | 30 ++++ .../remote/ChannelDecorator.ts} | 13 +- src/agent/core/remote/GRPCChannel.ts | 100 +++++++++++ src/agent/core/remote/GRPCChannelListener.ts | 24 +++ src/agent/core/remote/GRPCChannelManager.ts | 138 +++++++++++++++ .../remote/GRPCChannelStatus.ts} | 11 +- .../core/remote/GRPCStreamServiceStatus.ts | 62 +++++++ .../remote/ServiceManagementClient.ts} | 80 +++++---- .../core/remote/StandardChannelBuilder.ts | 36 ++++ src/agent/core/remote/TLSChannelBuilder.ts | 35 ++++ .../remote/TraceSegmentServiceClient.ts} | 137 ++++++++------- src/agent/protocol/grpc/GrpcProtocol.ts | 56 ------ .../protocol/grpc/SegmentObjectAdapter.ts | 74 -------- src/config/AgentConfig.ts | 91 ++++++++++ src/index.ts | 30 ++-- src/trace/context/Segment.ts | 49 ++++++ tests/plugins/express/expected.data.yaml | 56 ++++++ tests/runtime/RuntimeMetricsCollector.test.ts | 57 ++++++ 26 files changed, 1331 insertions(+), 269 deletions(-) rename src/agent/{protocol/Protocol.ts => core/boot/BootService.ts} (78%) create mode 100644 src/agent/core/boot/ServiceManager.ts create mode 100644 src/agent/core/meter/MeterSender.ts create mode 100644 src/agent/core/meter/RuntimeMetricsCollector.ts create mode 100644 src/agent/core/meter/RuntimeSampler.ts create mode 100644 src/agent/core/remote/AgentIDDecorator.ts create mode 100644 src/agent/core/remote/AuthenticationDecorator.ts create mode 100644 src/agent/core/remote/ChannelBuilder.ts rename src/agent/{protocol/grpc/AuthInterceptor.ts => core/remote/ChannelDecorator.ts} (77%) create mode 100644 src/agent/core/remote/GRPCChannel.ts create mode 100644 src/agent/core/remote/GRPCChannelListener.ts create mode 100644 src/agent/core/remote/GRPCChannelManager.ts rename src/agent/{protocol/grpc/clients/Client.ts => core/remote/GRPCChannelStatus.ts} (85%) create mode 100644 src/agent/core/remote/GRPCStreamServiceStatus.ts rename src/agent/{protocol/grpc/clients/HeartbeatClient.ts => core/remote/ServiceManagementClient.ts} (54%) mode change 100755 => 100644 create mode 100644 src/agent/core/remote/StandardChannelBuilder.ts create mode 100644 src/agent/core/remote/TLSChannelBuilder.ts rename src/agent/{protocol/grpc/clients/TraceReportClient.ts => core/remote/TraceSegmentServiceClient.ts} (53%) mode change 100755 => 100644 delete mode 100644 src/agent/protocol/grpc/GrpcProtocol.ts delete mode 100644 src/agent/protocol/grpc/SegmentObjectAdapter.ts create mode 100644 tests/runtime/RuntimeMetricsCollector.test.ts diff --git a/README.md b/README.md index 2a31b3e..50586c5 100644 --- a/README.md +++ b/README.md @@ -72,10 +72,31 @@ Environment Variable | Description | Default | `SW_AWS_SQS_CHECK_BODY` | Incoming SQS messages check inside the body for trace ID in order to allow linking outgoing SNS messages to incoming SQS. | `false` | | `SW_AGENT_MAX_BUFFER_SIZE` | The maximum buffer size before sending the segment data to backend | `'1000'` | | `SW_AGENT_TRACE_TIMEOUT` | The timeout for trace requests to backend services | `'10000'` | +| `SW_AGENT_NODEJS_RUNTIME_METRICS_REPORTER_ACTIVE` | Whether to report Node.js runtime metrics through MeterReportService (default: collect 1s, report 1s) | `true` | +| `SW_AGENT_NODEJS_RUNTIME_METRICS_COLLECT_PERIOD` | Runtime metric sample interval in milliseconds | `1000` | +| `SW_AGENT_NODEJS_RUNTIME_METRICS_REPORT_PERIOD` | Runtime metric report interval in milliseconds (aligned with Java JVM metrics upload interval) | `1000` | +| `SW_AGENT_NODEJS_RUNTIME_METRICS_BUFFER_SIZE` | Maximum buffered runtime metric samples before dropping oldest | `600` | + +Legacy env names `SW_AGENT_RUNTIME_METRICS_*`, `SW_AGENT_NVM_METRICS_*` and `SW_AGENT_NVM_JVM_*` are still accepted as deprecated aliases. Note that the various ignore options like `SW_IGNORE_SUFFIX`, `SW_TRACE_IGNORE_PATH` and `SW_HTTP_IGNORE_METHOD` as well as endpoints which are not recorded due to exceeding `SW_AGENT_MAX_BUFFER_SIZE` all propagate their ignored status downstream to any other endpoints they may call. If that endpoint is running the Node Skywalking agent then regardless of its ignore settings it will not be recorded since its upstream parent was not recorded. This allows the elimination of entire trees of endpoints you are not interested in as well as eliminating partial traces if a span in the chain is ignored but calls out to other endpoints which are recorded as children of ROOT instead of the actual parent. +## Node.js Runtime Metrics + +The agent reports six process-level meters (`instance_nodejs_*`) via `MeterReportService` by default (collect 1s, report 1s). Set `SW_AGENT_NODEJS_RUNTIME_METRICS_REPORTER_ACTIVE=false` to disable. Process CPU combines `process.cpuUsage()` user + system, normalized by logical CPU count (0–100%). + +| Node.js source | Meter name | Notes | +| :--- | :--- | :--- | +| `process.cpuUsage()` user + system | `instance_nodejs_process_cpu` | % | +| `process.memoryUsage().heapUsed` | `instance_nodejs_heap_used` | bytes | +| `process.memoryUsage().heapTotal` | `instance_nodejs_heap_total` | bytes | +| `v8.getHeapStatistics().heap_size_limit` | `instance_nodejs_heap_limit` | bytes | +| `process.memoryUsage().rss` | `instance_nodejs_rss` | bytes | +| `process.memoryUsage().external` | `instance_nodejs_external_memory` | bytes | + +Custom business metrics are not available through a public API; use [OpenTelemetry metrics](https://skywalking.apache.org/docs/main/latest/en/setup/backend/opentelemetry-receiver/) if you need those. + ## Supported Libraries Some built-in plugins support automatic instrumentation of NodeJS libraries, the complete list is as follows: diff --git a/src/agent/protocol/Protocol.ts b/src/agent/core/boot/BootService.ts similarity index 78% rename from src/agent/protocol/Protocol.ts rename to src/agent/core/boot/BootService.ts index 062318a..34a53d5 100644 --- a/src/agent/protocol/Protocol.ts +++ b/src/agent/core/boot/BootService.ts @@ -17,17 +17,15 @@ * */ -/** - * The transport protocol between the agent and the backend (OAP). - */ -export default interface Protocol { - isConnected: boolean; +/** Boot lifecycle contract (Java {@code BootService}). */ +export default interface BootService { + prepare(): void; - heartbeat(): this; + boot(): void; - report(): this; + onComplete(): void; - flush(): Promise | null; + shutdown(): void; - destroy?(): void; + priority(): number; } diff --git a/src/agent/core/boot/ServiceManager.ts b/src/agent/core/boot/ServiceManager.ts new file mode 100644 index 0000000..e5dd18d --- /dev/null +++ b/src/agent/core/boot/ServiceManager.ts @@ -0,0 +1,133 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import config from '../../../config/AgentConfig'; +import { createLogger } from '../../../logging'; +import BootService from './BootService'; +import MeterSender from '../meter/MeterSender'; +import GRPCChannelManager from '../remote/GRPCChannelManager'; +import ServiceManagementClient from '../remote/ServiceManagementClient'; +import TraceSegmentServiceClient from '../remote/TraceSegmentServiceClient'; + +const logger = createLogger(__filename); + +/** Service registry and boot orchestrator (Java {@code ServiceManager}). */ +class ServiceManager { + private static readonly instance = new ServiceManager(); + + private bootedServices = new Map BootService, BootService>(); + + private booted = false; + + static get INSTANCE(): ServiceManager { + return ServiceManager.instance; + } + + boot(): void { + if (this.booted) { + return; + } + + this.loadServices(); + const services = this.sortByPriority(true); + + for (const service of services) { + try { + service.prepare(); + } catch (error) { + logger.error('ServiceManager prepare failed: %s', error); + } + } + + for (const service of services) { + try { + service.boot(); + } catch (error) { + logger.error('ServiceManager boot failed: %s', error); + } + } + + for (const service of services) { + try { + service.onComplete(); + } catch (error) { + logger.error('ServiceManager onComplete failed: %s', error); + } + } + + this.booted = true; + } + + shutdown(): void { + for (const service of this.sortByPriority(false)) { + try { + service.shutdown(); + } catch (error) { + logger.error('ServiceManager shutdown failed: %s', error); + } + } + this.bootedServices.clear(); + this.booted = false; + } + + flush(): Promise | null { + const traceFlush = this.findService(TraceSegmentServiceClient)?.flush() ?? null; + const meterSender = this.findService(MeterSender); + const meterFlush = meterSender?.flush() ?? null; + + if (!traceFlush && !meterFlush) { + return null; + } + if (!traceFlush) { + return meterFlush; + } + if (!meterFlush) { + return traceFlush; + } + + return Promise.all([traceFlush, meterFlush]).then(() => null); + } + + findService(serviceClass: new (...args: never[]) => T): T | undefined { + return this.bootedServices.get(serviceClass) as T | undefined; + } + + private register(serviceClass: new (...args: never[]) => T, service: T): void { + this.bootedServices.set(serviceClass, service); + } + + private loadServices(): void { + this.register(GRPCChannelManager, new GRPCChannelManager()); + this.register(TraceSegmentServiceClient, new TraceSegmentServiceClient()); + this.register(ServiceManagementClient, new ServiceManagementClient()); + if (config.runtimeMetricsReporterActive) { + this.register(MeterSender, new MeterSender()); + } + } + + private sortByPriority(ascending: boolean): BootService[] { + const services = Array.from(this.bootedServices.values()); + services.sort((left, right) => + ascending ? left.priority() - right.priority() : right.priority() - left.priority(), + ); + return services; + } +} + +export default ServiceManager; diff --git a/src/agent/core/meter/MeterSender.ts b/src/agent/core/meter/MeterSender.ts new file mode 100644 index 0000000..4d4c5ac --- /dev/null +++ b/src/agent/core/meter/MeterSender.ts @@ -0,0 +1,163 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import config from '../../../config/AgentConfig'; +import * as grpc from '@grpc/grpc-js'; +import { createLogger, throttled } from '../../../logging'; +import { MeterReportServiceClient } from '../../../proto/language-agent/Meter_grpc_pb'; +import BootService from '../boot/BootService'; +import ServiceManager from '../boot/ServiceManager'; +import RuntimeMetricsCollector from './RuntimeMetricsCollector'; +import { RuntimeSnapshot } from './RuntimeSampler'; +import GRPCChannelManager from '../remote/GRPCChannelManager'; +import { GRPCChannelListener } from '../remote/GRPCChannelListener'; +import { GRPCChannelStatus } from '../remote/GRPCChannelStatus'; + +const logger = createLogger(__filename); +const logReportError = throttled(logger, 'error', 30000); + +/** Reports Node.js runtime metrics via gRPC MeterReportService (Go/Python-compatible pipeline). */ +export default class MeterSender implements BootService, GRPCChannelListener { + private reporterClient!: MeterReportServiceClient; + private readonly buffer: RuntimeSnapshot[] = []; + private collectTimer?: NodeJS.Timeout; + private reportTimer?: NodeJS.Timeout; + + private collector!: RuntimeMetricsCollector; + + prepare(): void { + this.collector = new RuntimeMetricsCollector(); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); + } + + boot(): void { + this.startTimers(); + } + + onComplete(): void {} + + priority(): number { + return 0; + } + + statusChanged(status: GRPCChannelStatus): void { + if (status === GRPCChannelStatus.CONNECTED) { + this.reporterClient = this.createReporterClient(); + } + } + + private createReporterClient(): MeterReportServiceClient { + return new MeterReportServiceClient( + config.collectorAddress, + grpc.credentials.createInsecure(), + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.getClientOptions(), + ); + } + + get isConnected(): boolean { + return ServiceManager.INSTANCE.findService(GRPCChannelManager)!.isConnected(); + } + + private startTimers(): void { + this.collectTimer = setInterval( + () => this.collectSample(), + config.runtimeMetricsCollectPeriod || 1000, + ) as NodeJS.Timeout; + this.collectTimer.unref(); + this.reportTimer = setInterval( + () => this.reportBufferedMetrics(), + config.runtimeMetricsReportPeriod || 1000, + ) as NodeJS.Timeout; + this.reportTimer.unref(); + } + + private collectSample(): void { + const maxBufferSize = config.runtimeMetricsBufferSize || 600; + if (this.buffer.length >= maxBufferSize) { + this.buffer.shift(); + } + this.buffer.push(this.collector.sample()); + } + + private reportBufferedMetrics(callback?: () => void): void { + try { + if (this.buffer.length === 0 || !this.isConnected || !this.reporterClient) { + if (callback) callback(); + return; + } + + const snapshots = this.buffer.splice(0, this.buffer.length); + const stream = this.reporterClient.collect( + new grpc.Metadata(), + { deadline: Date.now() + (config.traceTimeout || 10000) }, + (error: grpc.ServiceError | null) => { + if (error) { + logReportError('Failed to report runtime meter data', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + } + if (callback) callback(); + }, + ); + + try { + let metadataWritten = false; + const timestamp = Date.now(); + for (const snapshot of snapshots) { + for (const meterData of this.collector.toMeterData(snapshot)) { + if (!metadataWritten) { + meterData + .setService(config.serviceName!) + .setServiceinstance(config.serviceInstance!) + .setTimestamp(timestamp); + metadataWritten = true; + } + stream.write(meterData); + } + } + } finally { + stream.end(); + } + } catch (error) { + logReportError('Failed to report runtime meter data', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + if (callback) callback(); + } + } + + flush(): Promise | null { + return new Promise((resolve) => { + this.collectSample(); + this.reportBufferedMetrics(resolve); + }); + } + + shutdown(): void { + if (this.collectTimer) { + clearInterval(this.collectTimer); + this.collectTimer = undefined; + } + if (this.reportTimer) { + clearInterval(this.reportTimer); + this.reportTimer = undefined; + } + this.buffer.length = 0; + this.collector.destroy(); + logger.info('MeterSender destroyed and resources cleaned up'); + } +} diff --git a/src/agent/core/meter/RuntimeMetricsCollector.ts b/src/agent/core/meter/RuntimeMetricsCollector.ts new file mode 100644 index 0000000..0d59a29 --- /dev/null +++ b/src/agent/core/meter/RuntimeMetricsCollector.ts @@ -0,0 +1,49 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { MeterData, MeterSingleValue } from '../../../proto/language-agent/Meter_pb'; +import RuntimeSampler, { RuntimeSnapshot } from './RuntimeSampler'; + +/** Maps Node.js runtime samples into MeterReportService single-value meters (instance_nodejs_*). */ +export default class RuntimeMetricsCollector { + private readonly sampler = new RuntimeSampler(); + + sample(): RuntimeSnapshot { + return this.sampler.sample(); + } + + toMeterData(snapshot: RuntimeSnapshot): MeterData[] { + const gauges: Array<[string, number]> = [ + ['instance_nodejs_process_cpu', snapshot.cpuUserPercent + snapshot.cpuSystemPercent], + ['instance_nodejs_heap_used', snapshot.heapUsed], + ['instance_nodejs_heap_total', snapshot.heapTotal], + ['instance_nodejs_heap_limit', snapshot.heapSizeLimit], + ['instance_nodejs_rss', snapshot.rss], + ['instance_nodejs_external_memory', snapshot.external], + ]; + + return gauges.map(([name, value]) => + new MeterData().setSinglevalue(new MeterSingleValue().setName(name).setValue(value)), + ); + } + + destroy(): void { + this.sampler.destroy(); + } +} diff --git a/src/agent/core/meter/RuntimeSampler.ts b/src/agent/core/meter/RuntimeSampler.ts new file mode 100644 index 0000000..c7ac80e --- /dev/null +++ b/src/agent/core/meter/RuntimeSampler.ts @@ -0,0 +1,65 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import os from 'os'; +import v8 from 'v8'; + +export type RuntimeSnapshot = { + heapUsed: number; + heapTotal: number; + heapSizeLimit: number; + rss: number; + external: number; + cpuUserPercent: number; + cpuSystemPercent: number; +}; + +export default class RuntimeSampler { + private readonly logicalCpuCount = Math.max(1, os.cpus().length); + private lastCpuUsage = process.cpuUsage(); + private lastCpuTimestamp = process.hrtime.bigint(); + + sample(): RuntimeSnapshot { + const memory = process.memoryUsage(); + const heapStats = v8.getHeapStatistics(); + const cpuUsage = process.cpuUsage(this.lastCpuUsage); + const now = process.hrtime.bigint(); + const elapsedMicros = Number(now - this.lastCpuTimestamp) / 1000; + this.lastCpuUsage = process.cpuUsage(); + this.lastCpuTimestamp = now; + + const cpuScale = elapsedMicros > 0 ? 100 / elapsedMicros / this.logicalCpuCount : 0; + const cpuUserPercent = cpuUsage.user * cpuScale; + const cpuSystemPercent = cpuUsage.system * cpuScale; + + return { + heapUsed: memory.heapUsed, + heapTotal: memory.heapTotal, + heapSizeLimit: heapStats.heap_size_limit, + rss: memory.rss, + external: memory.external, + cpuUserPercent, + cpuSystemPercent, + }; + } + + destroy(): void { + // no-op: kept for symmetry with start/stop lifecycle + } +} diff --git a/src/agent/core/remote/AgentIDDecorator.ts b/src/agent/core/remote/AgentIDDecorator.ts new file mode 100644 index 0000000..6f77406 --- /dev/null +++ b/src/agent/core/remote/AgentIDDecorator.ts @@ -0,0 +1,36 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; +import * as packageInfo from '../../../../package.json'; +import ChannelDecorator from './ChannelDecorator'; + +/** Add agent version to every RPC (Java AgentIDDecorator). */ +export default class AgentIDDecorator implements ChannelDecorator { + build(): grpc.Interceptor { + return (options, nextCall) => { + return new grpc.InterceptingCall(nextCall(options), { + start: (metadata, listener, next) => { + metadata.set('Agent-Version', packageInfo.version); + next(metadata, listener); + }, + }); + }; + } +} diff --git a/src/agent/core/remote/AuthenticationDecorator.ts b/src/agent/core/remote/AuthenticationDecorator.ts new file mode 100644 index 0000000..b0d138f --- /dev/null +++ b/src/agent/core/remote/AuthenticationDecorator.ts @@ -0,0 +1,38 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; +import config from '../../../config/AgentConfig'; +import ChannelDecorator from './ChannelDecorator'; + +/** Active authentication header by SW_AGENT_AUTHENTICATION (Java AuthenticationDecorator). */ +export default class AuthenticationDecorator implements ChannelDecorator { + build(): grpc.Interceptor { + return (options, nextCall) => { + return new grpc.InterceptingCall(nextCall(options), { + start: (metadata, listener, next) => { + if (config.authorization) { + metadata.set('Authentication', config.authorization); + } + next(metadata, listener); + }, + }); + }; + } +} diff --git a/src/agent/core/remote/ChannelBuilder.ts b/src/agent/core/remote/ChannelBuilder.ts new file mode 100644 index 0000000..9a2d7c9 --- /dev/null +++ b/src/agent/core/remote/ChannelBuilder.ts @@ -0,0 +1,30 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; + +/** Mutable state passed through ChannelBuilder chain (Java ManagedChannelBuilder equivalent). */ +export interface ChannelBuildContext { + credentials: grpc.ChannelCredentials; + options: grpc.ChannelOptions; +} + +export default interface ChannelBuilder { + build(context: ChannelBuildContext): ChannelBuildContext; +} diff --git a/src/agent/protocol/grpc/AuthInterceptor.ts b/src/agent/core/remote/ChannelDecorator.ts similarity index 77% rename from src/agent/protocol/grpc/AuthInterceptor.ts rename to src/agent/core/remote/ChannelDecorator.ts index 0f5ae4d..c99319d 100644 --- a/src/agent/protocol/grpc/AuthInterceptor.ts +++ b/src/agent/core/remote/ChannelDecorator.ts @@ -18,12 +18,11 @@ */ import * as grpc from '@grpc/grpc-js'; -import config from '../../../config/AgentConfig'; -export default function AuthInterceptor() { - const mata = new grpc.Metadata(); - if (config.authorization) { - mata.add('Authentication', config.authorization); - } - return mata; +/** + * Decorates the gRPC channel (Java ChannelDecorator equivalent). + * Node uses client interceptors because @grpc/grpc-js has no ClientInterceptors.intercept(). + */ +export default interface ChannelDecorator { + build(): grpc.Interceptor; } diff --git a/src/agent/core/remote/GRPCChannel.ts b/src/agent/core/remote/GRPCChannel.ts new file mode 100644 index 0000000..4eae558 --- /dev/null +++ b/src/agent/core/remote/GRPCChannel.ts @@ -0,0 +1,100 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; +import { ClientOptions, connectivityState } from '@grpc/grpc-js'; +import ChannelBuilder, { ChannelBuildContext } from './ChannelBuilder'; +import ChannelDecorator from './ChannelDecorator'; + +export default class GRPCChannel { + private readonly originChannel: grpc.Channel; + private readonly interceptors: grpc.Interceptor[]; + + private constructor(host: string, port: number, channelBuilders: ChannelBuilder[], decorators: ChannelDecorator[]) { + let context: ChannelBuildContext = { + credentials: grpc.credentials.createInsecure(), + options: {}, + }; + + for (const builder of channelBuilders) { + context = builder.build(context); + } + + this.originChannel = new grpc.Channel(`${host}:${port}`, context.credentials, context.options); + this.interceptors = decorators.map((decorator) => decorator.build()); + } + + static create( + host: string, + port: number, + channelBuilders: ChannelBuilder[], + decorators: ChannelDecorator[], + ): GRPCChannel { + return new GRPCChannel(host, port, channelBuilders, decorators); + } + + static newBuilder(host: string, port: number): GRPCChannelBuilder { + return new GRPCChannelBuilder(host, port); + } + + getChannel(): grpc.Channel { + return this.originChannel; + } + + getClientOptions(): ClientOptions { + return { + channelOverride: this.originChannel, + interceptors: this.interceptors, + }; + } + + isConnected(requestConnection = false): boolean { + return this.originChannel.getConnectivityState(requestConnection) === connectivityState.READY; + } + + shutdownNow(): void { + this.originChannel.close(); + } +} + +class GRPCChannelBuilder { + private readonly host: string; + private readonly port: number; + private readonly channelBuilders: ChannelBuilder[] = []; + private readonly decorators: ChannelDecorator[] = []; + + constructor(host: string, port: number) { + this.host = host; + this.port = port; + } + + addManagedChannelBuilder(builder: ChannelBuilder): this { + this.channelBuilders.push(builder); + return this; + } + + addChannelDecorator(decorator: ChannelDecorator): this { + this.decorators.push(decorator); + return this; + } + + build(): GRPCChannel { + return GRPCChannel.create(this.host, this.port, this.channelBuilders, this.decorators); + } +} diff --git a/src/agent/core/remote/GRPCChannelListener.ts b/src/agent/core/remote/GRPCChannelListener.ts new file mode 100644 index 0000000..34febae --- /dev/null +++ b/src/agent/core/remote/GRPCChannelListener.ts @@ -0,0 +1,24 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { GRPCChannelStatus } from './GRPCChannelStatus'; + +export interface GRPCChannelListener { + statusChanged(status: GRPCChannelStatus): void; +} diff --git a/src/agent/core/remote/GRPCChannelManager.ts b/src/agent/core/remote/GRPCChannelManager.ts new file mode 100644 index 0000000..c47a3d1 --- /dev/null +++ b/src/agent/core/remote/GRPCChannelManager.ts @@ -0,0 +1,138 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; +import { ClientOptions } from '@grpc/grpc-js'; +import config from '../../../config/AgentConfig'; +import { createLogger } from '../../../logging'; +import AgentIDDecorator from './AgentIDDecorator'; +import AuthenticationDecorator from './AuthenticationDecorator'; +import GRPCChannel from './GRPCChannel'; +import { GRPCChannelListener } from './GRPCChannelListener'; +import { GRPCChannelStatus } from './GRPCChannelStatus'; +import BootService from '../boot/BootService'; +import StandardChannelBuilder from './StandardChannelBuilder'; +import TLSChannelBuilder from './TLSChannelBuilder'; + +const logger = createLogger(__filename); + +/** + * Shared gRPC channel manager (Java GRPCChannelManager skeleton). + * V1: single address; V2 reserved: multi-address failover via reportError(). + */ +export default class GRPCChannelManager implements BootService { + private managedChannel: GRPCChannel | null = null; + private readonly listeners: GRPCChannelListener[] = []; + private lastStatus: GRPCChannelStatus | null = null; + + /** V1: first address when comma-separated; V2: failover selection. */ + resolveAddress(): string { + const raw = config.collectorAddress ?? ''; + const first = raw.split(',')[0]?.trim(); + if (!first) { + throw new Error('collectorAddress is not configured'); + } + return first; + } + + getChannel(): grpc.Channel { + return this.managedChannel!.getChannel(); + } + + getClientOptions(): ClientOptions { + return this.managedChannel!.getClientOptions(); + } + + isConnected(): boolean { + return this.managedChannel?.isConnected(true) ?? false; + } + + addChannelListener(listener: GRPCChannelListener): void { + this.listeners.push(listener); + if (this.lastStatus !== null) { + listener.statusChanged(this.lastStatus); + } + } + + priority(): number { + return Number.MAX_SAFE_INTEGER; + } + + /** V2 hook: network errors may trigger address failover. */ + reportError(error: unknown): void { + logger.debug('gRPC report error (multi-address failover reserved for V2): %s', error); + } + + prepare(): void {} + + boot(): void { + const address = this.resolveAddress(); + const [host, portText] = address.split(':'); + const port = Number.parseInt(portText, 10); + + if (!host || Number.isNaN(port)) { + throw new Error(`Invalid collector address: ${address}`); + } + + this.managedChannel = GRPCChannel.newBuilder(host, port) + .addManagedChannelBuilder(new StandardChannelBuilder()) + .addManagedChannelBuilder(new TLSChannelBuilder()) + .addChannelDecorator(new AgentIDDecorator()) + .addChannelDecorator(new AuthenticationDecorator()) + .build(); + + this.watchConnectivityState(); + } + + onComplete(): void {} + + shutdown(): void { + this.managedChannel?.shutdownNow(); + this.managedChannel = null; + } + + private watchConnectivityState(): void { + const channel = this.getChannel(); + const currentState = channel.getConnectivityState(true); + channel.watchConnectivityState(currentState, Infinity, (error) => { + if (error) { + logger.debug('Channel connectivity watch error: %s', error.message); + return; + } + + const ready = channel.getConnectivityState(false) === grpc.connectivityState.READY; + this.notify(ready ? GRPCChannelStatus.CONNECTED : GRPCChannelStatus.DISCONNECT); + this.watchConnectivityState(); + }); + } + + private notify(status: GRPCChannelStatus): void { + if (this.lastStatus === status) { + return; + } + this.lastStatus = status; + for (const listener of this.listeners) { + try { + listener.statusChanged(status); + } catch (err) { + logger.error('GRPCChannelListener failed: %s', err); + } + } + } +} diff --git a/src/agent/protocol/grpc/clients/Client.ts b/src/agent/core/remote/GRPCChannelStatus.ts similarity index 85% rename from src/agent/protocol/grpc/clients/Client.ts rename to src/agent/core/remote/GRPCChannelStatus.ts index 8d8fd06..5200d58 100644 --- a/src/agent/protocol/grpc/clients/Client.ts +++ b/src/agent/core/remote/GRPCChannelStatus.ts @@ -17,12 +17,7 @@ * */ -export default interface Client { - readonly isConnected: boolean; - - start(): void; - - flush(): Promise | null; - - destroy?(): void; +export enum GRPCChannelStatus { + CONNECTED = 'CONNECTED', + DISCONNECT = 'DISCONNECT', } diff --git a/src/agent/core/remote/GRPCStreamServiceStatus.ts b/src/agent/core/remote/GRPCStreamServiceStatus.ts new file mode 100644 index 0000000..79f984d --- /dev/null +++ b/src/agent/core/remote/GRPCStreamServiceStatus.ts @@ -0,0 +1,62 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { createLogger } from '../../../logging'; + +const logger = createLogger(__filename); + +/** Tracks gRPC stream completion (Java {@code GRPCStreamServiceStatus}). */ +export default class GRPCStreamServiceStatus { + private status = false; + + constructor(status = false) { + this.status = status; + } + + isStatus(): boolean { + return this.status; + } + + finished(): void { + this.status = true; + } + + /** Wait until success status reported (Java wait4Finish). */ + wait4Finish(): void { + let recheckCycle = 5; + let hasWaited = 0; + const maxCycle = 30 * 1000; + while (!this.status) { + this.try2Sleep(recheckCycle); + hasWaited += recheckCycle; + if (recheckCycle >= maxCycle) { + logger.warn("Collector stream service doesn't response in %s seconds.", hasWaited / 1000); + } else { + recheckCycle = Math.min(recheckCycle * 2, maxCycle); + } + } + } + + private try2Sleep(millis: number): void { + const end = Date.now() + millis; + while (Date.now() < end) { + // busy-wait fallback; Node agent has no Thread.sleep in sync path + } + } +} diff --git a/src/agent/protocol/grpc/clients/HeartbeatClient.ts b/src/agent/core/remote/ServiceManagementClient.ts old mode 100755 new mode 100644 similarity index 54% rename from src/agent/protocol/grpc/clients/HeartbeatClient.ts rename to src/agent/core/remote/ServiceManagementClient.ts index 2c101a6..4eb0120 --- a/src/agent/protocol/grpc/clients/HeartbeatClient.ts +++ b/src/agent/core/remote/ServiceManagementClient.ts @@ -18,37 +18,31 @@ */ import * as grpc from '@grpc/grpc-js'; -import { connectivityState } from '@grpc/grpc-js'; - -import * as packageInfo from '../../../../../package.json'; -import { createLogger, throttled } from '../../../../logging'; -import Client from './Client'; -import { ManagementServiceClient } from '../../../../proto/management/Management_grpc_pb'; -import AuthInterceptor from '../AuthInterceptor'; -import { InstancePingPkg, InstanceProperties } from '../../../../proto/management/Management_pb'; -import config from '../../../../config/AgentConfig'; -import { KeyStringValuePair } from '../../../../proto/common/Common_pb'; import * as os from 'os'; +import * as packageInfo from '../../../../package.json'; +import config from '../../../config/AgentConfig'; +import { createLogger, throttled } from '../../../logging'; +import BootService from '../boot/BootService'; +import ServiceManager from '../boot/ServiceManager'; +import { ManagementServiceClient } from '../../../proto/management/Management_grpc_pb'; +import { InstancePingPkg, InstanceProperties } from '../../../proto/management/Management_pb'; +import { KeyStringValuePair } from '../../../proto/common/Common_pb'; +import GRPCChannelManager from './GRPCChannelManager'; +import { GRPCChannelListener } from './GRPCChannelListener'; +import { GRPCChannelStatus } from './GRPCChannelStatus'; const logger = createLogger(__filename); const logHeartbeatError = throttled(logger, 'error', 30000); -export default class HeartbeatClient implements Client { - private readonly managementServiceClient: ManagementServiceClient; +export default class ServiceManagementClient implements BootService, GRPCChannelListener { + private managementServiceClient!: ManagementServiceClient; private heartbeatTimer?: NodeJS.Timeout; - constructor() { - this.managementServiceClient = new ManagementServiceClient( - config.collectorAddress, - config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(), - ); - } - - get isConnected(): boolean { - return this.managementServiceClient.getChannel().getConnectivityState(true) === connectivityState.READY; + prepare(): void { + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); } - start() { + boot(): void { if (this.heartbeatTimer) { logger.warn(` The heartbeat timer has already been scheduled, @@ -73,31 +67,55 @@ export default class HeartbeatClient implements Client { ]); this.heartbeatTimer = setInterval(() => { - this.managementServiceClient.reportInstanceProperties(instanceProperties, AuthInterceptor(), (error, _) => { + if (!this.managementServiceClient) { + return; + } + this.managementServiceClient.reportInstanceProperties(instanceProperties, new grpc.Metadata(), (error) => { if (error) { logHeartbeatError('Failed to send heartbeat', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); } }); - this.managementServiceClient.keepAlive(keepAlivePkg, AuthInterceptor(), (error, _) => { + this.managementServiceClient.keepAlive(keepAlivePkg, new grpc.Metadata(), (error) => { if (error) { logHeartbeatError('Failed to send heartbeat', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); } }); }, 20000).unref(); } - flush(): Promise | null { - logger.warn('HeartbeatClient does not need flush().'); - return null; - } + onComplete(): void {} - destroy(): void { - // Clear heartbeat timer to prevent memory leak + shutdown(): void { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = undefined; } + logger.info('ServiceManagementClient destroyed and resources cleaned up'); + } + + priority(): number { + return 0; + } + + statusChanged(status: GRPCChannelStatus): void { + if (status === GRPCChannelStatus.CONNECTED) { + this.managementServiceClient = this.createManagementClient(); + } + } - logger.info('HeartbeatClient destroyed and resources cleaned up'); + private createManagementClient(): ManagementServiceClient { + const channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager)!; + return new ManagementServiceClient( + config.collectorAddress, + grpc.credentials.createInsecure(), + channelManager.getClientOptions(), + ); + } + + flush(): Promise | null { + logger.warn('ServiceManagementClient does not need flush().'); + return null; } } diff --git a/src/agent/core/remote/StandardChannelBuilder.ts b/src/agent/core/remote/StandardChannelBuilder.ts new file mode 100644 index 0000000..ae37f24 --- /dev/null +++ b/src/agent/core/remote/StandardChannelBuilder.ts @@ -0,0 +1,36 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; +import ChannelBuilder, { ChannelBuildContext } from './ChannelBuilder'; + +const MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50; + +export default class StandardChannelBuilder implements ChannelBuilder { + build(context: ChannelBuildContext): ChannelBuildContext { + return { + credentials: grpc.credentials.createInsecure(), + options: { + ...context.options, + 'grpc.max_receive_message_length': MAX_INBOUND_MESSAGE_SIZE, + 'grpc.max_send_message_length': MAX_INBOUND_MESSAGE_SIZE, + }, + }; + } +} diff --git a/src/agent/core/remote/TLSChannelBuilder.ts b/src/agent/core/remote/TLSChannelBuilder.ts new file mode 100644 index 0000000..d3d7e51 --- /dev/null +++ b/src/agent/core/remote/TLSChannelBuilder.ts @@ -0,0 +1,35 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; +import config from '../../../config/AgentConfig'; +import ChannelBuilder, { ChannelBuildContext } from './ChannelBuilder'; + +/** When SW_AGENT_SECURE=true, upgrade channel credentials to TLS (Java TLSChannelBuilder simplified). */ +export default class TLSChannelBuilder implements ChannelBuilder { + build(context: ChannelBuildContext): ChannelBuildContext { + if (config.secure) { + return { + ...context, + credentials: grpc.credentials.createSsl(), + }; + } + return context; + } +} diff --git a/src/agent/protocol/grpc/clients/TraceReportClient.ts b/src/agent/core/remote/TraceSegmentServiceClient.ts old mode 100755 new mode 100644 similarity index 53% rename from src/agent/protocol/grpc/clients/TraceReportClient.ts rename to src/agent/core/remote/TraceSegmentServiceClient.ts index 4f5cfe7..9f03c61 --- a/src/agent/protocol/grpc/clients/TraceReportClient.ts +++ b/src/agent/core/remote/TraceSegmentServiceClient.ts @@ -17,41 +17,37 @@ * */ -import config from '../../../../config/AgentConfig'; +import config from '../../../config/AgentConfig'; import * as grpc from '@grpc/grpc-js'; -import { connectivityState } from '@grpc/grpc-js'; -import { createLogger, throttled } from '../../../../logging'; -import Client from './Client'; -import { TraceSegmentReportServiceClient } from '../../../../proto/language-agent/Tracing_grpc_pb'; -import AuthInterceptor from '../AuthInterceptor'; -import SegmentObjectAdapter from '../SegmentObjectAdapter'; -import { emitter } from '../../../../lib/EventEmitter'; -import Segment from '../../../../trace/context/Segment'; +import { createLogger, throttled } from '../../../logging'; +import BootService from '../boot/BootService'; +import ServiceManager from '../boot/ServiceManager'; +import { TraceSegmentReportServiceClient } from '../../../proto/language-agent/Tracing_grpc_pb'; +import { emitter } from '../../../lib/EventEmitter'; +import Segment from '../../../trace/context/Segment'; +import GRPCChannelManager from './GRPCChannelManager'; +import { GRPCChannelListener } from './GRPCChannelListener'; +import { GRPCChannelStatus } from './GRPCChannelStatus'; const logger = createLogger(__filename); const logReportError = throttled(logger, 'error', 30000); const logBufferFull = throttled(logger, 'warn', 30000); -export default class TraceReportClient implements Client { - private readonly reporterClient: TraceSegmentReportServiceClient; +export default class TraceSegmentServiceClient implements BootService, GRPCChannelListener { + private reporterClient!: TraceSegmentReportServiceClient; private readonly buffer: Segment[] = []; private timeout?: NodeJS.Timeout; - private segmentFinishedListener: (segment: Segment) => void; + private segmentFinishedListener!: (segment: Segment) => void; - constructor() { - this.reporterClient = new TraceSegmentReportServiceClient( - config.collectorAddress, - config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(), - ); + prepare(): void { + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); - // Store listener reference for cleanup this.segmentFinishedListener = (segment: Segment) => { - // Limit buffer size to prevent memory leak during network issues if (this.buffer.length >= config.maxBufferSize) { logBufferFull( `Trace buffer reached maximum size (${config.maxBufferSize}); discarding oldest segments. The collector at ${config.collectorAddress} is likely unreachable.`, ); - this.buffer.shift(); // Remove oldest segment + this.buffer.shift(); } this.buffer.push(segment); @@ -61,38 +57,73 @@ export default class TraceReportClient implements Client { emitter.on('segment-finished', this.segmentFinishedListener); } - get isConnected(): boolean { - return this.reporterClient?.getChannel().getConnectivityState(true) === connectivityState.READY; + boot(): void { + this.timeout = setTimeout(this.reportFunction.bind(this), 1000) as unknown as NodeJS.Timeout; + this.timeout.unref(); + } + + onComplete(): void {} + + shutdown(): void { + if (this.segmentFinishedListener) { + emitter.off('segment-finished', this.segmentFinishedListener); + } + + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = undefined; + } + + this.buffer.length = 0; + logger.info('TraceSegmentServiceClient destroyed and resources cleaned up'); + } + + priority(): number { + return 0; + } + + statusChanged(status: GRPCChannelStatus): void { + if (status === GRPCChannelStatus.CONNECTED) { + this.reporterClient = this.createReporterClient(); + } + } + + private createReporterClient(): TraceSegmentReportServiceClient { + const channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager)!; + return new TraceSegmentReportServiceClient( + config.collectorAddress, + grpc.credentials.createInsecure(), + channelManager.getClientOptions(), + ); + } + + private get isConnected(): boolean { + return ServiceManager.INSTANCE.findService(GRPCChannelManager)!.isConnected(); } - private reportFunction(callback?: any) { - emitter.emit('segments-sent'); // reset limiter in SpanContext + private reportFunction(callback?: () => void) { + emitter.emit('segments-sent'); try { if (this.buffer.length === 0) { - if (callback) callback(); - + callback?.(); return; } - // Collector unreachable: keep the (bounded) buffer and let gRPC reconnect with its own exponential - // backoff, instead of failing a stream every tick and logging an error storm that exhausts the heap. - // The channel keeps trying to connect because `isConnected` polls getConnectivityState(true). - if (!this.isConnected) { - if (callback) callback(); - + if (!this.isConnected || !this.reporterClient) { + callback?.(); return; } const stream = this.reporterClient.collect( - AuthInterceptor(), + new grpc.Metadata(), { deadline: Date.now() + config.traceTimeout }, - (error, _) => { + (error) => { if (error) { logReportError('Failed to report trace data', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); } - - if (callback) callback(); + callback?.(); }, ); @@ -102,8 +133,7 @@ export default class TraceReportClient implements Client { if (logger._isDebugEnabled) { logger.debug('Sending segment ', { segment }); } - - stream.write(new SegmentObjectAdapter(segment)); + stream.write(segment.transform()); } } } finally { @@ -112,41 +142,16 @@ export default class TraceReportClient implements Client { stream.end(); } finally { - this.timeout = setTimeout(this.reportFunction.bind(this), 1000).unref(); + this.timeout = setTimeout(this.reportFunction.bind(this), 1000) as unknown as NodeJS.Timeout; + this.timeout.unref(); } } - start() { - this.timeout = setTimeout(this.reportFunction.bind(this), 1000).unref(); - } - - flush(): Promise | null { - // This function explicitly returns null instead of a resolved Promise in case of nothing to flush so that in this - // case passing control back to the event loop can be avoided. Even a resolved Promise will run other things in - // the event loop when it is awaited and before it continues. - + flush(): Promise | null { return this.buffer.length === 0 ? null : new Promise((resolve) => { this.reportFunction(resolve); }); } - - destroy(): void { - // Clean up event listener to prevent memory leak - if (this.segmentFinishedListener) { - emitter.off('segment-finished', this.segmentFinishedListener); - } - - // Clear timeout - if (this.timeout) { - clearTimeout(this.timeout); - this.timeout = undefined; - } - - // Clear buffer - this.buffer.length = 0; - - logger.info('TraceReportClient destroyed and resources cleaned up'); - } } diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/GrpcProtocol.ts deleted file mode 100644 index f151141..0000000 --- a/src/agent/protocol/grpc/GrpcProtocol.ts +++ /dev/null @@ -1,56 +0,0 @@ -/*! - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import Protocol from '../../../agent/protocol/Protocol'; -import HeartbeatClient from '../../../agent/protocol/grpc/clients/HeartbeatClient'; -import TraceReportClient from '../../../agent/protocol/grpc/clients/TraceReportClient'; - -export default class GrpcProtocol implements Protocol { - private readonly heartbeatClient: HeartbeatClient; - private readonly traceReportClient: TraceReportClient; - - constructor() { - this.heartbeatClient = new HeartbeatClient(); - this.traceReportClient = new TraceReportClient(); - } - - get isConnected(): boolean { - return this.heartbeatClient.isConnected && this.traceReportClient.isConnected; - } - - heartbeat(): this { - this.heartbeatClient.start(); - return this; - } - - report(): this { - this.traceReportClient.start(); - return this; - } - - flush(): Promise | null { - return this.traceReportClient.flush(); - } - - destroy(): void { - // Clean up both clients to prevent memory leaks - this.heartbeatClient.destroy?.(); - this.traceReportClient.destroy?.(); - } -} diff --git a/src/agent/protocol/grpc/SegmentObjectAdapter.ts b/src/agent/protocol/grpc/SegmentObjectAdapter.ts deleted file mode 100644 index 6559f20..0000000 --- a/src/agent/protocol/grpc/SegmentObjectAdapter.ts +++ /dev/null @@ -1,74 +0,0 @@ -/*! - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import config from '../../../config/AgentConfig'; -import { KeyStringValuePair } from '../../../proto/common/Common_pb'; -import Segment from '../../../trace/context/Segment'; -import { Log, RefType, SegmentObject, SegmentReference, SpanObject } from '../../../proto/language-agent/Tracing_pb'; - -/** - * An adapter that adapts {@link Segment} objects to gRPC object {@link SegmentObject}. - */ -export default class SegmentObjectAdapter extends SegmentObject { - constructor(segment: Segment) { - super(); - super - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance) - .setTraceid(segment.relatedTraces[0].toString()) - .setTracesegmentid(segment.segmentId.toString()) - .setSpansList( - segment.spans.map((span) => - new SpanObject() - .setSpanid(span.id) - .setParentspanid(span.parentId) - .setStarttime(span.startTime) - .setEndtime(span.endTime) - .setOperationname(span.operation) - .setPeer(span.peer) - .setSpantype(span.type) - .setSpanlayer(span.layer) - .setComponentid(span.component.id) - .setIserror(span.errored) - .setLogsList( - span.logs.map((log) => - new Log() - .setTime(log.timestamp) - .setDataList( - log.items.map((logItem) => new KeyStringValuePair().setKey(logItem.key).setValue(logItem.val)), - ), - ), - ) - .setTagsList(span.tags.map((tag) => new KeyStringValuePair().setKey(tag.key).setValue(tag.val))) - .setRefsList( - span.refs.map((ref) => - new SegmentReference() - .setReftype(RefType.CROSSPROCESS) - .setTraceid(ref.traceId.toString()) - .setParenttracesegmentid(ref.segmentId.toString()) - .setParentspanid(ref.spanId) - .setParentservice(ref.service) - .setParentserviceinstance(ref.serviceInstance) - .setNetworkaddressusedatpeer(ref.clientAddress), - ), - ), - ), - ); - } -} diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts index 2ad5905..4c79cf1 100644 --- a/src/config/AgentConfig.ts +++ b/src/config/AgentConfig.ts @@ -43,9 +43,62 @@ export type AgentConfig = { reIgnoreOperation?: RegExp; reHttpIgnoreMethod?: RegExp; traceTimeout?: number; + runtimeMetricsReporterActive?: boolean; + runtimeMetricsCollectPeriod?: number; + runtimeMetricsReportPeriod?: number; + runtimeMetricsBufferSize?: number; + /** @deprecated use runtimeMetricsReporterActive */ + nvmMetricsReporterActive?: boolean; + /** @deprecated use runtimeMetricsCollectPeriod */ + nvmMetricsCollectPeriod?: number; + /** @deprecated use runtimeMetricsReportPeriod */ + nvmMetricsReportPeriod?: number; + /** @deprecated use runtimeMetricsBufferSize */ + nvmMetricsBufferSize?: number; + /** @deprecated use runtimeMetricsReporterActive */ + nvmJvmReporterActive?: boolean; + /** @deprecated use runtimeMetricsCollectPeriod */ + nvmJvmMetricsCollectPeriod?: number; + /** @deprecated use runtimeMetricsReportPeriod */ + nvmJvmMetricsReportPeriod?: number; + /** @deprecated use runtimeMetricsBufferSize */ + nvmJvmMetricsBufferSize?: number; }; +function applyDeprecatedRuntimeMetricConfig(config: AgentConfig): void { + if (config.runtimeMetricsReporterActive === undefined) { + if (config.nvmMetricsReporterActive !== undefined) { + config.runtimeMetricsReporterActive = config.nvmMetricsReporterActive; + } else if (config.nvmJvmReporterActive !== undefined) { + config.runtimeMetricsReporterActive = config.nvmJvmReporterActive; + } + } + if (config.runtimeMetricsCollectPeriod === undefined) { + if (config.nvmMetricsCollectPeriod !== undefined) { + config.runtimeMetricsCollectPeriod = config.nvmMetricsCollectPeriod; + } else if (config.nvmJvmMetricsCollectPeriod !== undefined) { + config.runtimeMetricsCollectPeriod = config.nvmJvmMetricsCollectPeriod; + } + } + if (config.runtimeMetricsReportPeriod === undefined) { + if (config.nvmMetricsReportPeriod !== undefined) { + config.runtimeMetricsReportPeriod = config.nvmMetricsReportPeriod; + } else if (config.nvmJvmMetricsReportPeriod !== undefined) { + config.runtimeMetricsReportPeriod = config.nvmJvmMetricsReportPeriod; + } + } + if (config.runtimeMetricsBufferSize === undefined) { + if (config.nvmMetricsBufferSize !== undefined) { + config.runtimeMetricsBufferSize = config.nvmMetricsBufferSize; + } else if (config.nvmJvmMetricsBufferSize !== undefined) { + config.runtimeMetricsBufferSize = config.nvmJvmMetricsBufferSize; + } + } +} + export function finalizeConfig(config: AgentConfig): void { + applyDeprecatedRuntimeMetricConfig(config); + const escapeRegExp = (s: string) => s.replace(/([.*+?^=!:${}()|\[\]\/\\])/g, '\\$1'); config.reDisablePlugins = RegExp( @@ -140,6 +193,44 @@ const _config = { traceTimeout: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 10 * 1000))( Number.parseInt(process.env.SW_AGENT_TRACE_TIMEOUT ?? '', 10), ), + runtimeMetricsReporterActive: ((): boolean => { + const configured = + process.env.SW_AGENT_NODEJS_RUNTIME_METRICS_REPORTER_ACTIVE ?? + process.env.SW_AGENT_RUNTIME_METRICS_REPORTER_ACTIVE ?? + process.env.SW_AGENT_NVM_METRICS_REPORTER_ACTIVE ?? + process.env.SW_AGENT_NVM_JVM_REPORTER_ACTIVE; + return configured?.toLowerCase() !== 'false'; + })(), + runtimeMetricsCollectPeriod: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 1000))( + Number.parseInt( + process.env.SW_AGENT_NODEJS_RUNTIME_METRICS_COLLECT_PERIOD ?? + process.env.SW_AGENT_RUNTIME_METRICS_COLLECT_PERIOD ?? + process.env.SW_AGENT_NVM_METRICS_COLLECT_PERIOD ?? + process.env.SW_AGENT_NVM_JVM_METRICS_COLLECT_PERIOD ?? + '', + 10, + ), + ), + runtimeMetricsReportPeriod: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 1000))( + Number.parseInt( + process.env.SW_AGENT_NODEJS_RUNTIME_METRICS_REPORT_PERIOD ?? + process.env.SW_AGENT_RUNTIME_METRICS_REPORT_PERIOD ?? + process.env.SW_AGENT_NVM_METRICS_REPORT_PERIOD ?? + process.env.SW_AGENT_NVM_JVM_METRICS_REPORT_PERIOD ?? + '', + 10, + ), + ), + runtimeMetricsBufferSize: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 600))( + Number.parseInt( + process.env.SW_AGENT_NODEJS_RUNTIME_METRICS_BUFFER_SIZE ?? + process.env.SW_AGENT_RUNTIME_METRICS_BUFFER_SIZE ?? + process.env.SW_AGENT_NVM_METRICS_BUFFER_SIZE ?? + process.env.SW_AGENT_NVM_JVM_METRICS_BUFFER_SIZE ?? + '', + 10, + ), + ), }; export default _config; diff --git a/src/index.ts b/src/index.ts index ae2c494..e32a638 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,8 +18,7 @@ */ import config, { AgentConfig, finalizeConfig } from './config/AgentConfig'; -import Protocol from './agent/protocol/Protocol'; -import GrpcProtocol from './agent/protocol/grpc/GrpcProtocol'; +import ServiceManager from './agent/core/boot/ServiceManager'; import { createLogger } from './logging'; import PluginInstaller from './core/PluginInstaller'; import SpanContext from './trace/context/SpanContext'; @@ -28,7 +27,6 @@ const logger = createLogger(__filename); class Agent { private started = false; - private protocol: Protocol | null = null; start(options: AgentConfig = {}): void { if (process.env.SW_DISABLE === 'true') { @@ -48,42 +46,38 @@ class Agent { new PluginInstaller().install(); - this.protocol = new GrpcProtocol().heartbeat().report(); + ServiceManager.INSTANCE.boot(); this.started = true; } flush(): Promise | null { - if (this.protocol === null) { + if (!this.started) { logger.warn('Trying to flush() SkyWalking agent which is not started.'); return null; } - const spanContextFlush = SpanContext.flush(); // if there are spans which haven't finished then wait for them - const protocol = this.protocol; - - if (!spanContextFlush) return protocol.flush(); + const spanContextFlush = SpanContext.flush(); + if (!spanContextFlush) { + return ServiceManager.INSTANCE.flush(); + } return new Promise((resolve) => { spanContextFlush.then(() => { - const protocolFlush = protocol.flush(); - - if (!protocolFlush) resolve(null); - else protocolFlush.then(() => resolve(null)); + const serviceFlush = ServiceManager.INSTANCE.flush(); + if (!serviceFlush) resolve(null); + else serviceFlush.then(() => resolve(null)); }); }); } destroy(): void { - if (this.protocol === null) { + if (!this.started) { logger.warn('Trying to destroy() SkyWalking agent which is not started.'); return; } logger.info('Destroying SkyWalking agent and cleaning up resources'); - - // Clean up protocol resources - this.protocol.destroy?.(); - this.protocol = null; + ServiceManager.INSTANCE.shutdown(); this.started = false; } } diff --git a/src/trace/context/Segment.ts b/src/trace/context/Segment.ts index 9c8a7fd..7d97414 100644 --- a/src/trace/context/Segment.ts +++ b/src/trace/context/Segment.ts @@ -21,6 +21,9 @@ import Span from '../../trace/span/Span'; import ID from '../../trace/ID'; import NewID from '../../trace/NewID'; import SegmentRef from '../../trace/context/SegmentRef'; +import config from '../../config/AgentConfig'; +import { KeyStringValuePair } from '../../proto/common/Common_pb'; +import { Log, RefType, SegmentObject, SegmentReference, SpanObject } from '../../proto/language-agent/Tracing_pb'; export default class Segment { segmentId = new ID(); @@ -48,4 +51,50 @@ export default class Segment { return this; } + + /** Convert to gRPC SegmentObject (Java TraceSegment.transform). */ + transform(): SegmentObject { + return new SegmentObject() + .setService(config.serviceName) + .setServiceinstance(config.serviceInstance) + .setTraceid(this.relatedTraces[0].toString()) + .setTracesegmentid(this.segmentId.toString()) + .setSpansList( + this.spans.map((span) => + new SpanObject() + .setSpanid(span.id) + .setParentspanid(span.parentId) + .setStarttime(span.startTime) + .setEndtime(span.endTime) + .setOperationname(span.operation) + .setPeer(span.peer) + .setSpantype(span.type) + .setSpanlayer(span.layer) + .setComponentid(span.component.id) + .setIserror(span.errored) + .setLogsList( + span.logs.map((log) => + new Log() + .setTime(log.timestamp) + .setDataList( + log.items.map((logItem) => new KeyStringValuePair().setKey(logItem.key).setValue(logItem.val)), + ), + ), + ) + .setTagsList(span.tags.map((tag) => new KeyStringValuePair().setKey(tag.key).setValue(tag.val))) + .setRefsList( + span.refs.map((ref) => + new SegmentReference() + .setReftype(RefType.CROSSPROCESS) + .setTraceid(ref.traceId.toString()) + .setParenttracesegmentid(ref.segmentId.toString()) + .setParentspanid(ref.spanId) + .setParentservice(ref.service) + .setParentserviceinstance(ref.serviceInstance) + .setNetworkaddressusedatpeer(ref.clientAddress), + ), + ), + ), + ); + } } diff --git a/tests/plugins/express/expected.data.yaml b/tests/plugins/express/expected.data.yaml index 891f130..afddb26 100644 --- a/tests/plugins/express/expected.data.yaml +++ b/tests/plugins/express/expected.data.yaml @@ -119,3 +119,59 @@ segmentItems: spanType: Exit peer: server:5000 skipAnalysis: false + +meterItems: + - serviceName: server + meterSize: 6 + meters: + - meterId: + name: instance_nodejs_process_cpu + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_heap_used + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_heap_total + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_heap_limit + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_rss + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_external_memory + tags: [] + singleValue: gt 0 + - serviceName: client + meterSize: 6 + meters: + - meterId: + name: instance_nodejs_process_cpu + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_heap_used + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_heap_total + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_heap_limit + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_rss + tags: [] + singleValue: gt 0 + - meterId: + name: instance_nodejs_external_memory + tags: [] + singleValue: gt 0 diff --git a/tests/runtime/RuntimeMetricsCollector.test.ts b/tests/runtime/RuntimeMetricsCollector.test.ts new file mode 100644 index 0000000..35b7833 --- /dev/null +++ b/tests/runtime/RuntimeMetricsCollector.test.ts @@ -0,0 +1,57 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* eslint-env jest */ + +import RuntimeMetricsCollector from '../../src/agent/core/meter/RuntimeMetricsCollector'; + +describe('RuntimeMetricsCollector', () => { + let collector: RuntimeMetricsCollector; + + beforeEach(() => { + collector = new RuntimeMetricsCollector(); + }); + + afterEach(() => { + collector.destroy(); + }); + + it('maps Node.js runtime data into nodejs meter fields', () => { + const snapshot = collector.sample(); + const meters = collector.toMeterData(snapshot); + const names = meters.map((meter) => meter.getSinglevalue()?.getName()); + + expect(names).toEqual( + expect.arrayContaining([ + 'instance_nodejs_process_cpu', + 'instance_nodejs_heap_used', + 'instance_nodejs_heap_total', + 'instance_nodejs_heap_limit', + 'instance_nodejs_rss', + 'instance_nodejs_external_memory', + ]), + ); + + expect(names).toHaveLength(6); + + for (const meter of meters) { + expect(meter.getSinglevalue()?.getValue()).toBeGreaterThanOrEqual(0); + } + }); +}); From bc6c898e077e0d82bbf3277a5c8ece674bd8a870 Mon Sep 17 00:00:00 2001 From: Song Zhendong <289505773@qq.com> Date: Sat, 27 Jun 2026 18:35:04 +0800 Subject: [PATCH 2/7] Address PR #139 review: Node-native gRPC lifecycle safety - GRPCChannelManager: closed flag and stale connectivity callback guards - GRPCChannelManager: reportError notifies DISCONNECT on network errors - ServiceManagementClient: track status, clear client on disconnect, RPC deadlines - TraceSegmentServiceClient: single report timer/in-flight promise, flush safety - MeterSender: status gating and shared in-flight report promise - Remove unused GRPCStreamServiceStatus busy-wait class --- src/agent/core/meter/MeterSender.ts | 114 ++++++++++-------- src/agent/core/remote/GRPCChannelManager.ts | 43 ++++++- .../core/remote/GRPCStreamServiceStatus.ts | 62 ---------- .../core/remote/ServiceManagementClient.ts | 59 +++++---- .../core/remote/TraceSegmentServiceClient.ts | 68 +++++++---- 5 files changed, 179 insertions(+), 167 deletions(-) delete mode 100644 src/agent/core/remote/GRPCStreamServiceStatus.ts diff --git a/src/agent/core/meter/MeterSender.ts b/src/agent/core/meter/MeterSender.ts index 4d4c5ac..d32b528 100644 --- a/src/agent/core/meter/MeterSender.ts +++ b/src/agent/core/meter/MeterSender.ts @@ -34,10 +34,12 @@ const logReportError = throttled(logger, 'error', 30000); /** Reports Node.js runtime metrics via gRPC MeterReportService (Go/Python-compatible pipeline). */ export default class MeterSender implements BootService, GRPCChannelListener { - private reporterClient!: MeterReportServiceClient; + private status = GRPCChannelStatus.DISCONNECT; + private reporterClient?: MeterReportServiceClient; private readonly buffer: RuntimeSnapshot[] = []; private collectTimer?: NodeJS.Timeout; private reportTimer?: NodeJS.Timeout; + private reporting?: Promise; private collector!: RuntimeMetricsCollector; @@ -57,9 +59,8 @@ export default class MeterSender implements BootService, GRPCChannelListener { } statusChanged(status: GRPCChannelStatus): void { - if (status === GRPCChannelStatus.CONNECTED) { - this.reporterClient = this.createReporterClient(); - } + this.status = status; + this.reporterClient = status === GRPCChannelStatus.CONNECTED ? this.createReporterClient() : undefined; } private createReporterClient(): MeterReportServiceClient { @@ -70,20 +71,15 @@ export default class MeterSender implements BootService, GRPCChannelListener { ); } - get isConnected(): boolean { - return ServiceManager.INSTANCE.findService(GRPCChannelManager)!.isConnected(); - } - private startTimers(): void { this.collectTimer = setInterval( () => this.collectSample(), config.runtimeMetricsCollectPeriod || 1000, ) as NodeJS.Timeout; this.collectTimer.unref(); - this.reportTimer = setInterval( - () => this.reportBufferedMetrics(), - config.runtimeMetricsReportPeriod || 1000, - ) as NodeJS.Timeout; + this.reportTimer = setInterval(() => { + void this.reportBufferedMetrics(); + }, config.runtimeMetricsReportPeriod || 1000) as NodeJS.Timeout; this.reportTimer.unref(); } @@ -95,56 +91,68 @@ export default class MeterSender implements BootService, GRPCChannelListener { this.buffer.push(this.collector.sample()); } - private reportBufferedMetrics(callback?: () => void): void { - try { - if (this.buffer.length === 0 || !this.isConnected || !this.reporterClient) { - if (callback) callback(); - return; - } + private reportBufferedMetrics(): Promise { + if (this.reporting) { + return this.reporting; + } - const snapshots = this.buffer.splice(0, this.buffer.length); - const stream = this.reporterClient.collect( - new grpc.Metadata(), - { deadline: Date.now() + (config.traceTimeout || 10000) }, - (error: grpc.ServiceError | null) => { - if (error) { - logReportError('Failed to report runtime meter data', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); - } - if (callback) callback(); - }, - ); + this.reporting = this.doReportBufferedMetrics().finally(() => { + this.reporting = undefined; + }); + + return this.reporting; + } + private doReportBufferedMetrics(): Promise { + return new Promise((resolve) => { try { - let metadataWritten = false; - const timestamp = Date.now(); - for (const snapshot of snapshots) { - for (const meterData of this.collector.toMeterData(snapshot)) { - if (!metadataWritten) { - meterData - .setService(config.serviceName!) - .setServiceinstance(config.serviceInstance!) - .setTimestamp(timestamp); - metadataWritten = true; + if (this.buffer.length === 0 || this.status !== GRPCChannelStatus.CONNECTED || !this.reporterClient) { + resolve(); + return; + } + + const snapshots = this.buffer.splice(0, this.buffer.length); + const stream = this.reporterClient.collect( + new grpc.Metadata(), + { deadline: Date.now() + (config.traceTimeout || 10000) }, + (error: grpc.ServiceError | null) => { + if (error) { + logReportError('Failed to report runtime meter data', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + } + resolve(); + }, + ); + + try { + let metadataWritten = false; + const timestamp = Date.now(); + for (const snapshot of snapshots) { + for (const meterData of this.collector.toMeterData(snapshot)) { + if (!metadataWritten) { + meterData + .setService(config.serviceName!) + .setServiceinstance(config.serviceInstance!) + .setTimestamp(timestamp); + metadataWritten = true; + } + stream.write(meterData); } - stream.write(meterData); } + } finally { + stream.end(); } - } finally { - stream.end(); + } catch (error) { + logReportError('Failed to report runtime meter data', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + resolve(); } - } catch (error) { - logReportError('Failed to report runtime meter data', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); - if (callback) callback(); - } + }); } flush(): Promise | null { - return new Promise((resolve) => { - this.collectSample(); - this.reportBufferedMetrics(resolve); - }); + this.collectSample(); + return this.reportBufferedMetrics(); } shutdown(): void { @@ -156,6 +164,8 @@ export default class MeterSender implements BootService, GRPCChannelListener { clearInterval(this.reportTimer); this.reportTimer = undefined; } + this.reporting = undefined; + this.reporterClient = undefined; this.buffer.length = 0; this.collector.destroy(); logger.info('MeterSender destroyed and resources cleaned up'); diff --git a/src/agent/core/remote/GRPCChannelManager.ts b/src/agent/core/remote/GRPCChannelManager.ts index c47a3d1..79d18cf 100644 --- a/src/agent/core/remote/GRPCChannelManager.ts +++ b/src/agent/core/remote/GRPCChannelManager.ts @@ -32,6 +32,18 @@ import TLSChannelBuilder from './TLSChannelBuilder'; const logger = createLogger(__filename); +function isGrpcNetworkError(error: unknown): boolean { + const code = (error as grpc.ServiceError | undefined)?.code; + + return ( + code === grpc.status.UNAVAILABLE || + code === grpc.status.PERMISSION_DENIED || + code === grpc.status.UNAUTHENTICATED || + code === grpc.status.RESOURCE_EXHAUSTED || + code === grpc.status.UNKNOWN + ); +} + /** * Shared gRPC channel manager (Java GRPCChannelManager skeleton). * V1: single address; V2 reserved: multi-address failover via reportError(). @@ -40,6 +52,7 @@ export default class GRPCChannelManager implements BootService { private managedChannel: GRPCChannel | null = null; private readonly listeners: GRPCChannelListener[] = []; private lastStatus: GRPCChannelStatus | null = null; + private closed = false; /** V1: first address when comma-separated; V2: failover selection. */ resolveAddress(): string { @@ -74,14 +87,21 @@ export default class GRPCChannelManager implements BootService { return Number.MAX_SAFE_INTEGER; } - /** V2 hook: network errors may trigger address failover. */ + /** Notify DISCONNECT on network errors so periodic work stops until grpc-js reconnects. */ reportError(error: unknown): void { - logger.debug('gRPC report error (multi-address failover reserved for V2): %s', error); + if (!isGrpcNetworkError(error)) { + logger.debug('gRPC report error (ignored): %s', error); + return; + } + + logger.debug('gRPC network error, notify DISCONNECT: %s', error); + this.notify(GRPCChannelStatus.DISCONNECT); } prepare(): void {} boot(): void { + this.closed = false; const address = this.resolveAddress(); const [host, portText] = address.split(':'); const port = Number.parseInt(portText, 10); @@ -103,16 +123,29 @@ export default class GRPCChannelManager implements BootService { onComplete(): void {} shutdown(): void { - this.managedChannel?.shutdownNow(); + this.closed = true; + const managed = this.managedChannel; this.managedChannel = null; + managed?.shutdownNow(); + this.notify(GRPCChannelStatus.DISCONNECT); } private watchConnectivityState(): void { - const channel = this.getChannel(); + const managed = this.managedChannel; + if (this.closed || !managed) { + return; + } + + const channel = managed.getChannel(); const currentState = channel.getConnectivityState(true); + channel.watchConnectivityState(currentState, Infinity, (error) => { + if (this.closed || this.managedChannel !== managed) { + return; + } + if (error) { - logger.debug('Channel connectivity watch error: %s', error.message); + logger.debug('Channel connectivity watch stopped: %s', error.message); return; } diff --git a/src/agent/core/remote/GRPCStreamServiceStatus.ts b/src/agent/core/remote/GRPCStreamServiceStatus.ts deleted file mode 100644 index 79f984d..0000000 --- a/src/agent/core/remote/GRPCStreamServiceStatus.ts +++ /dev/null @@ -1,62 +0,0 @@ -/*! - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { createLogger } from '../../../logging'; - -const logger = createLogger(__filename); - -/** Tracks gRPC stream completion (Java {@code GRPCStreamServiceStatus}). */ -export default class GRPCStreamServiceStatus { - private status = false; - - constructor(status = false) { - this.status = status; - } - - isStatus(): boolean { - return this.status; - } - - finished(): void { - this.status = true; - } - - /** Wait until success status reported (Java wait4Finish). */ - wait4Finish(): void { - let recheckCycle = 5; - let hasWaited = 0; - const maxCycle = 30 * 1000; - while (!this.status) { - this.try2Sleep(recheckCycle); - hasWaited += recheckCycle; - if (recheckCycle >= maxCycle) { - logger.warn("Collector stream service doesn't response in %s seconds.", hasWaited / 1000); - } else { - recheckCycle = Math.min(recheckCycle * 2, maxCycle); - } - } - } - - private try2Sleep(millis: number): void { - const end = Date.now() + millis; - while (Date.now() < end) { - // busy-wait fallback; Node agent has no Thread.sleep in sync path - } - } -} diff --git a/src/agent/core/remote/ServiceManagementClient.ts b/src/agent/core/remote/ServiceManagementClient.ts index 4eb0120..098c04e 100644 --- a/src/agent/core/remote/ServiceManagementClient.ts +++ b/src/agent/core/remote/ServiceManagementClient.ts @@ -35,8 +35,11 @@ const logger = createLogger(__filename); const logHeartbeatError = throttled(logger, 'error', 30000); export default class ServiceManagementClient implements BootService, GRPCChannelListener { - private managementServiceClient!: ManagementServiceClient; + private status = GRPCChannelStatus.DISCONNECT; + private managementServiceClient?: ManagementServiceClient; private heartbeatTimer?: NodeJS.Timeout; + private keepAlivePkg?: InstancePingPkg; + private instanceProperties?: InstanceProperties; prepare(): void { ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); @@ -52,11 +55,9 @@ export default class ServiceManagementClient implements BootService, GRPCChannel return; } - const keepAlivePkg = new InstancePingPkg() - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance); + this.keepAlivePkg = new InstancePingPkg().setService(config.serviceName).setServiceinstance(config.serviceInstance); - const instanceProperties = new InstanceProperties() + this.instanceProperties = new InstanceProperties() .setService(config.serviceName) .setServiceinstance(config.serviceInstance) .setPropertiesList([ @@ -66,23 +67,7 @@ export default class ServiceManagementClient implements BootService, GRPCChannel new KeyStringValuePair().setKey('Process No.').setValue(`${process.pid}`), ]); - this.heartbeatTimer = setInterval(() => { - if (!this.managementServiceClient) { - return; - } - this.managementServiceClient.reportInstanceProperties(instanceProperties, new grpc.Metadata(), (error) => { - if (error) { - logHeartbeatError('Failed to send heartbeat', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); - } - }); - this.managementServiceClient.keepAlive(keepAlivePkg, new grpc.Metadata(), (error) => { - if (error) { - logHeartbeatError('Failed to send heartbeat', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); - } - }); - }, 20000).unref(); + this.heartbeatTimer = setInterval(() => this.sendHeartbeat(), 20000).unref(); } onComplete(): void {} @@ -92,6 +77,7 @@ export default class ServiceManagementClient implements BootService, GRPCChannel clearInterval(this.heartbeatTimer); this.heartbeatTimer = undefined; } + this.managementServiceClient = undefined; logger.info('ServiceManagementClient destroyed and resources cleaned up'); } @@ -100,9 +86,34 @@ export default class ServiceManagementClient implements BootService, GRPCChannel } statusChanged(status: GRPCChannelStatus): void { - if (status === GRPCChannelStatus.CONNECTED) { - this.managementServiceClient = this.createManagementClient(); + this.status = status; + this.managementServiceClient = status === GRPCChannelStatus.CONNECTED ? this.createManagementClient() : undefined; + } + + private sendHeartbeat(): void { + if (this.status !== GRPCChannelStatus.CONNECTED || !this.managementServiceClient) { + return; } + + const options = { deadline: Date.now() + config.traceTimeout }; + + this.managementServiceClient.reportInstanceProperties( + this.instanceProperties!, + new grpc.Metadata(), + options, + (error) => { + if (error) { + logHeartbeatError('Failed to send heartbeat', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + } + }, + ); + this.managementServiceClient.keepAlive(this.keepAlivePkg!, new grpc.Metadata(), options, (error) => { + if (error) { + logHeartbeatError('Failed to send heartbeat', error); + ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + } + }); } private createManagementClient(): ManagementServiceClient { diff --git a/src/agent/core/remote/TraceSegmentServiceClient.ts b/src/agent/core/remote/TraceSegmentServiceClient.ts index 9f03c61..b9345dc 100644 --- a/src/agent/core/remote/TraceSegmentServiceClient.ts +++ b/src/agent/core/remote/TraceSegmentServiceClient.ts @@ -34,9 +34,11 @@ const logReportError = throttled(logger, 'error', 30000); const logBufferFull = throttled(logger, 'warn', 30000); export default class TraceSegmentServiceClient implements BootService, GRPCChannelListener { - private reporterClient!: TraceSegmentReportServiceClient; + private status = GRPCChannelStatus.DISCONNECT; + private reporterClient?: TraceSegmentReportServiceClient; private readonly buffer: Segment[] = []; private timeout?: NodeJS.Timeout; + private reporting?: Promise; private segmentFinishedListener!: (segment: Segment) => void; prepare(): void { @@ -58,8 +60,7 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann } boot(): void { - this.timeout = setTimeout(this.reportFunction.bind(this), 1000) as unknown as NodeJS.Timeout; - this.timeout.unref(); + this.scheduleNextReport(); } onComplete(): void {} @@ -74,6 +75,8 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann this.timeout = undefined; } + this.reporting = undefined; + this.reporterClient = undefined; this.buffer.length = 0; logger.info('TraceSegmentServiceClient destroyed and resources cleaned up'); } @@ -83,9 +86,8 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann } statusChanged(status: GRPCChannelStatus): void { - if (status === GRPCChannelStatus.CONNECTED) { - this.reporterClient = this.createReporterClient(); - } + this.status = status; + this.reporterClient = status === GRPCChannelStatus.CONNECTED ? this.createReporterClient() : undefined; } private createReporterClient(): TraceSegmentReportServiceClient { @@ -97,21 +99,41 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann ); } - private get isConnected(): boolean { - return ServiceManager.INSTANCE.findService(GRPCChannelManager)!.isConnected(); + private scheduleNextReport(): void { + if (this.timeout) { + return; + } + + this.timeout = setTimeout(() => { + this.timeout = undefined; + void this.reportOnce().finally(() => this.scheduleNextReport()); + }, 1000) as unknown as NodeJS.Timeout; + this.timeout.unref(); + } + + private reportOnce(): Promise { + if (this.reporting) { + return this.reporting; + } + + this.reporting = this.doReport().finally(() => { + this.reporting = undefined; + }); + + return this.reporting; } - private reportFunction(callback?: () => void) { - emitter.emit('segments-sent'); + private doReport(): Promise { + return new Promise((resolve) => { + emitter.emit('segments-sent'); - try { if (this.buffer.length === 0) { - callback?.(); + resolve(); return; } - if (!this.isConnected || !this.reporterClient) { - callback?.(); + if (this.status !== GRPCChannelStatus.CONNECTED || !this.reporterClient) { + resolve(); return; } @@ -123,7 +145,7 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann logReportError('Failed to report trace data', error); ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); } - callback?.(); + resolve(); }, ); @@ -141,17 +163,15 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann } stream.end(); - } finally { - this.timeout = setTimeout(this.reportFunction.bind(this), 1000) as unknown as NodeJS.Timeout; - this.timeout.unref(); - } + }); } flush(): Promise | null { - return this.buffer.length === 0 - ? null - : new Promise((resolve) => { - this.reportFunction(resolve); - }); + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = undefined; + } + + return this.buffer.length === 0 ? null : this.reportOnce(); } } From 916f457c850d5d3d116ac70108de5634cf51f2d0 Mon Sep 17 00:00:00 2001 From: Song Zhendong <289505773@qq.com> Date: Sat, 27 Jun 2026 18:35:04 +0800 Subject: [PATCH 3/7] Fix TraceSegmentServiceClient flush to restart report loop after manual flush --- src/agent/core/remote/TraceSegmentServiceClient.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/agent/core/remote/TraceSegmentServiceClient.ts b/src/agent/core/remote/TraceSegmentServiceClient.ts index b9345dc..0841c3b 100644 --- a/src/agent/core/remote/TraceSegmentServiceClient.ts +++ b/src/agent/core/remote/TraceSegmentServiceClient.ts @@ -172,6 +172,11 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann this.timeout = undefined; } - return this.buffer.length === 0 ? null : this.reportOnce(); + if (this.buffer.length === 0) { + this.scheduleNextReport(); + return null; + } + + return this.reportOnce().finally(() => this.scheduleNextReport()); } } From d67e866f44b550d4eacd1a304032ea09ad142ba0 Mon Sep 17 00:00:00 2001 From: songzhendong <289505773@qq.com> Date: Sat, 27 Jun 2026 21:30:32 +0800 Subject: [PATCH 4/7] Address PR #139 round-2 review: safe async callbacks and reportError recovery - Capture GRPCChannelManager in prepare(); reportGrpcError() no-ops after shutdown - Apply pattern in ServiceManagementClient, TraceSegmentServiceClient, MeterSender - reportError(): re-notify CONNECTED when grpc-js channel stays ready - Guard scheduleNextReport, flush, heartbeat, and meter timers with closed flag - Trace stream: always end in finally; EventEmitter off-before-on and closed guard - Clear GRPCChannelManager listeners on shutdown; replace channelManager! with guards Signed-off-by: songzhendong <289505773@qq.com> --- src/agent/core/meter/MeterSender.ts | 73 ++++++++++++--- src/agent/core/remote/GRPCChannelManager.ts | 27 +++++- .../core/remote/ServiceManagementClient.ts | 70 +++++++++++---- .../core/remote/TraceSegmentServiceClient.ts | 89 ++++++++++++++----- 4 files changed, 205 insertions(+), 54 deletions(-) diff --git a/src/agent/core/meter/MeterSender.ts b/src/agent/core/meter/MeterSender.ts index d32b528..b66d370 100644 --- a/src/agent/core/meter/MeterSender.ts +++ b/src/agent/core/meter/MeterSender.ts @@ -34,6 +34,8 @@ const logReportError = throttled(logger, 'error', 30000); /** Reports Node.js runtime metrics via gRPC MeterReportService (Go/Python-compatible pipeline). */ export default class MeterSender implements BootService, GRPCChannelListener { + private closed = false; + private channelManager?: GRPCChannelManager; private status = GRPCChannelStatus.DISCONNECT; private reporterClient?: MeterReportServiceClient; private readonly buffer: RuntimeSnapshot[] = []; @@ -45,10 +47,16 @@ export default class MeterSender implements BootService, GRPCChannelListener { prepare(): void { this.collector = new RuntimeMetricsCollector(); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); + this.channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager); + this.channelManager?.addChannelListener(this); } boot(): void { + if (this.collectTimer || this.reportTimer) { + logger.warn('MeterSender timers already scheduled; skipping duplicate boot.'); + return; + } + this.startTimers(); } @@ -63,21 +71,30 @@ export default class MeterSender implements BootService, GRPCChannelListener { this.reporterClient = status === GRPCChannelStatus.CONNECTED ? this.createReporterClient() : undefined; } - private createReporterClient(): MeterReportServiceClient { + private createReporterClient(): MeterReportServiceClient | undefined { + if (!this.channelManager) { + return undefined; + } + return new MeterReportServiceClient( config.collectorAddress, grpc.credentials.createInsecure(), - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.getClientOptions(), + this.channelManager.getClientOptions(), ); } private startTimers(): void { - this.collectTimer = setInterval( - () => this.collectSample(), - config.runtimeMetricsCollectPeriod || 1000, - ) as NodeJS.Timeout; + this.collectTimer = setInterval(() => { + if (this.closed) { + return; + } + this.collectSample(); + }, config.runtimeMetricsCollectPeriod || 1000) as NodeJS.Timeout; this.collectTimer.unref(); this.reportTimer = setInterval(() => { + if (this.closed) { + return; + } void this.reportBufferedMetrics(); }, config.runtimeMetricsReportPeriod || 1000) as NodeJS.Timeout; this.reportTimer.unref(); @@ -92,6 +109,10 @@ export default class MeterSender implements BootService, GRPCChannelListener { } private reportBufferedMetrics(): Promise { + if (this.closed) { + return Promise.resolve(); + } + if (this.reporting) { return this.reporting; } @@ -106,11 +127,21 @@ export default class MeterSender implements BootService, GRPCChannelListener { private doReportBufferedMetrics(): Promise { return new Promise((resolve) => { try { + if (this.closed) { + resolve(); + return; + } + if (this.buffer.length === 0 || this.status !== GRPCChannelStatus.CONNECTED || !this.reporterClient) { resolve(); return; } + if (!config.serviceName || !config.serviceInstance) { + resolve(); + return; + } + const snapshots = this.buffer.splice(0, this.buffer.length); const stream = this.reporterClient.collect( new grpc.Metadata(), @@ -118,7 +149,7 @@ export default class MeterSender implements BootService, GRPCChannelListener { (error: grpc.ServiceError | null) => { if (error) { logReportError('Failed to report runtime meter data', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + this.reportGrpcError(error); } resolve(); }, @@ -131,8 +162,8 @@ export default class MeterSender implements BootService, GRPCChannelListener { for (const meterData of this.collector.toMeterData(snapshot)) { if (!metadataWritten) { meterData - .setService(config.serviceName!) - .setServiceinstance(config.serviceInstance!) + .setService(config.serviceName) + .setServiceinstance(config.serviceInstance) .setTimestamp(timestamp); metadataWritten = true; } @@ -140,22 +171,39 @@ export default class MeterSender implements BootService, GRPCChannelListener { } } } finally { - stream.end(); + try { + stream.end(); + } catch (error) { + logReportError('Failed to end meter collect stream', error); + resolve(); + } } } catch (error) { logReportError('Failed to report runtime meter data', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + this.reportGrpcError(error); resolve(); } }); } + private reportGrpcError(error: unknown): void { + if (this.closed) { + return; + } + + this.channelManager?.reportError(error); + } + flush(): Promise | null { + if (this.closed) { + return null; + } this.collectSample(); return this.reportBufferedMetrics(); } shutdown(): void { + this.closed = true; if (this.collectTimer) { clearInterval(this.collectTimer); this.collectTimer = undefined; @@ -168,6 +216,7 @@ export default class MeterSender implements BootService, GRPCChannelListener { this.reporterClient = undefined; this.buffer.length = 0; this.collector.destroy(); + this.channelManager = undefined; logger.info('MeterSender destroyed and resources cleaned up'); } } diff --git a/src/agent/core/remote/GRPCChannelManager.ts b/src/agent/core/remote/GRPCChannelManager.ts index 79d18cf..a8f8f00 100644 --- a/src/agent/core/remote/GRPCChannelManager.ts +++ b/src/agent/core/remote/GRPCChannelManager.ts @@ -65,11 +65,19 @@ export default class GRPCChannelManager implements BootService { } getChannel(): grpc.Channel { - return this.managedChannel!.getChannel(); + if (!this.managedChannel) { + throw new Error('gRPC channel is not available'); + } + + return this.managedChannel.getChannel(); } getClientOptions(): ClientOptions { - return this.managedChannel!.getClientOptions(); + if (!this.managedChannel) { + throw new Error('gRPC channel is not available'); + } + + return this.managedChannel.getClientOptions(); } isConnected(): boolean { @@ -87,13 +95,25 @@ export default class GRPCChannelManager implements BootService { return Number.MAX_SAFE_INTEGER; } - /** Notify DISCONNECT on network errors so periodic work stops until grpc-js reconnects. */ + /** Align local status with grpc-js connectivity; avoid permanent DISCONNECT while channel stays READY. */ reportError(error: unknown): void { if (!isGrpcNetworkError(error)) { logger.debug('gRPC report error (ignored): %s', error); return; } + const managed = this.managedChannel; + if (!managed || this.closed) { + this.notify(GRPCChannelStatus.DISCONNECT); + return; + } + + if (managed.isConnected(false)) { + logger.debug('gRPC network error but channel still connected: %s', error); + this.notify(GRPCChannelStatus.CONNECTED); + return; + } + logger.debug('gRPC network error, notify DISCONNECT: %s', error); this.notify(GRPCChannelStatus.DISCONNECT); } @@ -128,6 +148,7 @@ export default class GRPCChannelManager implements BootService { this.managedChannel = null; managed?.shutdownNow(); this.notify(GRPCChannelStatus.DISCONNECT); + this.listeners.length = 0; } private watchConnectivityState(): void { diff --git a/src/agent/core/remote/ServiceManagementClient.ts b/src/agent/core/remote/ServiceManagementClient.ts index 098c04e..a558caf 100644 --- a/src/agent/core/remote/ServiceManagementClient.ts +++ b/src/agent/core/remote/ServiceManagementClient.ts @@ -35,14 +35,21 @@ const logger = createLogger(__filename); const logHeartbeatError = throttled(logger, 'error', 30000); export default class ServiceManagementClient implements BootService, GRPCChannelListener { + private closed = false; + private channelManager?: GRPCChannelManager; private status = GRPCChannelStatus.DISCONNECT; private managementServiceClient?: ManagementServiceClient; private heartbeatTimer?: NodeJS.Timeout; private keepAlivePkg?: InstancePingPkg; private instanceProperties?: InstanceProperties; + private sendPropertiesCounter = 0; + + /** Same default as Java Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR (10). */ + private static readonly PROPERTIES_REPORT_PERIOD_FACTOR = 10; prepare(): void { - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); + this.channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager); + this.channelManager?.addChannelListener(this); } boot(): void { @@ -67,17 +74,20 @@ export default class ServiceManagementClient implements BootService, GRPCChannel new KeyStringValuePair().setKey('Process No.').setValue(`${process.pid}`), ]); - this.heartbeatTimer = setInterval(() => this.sendHeartbeat(), 20000).unref(); + this.heartbeatTimer = setInterval(() => this.sendHeartbeat(), 20000) as NodeJS.Timeout; + this.heartbeatTimer.unref(); } onComplete(): void {} shutdown(): void { + this.closed = true; if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = undefined; } this.managementServiceClient = undefined; + this.channelManager = undefined; logger.info('ServiceManagementClient destroyed and resources cleaned up'); } @@ -91,39 +101,61 @@ export default class ServiceManagementClient implements BootService, GRPCChannel } private sendHeartbeat(): void { - if (this.status !== GRPCChannelStatus.CONNECTED || !this.managementServiceClient) { + if ( + this.closed || + this.status !== GRPCChannelStatus.CONNECTED || + !this.managementServiceClient || + !this.instanceProperties || + !this.keepAlivePkg + ) { return; } const options = { deadline: Date.now() + config.traceTimeout }; + const reportProperties = + Math.abs(this.sendPropertiesCounter++) % ServiceManagementClient.PROPERTIES_REPORT_PERIOD_FACTOR === 0; + + if (reportProperties) { + this.managementServiceClient.reportInstanceProperties( + this.instanceProperties, + new grpc.Metadata(), + options, + (error) => { + if (error) { + logHeartbeatError('Failed to send instance properties', error); + this.reportGrpcError(error); + } + }, + ); + return; + } - this.managementServiceClient.reportInstanceProperties( - this.instanceProperties!, - new grpc.Metadata(), - options, - (error) => { - if (error) { - logHeartbeatError('Failed to send heartbeat', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); - } - }, - ); - this.managementServiceClient.keepAlive(this.keepAlivePkg!, new grpc.Metadata(), options, (error) => { + this.managementServiceClient.keepAlive(this.keepAlivePkg, new grpc.Metadata(), options, (error) => { if (error) { logHeartbeatError('Failed to send heartbeat', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); + this.reportGrpcError(error); } }); } - private createManagementClient(): ManagementServiceClient { - const channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager)!; + private createManagementClient(): ManagementServiceClient | undefined { + if (!this.channelManager) { + return undefined; + } + return new ManagementServiceClient( config.collectorAddress, grpc.credentials.createInsecure(), - channelManager.getClientOptions(), + this.channelManager.getClientOptions(), ); } + private reportGrpcError(error: unknown): void { + if (this.closed) { + return; + } + + this.channelManager?.reportError(error); + } flush(): Promise | null { logger.warn('ServiceManagementClient does not need flush().'); diff --git a/src/agent/core/remote/TraceSegmentServiceClient.ts b/src/agent/core/remote/TraceSegmentServiceClient.ts index 0841c3b..3f1812d 100644 --- a/src/agent/core/remote/TraceSegmentServiceClient.ts +++ b/src/agent/core/remote/TraceSegmentServiceClient.ts @@ -34,17 +34,28 @@ const logReportError = throttled(logger, 'error', 30000); const logBufferFull = throttled(logger, 'warn', 30000); export default class TraceSegmentServiceClient implements BootService, GRPCChannelListener { + private closed = false; + private channelManager?: GRPCChannelManager; private status = GRPCChannelStatus.DISCONNECT; private reporterClient?: TraceSegmentReportServiceClient; private readonly buffer: Segment[] = []; private timeout?: NodeJS.Timeout; private reporting?: Promise; - private segmentFinishedListener!: (segment: Segment) => void; + private segmentFinishedListener?: (segment: Segment) => void; prepare(): void { - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.addChannelListener(this); + this.channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager); + this.channelManager?.addChannelListener(this); + + if (this.segmentFinishedListener) { + emitter.off('segment-finished', this.segmentFinishedListener); + } this.segmentFinishedListener = (segment: Segment) => { + if (this.closed) { + return; + } + if (this.buffer.length >= config.maxBufferSize) { logBufferFull( `Trace buffer reached maximum size (${config.maxBufferSize}); discarding oldest segments. The collector at ${config.collectorAddress} is likely unreachable.`, @@ -66,6 +77,7 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann onComplete(): void {} shutdown(): void { + this.closed = true; if (this.segmentFinishedListener) { emitter.off('segment-finished', this.segmentFinishedListener); } @@ -78,6 +90,7 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann this.reporting = undefined; this.reporterClient = undefined; this.buffer.length = 0; + this.channelManager = undefined; logger.info('TraceSegmentServiceClient destroyed and resources cleaned up'); } @@ -90,28 +103,38 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann this.reporterClient = status === GRPCChannelStatus.CONNECTED ? this.createReporterClient() : undefined; } - private createReporterClient(): TraceSegmentReportServiceClient { - const channelManager = ServiceManager.INSTANCE.findService(GRPCChannelManager)!; + private createReporterClient(): TraceSegmentReportServiceClient | undefined { + if (!this.channelManager) { + return undefined; + } + return new TraceSegmentReportServiceClient( config.collectorAddress, grpc.credentials.createInsecure(), - channelManager.getClientOptions(), + this.channelManager.getClientOptions(), ); } private scheduleNextReport(): void { - if (this.timeout) { + if (this.closed || this.timeout) { return; } this.timeout = setTimeout(() => { this.timeout = undefined; + if (this.closed) { + return; + } void this.reportOnce().finally(() => this.scheduleNextReport()); }, 1000) as unknown as NodeJS.Timeout; this.timeout.unref(); } private reportOnce(): Promise { + if (this.closed) { + return Promise.resolve(); + } + if (this.reporting) { return this.reporting; } @@ -125,6 +148,11 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann private doReport(): Promise { return new Promise((resolve) => { + if (this.closed) { + resolve(); + return; + } + emitter.emit('segments-sent'); if (this.buffer.length === 0) { @@ -137,19 +165,20 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann return; } - const stream = this.reporterClient.collect( - new grpc.Metadata(), - { deadline: Date.now() + config.traceTimeout }, - (error) => { - if (error) { - logReportError('Failed to report trace data', error); - ServiceManager.INSTANCE.findService(GRPCChannelManager)!.reportError(error); - } - resolve(); - }, - ); - + let stream: ReturnType | undefined; try { + stream = this.reporterClient.collect( + new grpc.Metadata(), + { deadline: Date.now() + config.traceTimeout }, + (error) => { + if (error) { + logReportError('Failed to report trace data', error); + this.reportGrpcError(error); + } + resolve(); + }, + ); + for (const segment of this.buffer) { if (segment) { if (logger._isDebugEnabled) { @@ -158,15 +187,35 @@ export default class TraceSegmentServiceClient implements BootService, GRPCChann stream.write(segment.transform()); } } + } catch (error) { + logReportError('Failed to report trace data', error); + this.reportGrpcError(error); + resolve(); } finally { this.buffer.length = 0; + try { + stream?.end(); + } catch (error) { + logReportError('Failed to end trace collect stream', error); + resolve(); + } } - - stream.end(); }); } + private reportGrpcError(error: unknown): void { + if (this.closed) { + return; + } + + this.channelManager?.reportError(error); + } + flush(): Promise | null { + if (this.closed) { + return null; + } + if (this.timeout) { clearTimeout(this.timeout); this.timeout = undefined; From 9ddd6a5044ad445c4db50ef664f9efa429826362 Mon Sep 17 00:00:00 2001 From: songzhendong <289505773@qq.com> Date: Sun, 28 Jun 2026 10:05:19 +0800 Subject: [PATCH 5/7] Fix PR #139 round-3 review: deprecated runtime metric config path. Normalize programmatic deprecated aliases before merging user options into the singleton config without overwriting defaults with undefined, apply deprecated fields during finalizeConfig for direct config mutation, and add unit tests for alias mapping and agent.start() disable path. Signed-off-by: songzhendong <289505773@qq.com> --- src/config/AgentConfig.ts | 83 +++++++++++++----- src/index.ts | 4 +- tests/config/AgentConfig.test.ts | 146 +++++++++++++++++++++++++++++++ 3 files changed, 210 insertions(+), 23 deletions(-) create mode 100644 tests/config/AgentConfig.test.ts diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts index 4c79cf1..c59c9b7 100644 --- a/src/config/AgentConfig.ts +++ b/src/config/AgentConfig.ts @@ -65,34 +65,75 @@ export type AgentConfig = { nvmJvmMetricsBufferSize?: number; }; -function applyDeprecatedRuntimeMetricConfig(config: AgentConfig): void { - if (config.runtimeMetricsReporterActive === undefined) { - if (config.nvmMetricsReporterActive !== undefined) { - config.runtimeMetricsReporterActive = config.nvmMetricsReporterActive; - } else if (config.nvmJvmReporterActive !== undefined) { - config.runtimeMetricsReporterActive = config.nvmJvmReporterActive; +export function normalizeDeprecatedRuntimeMetricOptions(options: AgentConfig): AgentConfig { + const normalized = { ...options }; + + if (normalized.runtimeMetricsReporterActive === undefined) { + const reporterActive = normalized.nvmMetricsReporterActive ?? normalized.nvmJvmReporterActive; + if (reporterActive !== undefined) { + normalized.runtimeMetricsReporterActive = reporterActive; } + } else { + delete normalized.nvmMetricsReporterActive; + delete normalized.nvmJvmReporterActive; } - if (config.runtimeMetricsCollectPeriod === undefined) { - if (config.nvmMetricsCollectPeriod !== undefined) { - config.runtimeMetricsCollectPeriod = config.nvmMetricsCollectPeriod; - } else if (config.nvmJvmMetricsCollectPeriod !== undefined) { - config.runtimeMetricsCollectPeriod = config.nvmJvmMetricsCollectPeriod; + + if (normalized.runtimeMetricsCollectPeriod === undefined) { + const collectPeriod = normalized.nvmMetricsCollectPeriod ?? normalized.nvmJvmMetricsCollectPeriod; + if (collectPeriod !== undefined) { + normalized.runtimeMetricsCollectPeriod = collectPeriod; } + } else { + delete normalized.nvmMetricsCollectPeriod; + delete normalized.nvmJvmMetricsCollectPeriod; } - if (config.runtimeMetricsReportPeriod === undefined) { - if (config.nvmMetricsReportPeriod !== undefined) { - config.runtimeMetricsReportPeriod = config.nvmMetricsReportPeriod; - } else if (config.nvmJvmMetricsReportPeriod !== undefined) { - config.runtimeMetricsReportPeriod = config.nvmJvmMetricsReportPeriod; + + if (normalized.runtimeMetricsReportPeriod === undefined) { + const reportPeriod = normalized.nvmMetricsReportPeriod ?? normalized.nvmJvmMetricsReportPeriod; + if (reportPeriod !== undefined) { + normalized.runtimeMetricsReportPeriod = reportPeriod; } + } else { + delete normalized.nvmMetricsReportPeriod; + delete normalized.nvmJvmMetricsReportPeriod; } - if (config.runtimeMetricsBufferSize === undefined) { - if (config.nvmMetricsBufferSize !== undefined) { - config.runtimeMetricsBufferSize = config.nvmMetricsBufferSize; - } else if (config.nvmJvmMetricsBufferSize !== undefined) { - config.runtimeMetricsBufferSize = config.nvmJvmMetricsBufferSize; + + if (normalized.runtimeMetricsBufferSize === undefined) { + const bufferSize = normalized.nvmMetricsBufferSize ?? normalized.nvmJvmMetricsBufferSize; + if (bufferSize !== undefined) { + normalized.runtimeMetricsBufferSize = bufferSize; } + } else { + delete normalized.nvmMetricsBufferSize; + delete normalized.nvmJvmMetricsBufferSize; + } + + return normalized; +} + +function applyDeprecatedRuntimeMetricConfig(config: AgentConfig): void { + if (config.nvmMetricsReporterActive !== undefined) { + config.runtimeMetricsReporterActive = config.nvmMetricsReporterActive; + } else if (config.nvmJvmReporterActive !== undefined) { + config.runtimeMetricsReporterActive = config.nvmJvmReporterActive; + } + + if (config.nvmMetricsCollectPeriod !== undefined) { + config.runtimeMetricsCollectPeriod = config.nvmMetricsCollectPeriod; + } else if (config.nvmJvmMetricsCollectPeriod !== undefined) { + config.runtimeMetricsCollectPeriod = config.nvmJvmMetricsCollectPeriod; + } + + if (config.nvmMetricsReportPeriod !== undefined) { + config.runtimeMetricsReportPeriod = config.nvmMetricsReportPeriod; + } else if (config.nvmJvmMetricsReportPeriod !== undefined) { + config.runtimeMetricsReportPeriod = config.nvmJvmMetricsReportPeriod; + } + + if (config.nvmMetricsBufferSize !== undefined) { + config.runtimeMetricsBufferSize = config.nvmMetricsBufferSize; + } else if (config.nvmJvmMetricsBufferSize !== undefined) { + config.runtimeMetricsBufferSize = config.nvmJvmMetricsBufferSize; } } diff --git a/src/index.ts b/src/index.ts index e32a638..9b1af95 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,7 +17,7 @@ * */ -import config, { AgentConfig, finalizeConfig } from './config/AgentConfig'; +import config, { AgentConfig, finalizeConfig, normalizeDeprecatedRuntimeMetricOptions } from './config/AgentConfig'; import ServiceManager from './agent/core/boot/ServiceManager'; import { createLogger } from './logging'; import PluginInstaller from './core/PluginInstaller'; @@ -39,7 +39,7 @@ class Agent { return; } - Object.assign(config, options); + Object.assign(config, normalizeDeprecatedRuntimeMetricOptions(options)); finalizeConfig(config); logger.debug('Starting SkyWalking agent'); diff --git a/tests/config/AgentConfig.test.ts b/tests/config/AgentConfig.test.ts new file mode 100644 index 0000000..ee8670e --- /dev/null +++ b/tests/config/AgentConfig.test.ts @@ -0,0 +1,146 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* eslint-env jest */ + +const registeredServiceNames = new Set(); + +jest.mock('../../src/agent/core/boot/ServiceManager', () => ({ + __esModule: true, + default: { + INSTANCE: { + boot: jest.fn(() => { + registeredServiceNames.clear(); + registeredServiceNames.add('GRPCChannelManager'); + registeredServiceNames.add('TraceSegmentServiceClient'); + registeredServiceNames.add('ServiceManagementClient'); + const { default: agentConfig } = jest.requireActual('../../src/config/AgentConfig') as { + default: { runtimeMetricsReporterActive?: boolean }; + }; + if (agentConfig.runtimeMetricsReporterActive) { + registeredServiceNames.add('MeterSender'); + } + }), + shutdown: jest.fn(() => { + registeredServiceNames.clear(); + }), + flush: jest.fn(), + findService: jest.fn((serviceClass: { name: string }) => + registeredServiceNames.has(serviceClass.name) ? {} : undefined, + ), + }, + }, +})); + +jest.mock('../../src/core/PluginInstaller', () => ({ + __esModule: true, + default: jest.fn().mockImplementation(() => ({ + install: jest.fn(), + })), +})); + +import agent, { config } from '../../src/index'; +import ServiceManager from '../../src/agent/core/boot/ServiceManager'; +import MeterSender from '../../src/agent/core/meter/MeterSender'; +import { AgentConfig, normalizeDeprecatedRuntimeMetricOptions } from '../../src/config/AgentConfig'; + +function resetRuntimeMetricConfig(): void { + const mutableConfig = config as AgentConfig; + mutableConfig.runtimeMetricsReporterActive = true; + mutableConfig.runtimeMetricsCollectPeriod = 1000; + mutableConfig.runtimeMetricsReportPeriod = 1000; + mutableConfig.runtimeMetricsBufferSize = 600; + delete mutableConfig.nvmMetricsReporterActive; + delete mutableConfig.nvmJvmReporterActive; + delete mutableConfig.nvmMetricsCollectPeriod; + delete mutableConfig.nvmJvmMetricsCollectPeriod; + delete mutableConfig.nvmMetricsReportPeriod; + delete mutableConfig.nvmJvmMetricsReportPeriod; + delete mutableConfig.nvmMetricsBufferSize; + delete mutableConfig.nvmJvmMetricsBufferSize; +} + +describe('AgentConfig deprecated runtime metric options (unit)', () => { + afterEach(() => { + agent.destroy(); + resetRuntimeMetricConfig(); + }); + + it('maps deprecated programmatic aliases before merge', () => { + const normalized = normalizeDeprecatedRuntimeMetricOptions({ + nvmMetricsReporterActive: false, + nvmMetricsCollectPeriod: 2000, + nvmMetricsReportPeriod: 3000, + nvmMetricsBufferSize: 42, + }); + + expect(normalized.runtimeMetricsReporterActive).toBe(false); + expect(normalized.runtimeMetricsCollectPeriod).toBe(2000); + expect(normalized.runtimeMetricsReportPeriod).toBe(3000); + expect(normalized.runtimeMetricsBufferSize).toBe(42); + }); + + it('maps nvmJvm deprecated aliases before merge', () => { + const normalized = normalizeDeprecatedRuntimeMetricOptions({ + nvmJvmReporterActive: false, + nvmJvmMetricsCollectPeriod: 2222, + nvmJvmMetricsReportPeriod: 3333, + nvmJvmMetricsBufferSize: 44, + }); + + expect(normalized.runtimeMetricsReporterActive).toBe(false); + expect(normalized.runtimeMetricsCollectPeriod).toBe(2222); + expect(normalized.runtimeMetricsReportPeriod).toBe(3333); + expect(normalized.runtimeMetricsBufferSize).toBe(44); + }); + + it('keeps explicit canonical options over deprecated aliases', () => { + const normalized = normalizeDeprecatedRuntimeMetricOptions({ + runtimeMetricsReporterActive: true, + nvmMetricsReporterActive: false, + }); + + expect(normalized.runtimeMetricsReporterActive).toBe(true); + expect(normalized.nvmMetricsReporterActive).toBeUndefined(); + expect(normalized.nvmJvmReporterActive).toBeUndefined(); + }); + + it('disables runtime metrics when agent.start receives nvmMetrics alias', () => { + agent.start({ nvmMetricsReporterActive: false }); + + expect(config.runtimeMetricsReporterActive).toBe(false); + expect(ServiceManager.INSTANCE.findService(MeterSender)).toBeUndefined(); + }); + + it('disables runtime metrics when agent.start receives nvmJvm alias', () => { + agent.start({ nvmJvmReporterActive: false }); + + expect(config.runtimeMetricsReporterActive).toBe(false); + expect(ServiceManager.INSTANCE.findService(MeterSender)).toBeUndefined(); + }); + + it('disables runtime metrics when deprecated alias is set on exported config', () => { + (config as AgentConfig).nvmMetricsReporterActive = false; + + agent.start(); + + expect(config.runtimeMetricsReporterActive).toBe(false); + expect(ServiceManager.INSTANCE.findService(MeterSender)).toBeUndefined(); + }); +}); From 2706f7c03e239504b06476ba0ed593e3397854ce Mon Sep 17 00:00:00 2001 From: songzhendong <289505773@qq.com> Date: Sun, 28 Jun 2026 11:49:33 +0800 Subject: [PATCH 6/7] Fix PR #139 round-4 review: clear stale deprecated runtime metric aliases. Strip deprecated runtime metric fields after alias promotion in normalizeDeprecatedRuntimeMetricOptions(), respect explicit canonical options on restart in applyDeprecatedRuntimeMetricConfig(), and add regression tests for destroy/start cycles. Signed-off-by: songzhendong <289505773@qq.com> --- src/config/AgentConfig.ts | 79 +++++++++++++++++++------------- src/index.ts | 2 +- tests/config/AgentConfig.test.ts | 24 ++++++++++ 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts index c59c9b7..54d8a34 100644 --- a/src/config/AgentConfig.ts +++ b/src/config/AgentConfig.ts @@ -73,72 +73,89 @@ export function normalizeDeprecatedRuntimeMetricOptions(options: AgentConfig): A if (reporterActive !== undefined) { normalized.runtimeMetricsReporterActive = reporterActive; } - } else { - delete normalized.nvmMetricsReporterActive; - delete normalized.nvmJvmReporterActive; } + delete normalized.nvmMetricsReporterActive; + delete normalized.nvmJvmReporterActive; if (normalized.runtimeMetricsCollectPeriod === undefined) { const collectPeriod = normalized.nvmMetricsCollectPeriod ?? normalized.nvmJvmMetricsCollectPeriod; if (collectPeriod !== undefined) { normalized.runtimeMetricsCollectPeriod = collectPeriod; } - } else { - delete normalized.nvmMetricsCollectPeriod; - delete normalized.nvmJvmMetricsCollectPeriod; } + delete normalized.nvmMetricsCollectPeriod; + delete normalized.nvmJvmMetricsCollectPeriod; if (normalized.runtimeMetricsReportPeriod === undefined) { const reportPeriod = normalized.nvmMetricsReportPeriod ?? normalized.nvmJvmMetricsReportPeriod; if (reportPeriod !== undefined) { normalized.runtimeMetricsReportPeriod = reportPeriod; } - } else { - delete normalized.nvmMetricsReportPeriod; - delete normalized.nvmJvmMetricsReportPeriod; } + delete normalized.nvmMetricsReportPeriod; + delete normalized.nvmJvmMetricsReportPeriod; if (normalized.runtimeMetricsBufferSize === undefined) { const bufferSize = normalized.nvmMetricsBufferSize ?? normalized.nvmJvmMetricsBufferSize; if (bufferSize !== undefined) { normalized.runtimeMetricsBufferSize = bufferSize; } - } else { - delete normalized.nvmMetricsBufferSize; - delete normalized.nvmJvmMetricsBufferSize; } + delete normalized.nvmMetricsBufferSize; + delete normalized.nvmJvmMetricsBufferSize; return normalized; } -function applyDeprecatedRuntimeMetricConfig(config: AgentConfig): void { - if (config.nvmMetricsReporterActive !== undefined) { - config.runtimeMetricsReporterActive = config.nvmMetricsReporterActive; - } else if (config.nvmJvmReporterActive !== undefined) { - config.runtimeMetricsReporterActive = config.nvmJvmReporterActive; +function clearDeprecatedRuntimeMetricFields(config: AgentConfig): void { + delete config.nvmMetricsReporterActive; + delete config.nvmJvmReporterActive; + delete config.nvmMetricsCollectPeriod; + delete config.nvmJvmMetricsCollectPeriod; + delete config.nvmMetricsReportPeriod; + delete config.nvmJvmMetricsReportPeriod; + delete config.nvmMetricsBufferSize; + delete config.nvmJvmMetricsBufferSize; +} + +function applyDeprecatedRuntimeMetricConfig(config: AgentConfig, options: AgentConfig = {}): void { + if (options.runtimeMetricsReporterActive === undefined) { + if (config.nvmMetricsReporterActive !== undefined) { + config.runtimeMetricsReporterActive = config.nvmMetricsReporterActive; + } else if (config.nvmJvmReporterActive !== undefined) { + config.runtimeMetricsReporterActive = config.nvmJvmReporterActive; + } } - if (config.nvmMetricsCollectPeriod !== undefined) { - config.runtimeMetricsCollectPeriod = config.nvmMetricsCollectPeriod; - } else if (config.nvmJvmMetricsCollectPeriod !== undefined) { - config.runtimeMetricsCollectPeriod = config.nvmJvmMetricsCollectPeriod; + if (options.runtimeMetricsCollectPeriod === undefined) { + if (config.nvmMetricsCollectPeriod !== undefined) { + config.runtimeMetricsCollectPeriod = config.nvmMetricsCollectPeriod; + } else if (config.nvmJvmMetricsCollectPeriod !== undefined) { + config.runtimeMetricsCollectPeriod = config.nvmJvmMetricsCollectPeriod; + } } - if (config.nvmMetricsReportPeriod !== undefined) { - config.runtimeMetricsReportPeriod = config.nvmMetricsReportPeriod; - } else if (config.nvmJvmMetricsReportPeriod !== undefined) { - config.runtimeMetricsReportPeriod = config.nvmJvmMetricsReportPeriod; + if (options.runtimeMetricsReportPeriod === undefined) { + if (config.nvmMetricsReportPeriod !== undefined) { + config.runtimeMetricsReportPeriod = config.nvmMetricsReportPeriod; + } else if (config.nvmJvmMetricsReportPeriod !== undefined) { + config.runtimeMetricsReportPeriod = config.nvmJvmMetricsReportPeriod; + } } - if (config.nvmMetricsBufferSize !== undefined) { - config.runtimeMetricsBufferSize = config.nvmMetricsBufferSize; - } else if (config.nvmJvmMetricsBufferSize !== undefined) { - config.runtimeMetricsBufferSize = config.nvmJvmMetricsBufferSize; + if (options.runtimeMetricsBufferSize === undefined) { + if (config.nvmMetricsBufferSize !== undefined) { + config.runtimeMetricsBufferSize = config.nvmMetricsBufferSize; + } else if (config.nvmJvmMetricsBufferSize !== undefined) { + config.runtimeMetricsBufferSize = config.nvmJvmMetricsBufferSize; + } } + + clearDeprecatedRuntimeMetricFields(config); } -export function finalizeConfig(config: AgentConfig): void { - applyDeprecatedRuntimeMetricConfig(config); +export function finalizeConfig(config: AgentConfig, options: AgentConfig = {}): void { + applyDeprecatedRuntimeMetricConfig(config, options); const escapeRegExp = (s: string) => s.replace(/([.*+?^=!:${}()|\[\]\/\\])/g, '\\$1'); diff --git a/src/index.ts b/src/index.ts index 9b1af95..33d67a2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -40,7 +40,7 @@ class Agent { } Object.assign(config, normalizeDeprecatedRuntimeMetricOptions(options)); - finalizeConfig(config); + finalizeConfig(config, options); logger.debug('Starting SkyWalking agent'); diff --git a/tests/config/AgentConfig.test.ts b/tests/config/AgentConfig.test.ts index ee8670e..08b7fd3 100644 --- a/tests/config/AgentConfig.test.ts +++ b/tests/config/AgentConfig.test.ts @@ -143,4 +143,28 @@ describe('AgentConfig deprecated runtime metric options (unit)', () => { expect(config.runtimeMetricsReporterActive).toBe(false); expect(ServiceManager.INSTANCE.findService(MeterSender)).toBeUndefined(); }); + it('maps deprecated aliases without leaving stale alias keys on normalized options', () => { + const normalized = normalizeDeprecatedRuntimeMetricOptions({ + nvmMetricsReporterActive: false, + }); + + expect(normalized.runtimeMetricsReporterActive).toBe(false); + expect(normalized.nvmMetricsReporterActive).toBeUndefined(); + expect(normalized.nvmJvmReporterActive).toBeUndefined(); + }); + + it('re-enables runtime metrics after destroy/start with canonical option', () => { + agent.start({ nvmMetricsReporterActive: false }); + + expect(config.runtimeMetricsReporterActive).toBe(false); + expect((config as AgentConfig).nvmMetricsReporterActive).toBeUndefined(); + + agent.destroy(); + + agent.start({ runtimeMetricsReporterActive: true }); + + expect(config.runtimeMetricsReporterActive).toBe(true); + expect((config as AgentConfig).nvmMetricsReporterActive).toBeUndefined(); + expect(ServiceManager.INSTANCE.findService(MeterSender)).toBeDefined(); + }); }); From 25bb39929e7f4c41cebfa1993bd22aa28f30921d Mon Sep 17 00:00:00 2001 From: songzhendong <289505773@qq.com> Date: Sun, 28 Jun 2026 14:04:40 +0800 Subject: [PATCH 7/7] Fix GRPCChannelManager initial connectivity notification on boot. When the shared channel is already READY before the first state transition, listeners now receive CONNECTED immediately instead of staying DISCONNECT until a later connectivity change. Add unit regression test for the already-READY boot path. --- src/agent/core/remote/GRPCChannelManager.ts | 15 +++- tests/remote/GRPCChannelManager.test.ts | 85 +++++++++++++++++++++ 2 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 tests/remote/GRPCChannelManager.test.ts diff --git a/src/agent/core/remote/GRPCChannelManager.ts b/src/agent/core/remote/GRPCChannelManager.ts index a8f8f00..5eb2fed 100644 --- a/src/agent/core/remote/GRPCChannelManager.ts +++ b/src/agent/core/remote/GRPCChannelManager.ts @@ -138,6 +138,7 @@ export default class GRPCChannelManager implements BootService { .build(); this.watchConnectivityState(); + this.notifyCurrentConnectivityState(true); } onComplete(): void {} @@ -170,12 +171,22 @@ export default class GRPCChannelManager implements BootService { return; } - const ready = channel.getConnectivityState(false) === grpc.connectivityState.READY; - this.notify(ready ? GRPCChannelStatus.CONNECTED : GRPCChannelStatus.DISCONNECT); + this.notifyCurrentConnectivityState(false); this.watchConnectivityState(); }); } + private notifyCurrentConnectivityState(requestConnection: boolean): void { + const managed = this.managedChannel; + if (this.closed || !managed) { + return; + } + + const channel = managed.getChannel(); + const ready = channel.getConnectivityState(requestConnection) === grpc.connectivityState.READY; + this.notify(ready ? GRPCChannelStatus.CONNECTED : GRPCChannelStatus.DISCONNECT); + } + private notify(status: GRPCChannelStatus): void { if (this.lastStatus === status) { return; diff --git a/tests/remote/GRPCChannelManager.test.ts b/tests/remote/GRPCChannelManager.test.ts new file mode 100644 index 0000000..ad6f16e --- /dev/null +++ b/tests/remote/GRPCChannelManager.test.ts @@ -0,0 +1,85 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* eslint-env jest */ + +import * as grpc from '@grpc/grpc-js'; +import GRPCChannelManager from '../../src/agent/core/remote/GRPCChannelManager'; +import { GRPCChannelStatus } from '../../src/agent/core/remote/GRPCChannelStatus'; + +const mockShutdownNow = jest.fn(); +const mockGetConnectivityState = jest.fn(); +const mockWatchConnectivityState = jest.fn(); + +jest.mock('../../src/agent/core/remote/GRPCChannel', () => ({ + __esModule: true, + default: { + newBuilder: jest.fn(() => ({ + addManagedChannelBuilder: jest.fn().mockReturnThis(), + addChannelDecorator: jest.fn().mockReturnThis(), + build: jest.fn(() => ({ + getChannel: () => ({ + getConnectivityState: mockGetConnectivityState, + watchConnectivityState: mockWatchConnectivityState, + }), + getClientOptions: () => ({ channelOverride: {} }), + isConnected: jest.fn(() => true), + shutdownNow: mockShutdownNow, + })), + })), + }, +})); + +jest.mock('../../src/config/AgentConfig', () => ({ + __esModule: true, + default: { + collectorAddress: '127.0.0.1:11800', + }, +})); + +describe('GRPCChannelManager initial connectivity', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockWatchConnectivityState.mockImplementation(() => undefined); + }); + + it('notifies CONNECTED when channel is already READY at boot', () => { + mockGetConnectivityState.mockReturnValue(grpc.connectivityState.READY); + + const listener = { statusChanged: jest.fn() }; + const manager = new GRPCChannelManager(); + + manager.addChannelListener(listener); + manager.boot(); + + expect(listener.statusChanged).toHaveBeenCalledWith(GRPCChannelStatus.CONNECTED); + }); + + it('notifies DISCONNECT when channel is not READY at boot', () => { + mockGetConnectivityState.mockReturnValue(grpc.connectivityState.CONNECTING); + + const listener = { statusChanged: jest.fn() }; + const manager = new GRPCChannelManager(); + + manager.addChannelListener(listener); + manager.boot(); + + expect(listener.statusChanged).toHaveBeenCalledWith(GRPCChannelStatus.DISCONNECT); + }); +});