From e7031420f6ba21fb33580a824714c9656e12854c Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Thu, 25 Jun 2026 12:27:32 +0530 Subject: [PATCH] pai: add kubeletstats receiver in log DaemonSet, drop prometheus/kubelet duplicate --- .../controller/parseableconfig_controller.go | 95 ++++++++++++------- 1 file changed, 61 insertions(+), 34 deletions(-) diff --git a/internal/controller/parseableconfig_controller.go b/internal/controller/parseableconfig_controller.go index 3b17844..345e07e 100644 --- a/internal/controller/parseableconfig_controller.go +++ b/internal/controller/parseableconfig_controller.go @@ -457,9 +457,15 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf } } - if !podLogsEnabled && !hasFiles { + kubeletstatsEnabled := config.Spec.Metrics != nil && + config.Spec.Metrics.ClusterMetrics != nil && + config.Spec.Metrics.ClusterMetrics.Kubelet != nil && + config.Spec.Metrics.ClusterMetrics.Kubelet.Enabled && + config.Spec.Metrics.ClusterMetrics.TargetDataset != "" + + if !podLogsEnabled && !hasFiles && !kubeletstatsEnabled { if err == nil { - logger.Info("Logs not configured, deleting log collector") + logger.Info("Logs and kubeletstats not configured, deleting log collector") if delErr := r.Delete(ctx, existing); delErr != nil && !errors.IsNotFound(delErr) { return fmt.Errorf("failed to delete log collector: %w", delErr) } @@ -517,6 +523,20 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf spec["volumes"] = volumes spec["volumeMounts"] = volumeMounts } + // kubeletstats receiver needs the local node name to target the right kubelet API + // (https://${env:K8S_NODE_NAME}:10250). Injected via downward API. + if kubeletstatsEnabled { + spec["env"] = []interface{}{ + map[string]interface{}{ + "name": "K8S_NODE_NAME", + "valueFrom": map[string]interface{}{ + "fieldRef": map[string]interface{}{ + "fieldPath": "spec.nodeName", + }, + }, + }, + } + } if err == nil { // Update existing @@ -666,6 +686,38 @@ func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context, } } + // Kubeletstats — node, pod, and container CPU/memory/network/filesystem metrics scraped + // directly from each node's local kubelet at https://${K8S_NODE_NAME}:10250/stats/summary. + // Runs in the log DaemonSet so each pod talks to its own node (avoids cross-node hops and + // kubelet TLS SAN mismatches). Requires nodes/stats RBAC (granted in ensureCollectorRBAC). + if config.Spec.Metrics != nil && + config.Spec.Metrics.ClusterMetrics != nil && + config.Spec.Metrics.ClusterMetrics.Kubelet != nil && + config.Spec.Metrics.ClusterMetrics.Kubelet.Enabled && + config.Spec.Metrics.ClusterMetrics.TargetDataset != "" { + + cm := config.Spec.Metrics.ClusterMetrics + receivers["kubeletstats"] = map[string]interface{}{ + "collection_interval": "30s", + "auth_type": "serviceAccount", + "endpoint": "https://${env:K8S_NODE_NAME}:10250", + "insecure_skip_verify": true, + } + exporters["otlphttp/clustermetrics"] = map[string]interface{}{ + "endpoint": endpoint, + "encoding": encoding, + "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", cm.TargetDataset, tenantID, config.Spec.Target.Headers, nil), + } + // kubeletstats's /stats/summary response already carries k8s.namespace.name, + // k8s.pod.name, k8s.container.name, k8s.node.name as resource attributes, + // so the k8sattributes processor would be redundant here (matches sample YAML). + pipelines["metrics/kubeletstats"] = map[string]interface{}{ + "receivers": []interface{}{"kubeletstats"}, + "processors": []interface{}{"batch"}, + "exporters": []interface{}{"otlphttp/clustermetrics"}, + } + } + return map[string]interface{}{ "receivers": receivers, "processors": processors, @@ -821,38 +873,13 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( clusterReceivers = append(clusterReceivers, "k8s_cluster") } - if cm.Kubelet != nil && cm.Kubelet.Enabled { - receivers["prometheus/kubelet"] = map[string]interface{}{ - "config": map[string]interface{}{ - "scrape_configs": []interface{}{ - map[string]interface{}{ - "job_name": "kubelet", - "scrape_interval": "30s", - "scheme": "https", - "bearer_token_file": "/var/run/secrets/kubernetes.io/serviceaccount/token", - "tls_config": map[string]interface{}{ - "ca_file": "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", - "insecure_skip_verify": true, - }, - "kubernetes_sd_configs": []interface{}{ - map[string]interface{}{"role": "node"}, - }, - "relabel_configs": []interface{}{ - map[string]interface{}{ - "target_label": "__metrics_path__", - "replacement": "/metrics", - }, - map[string]interface{}{ - "source_labels": []interface{}{"__meta_kubernetes_node_name"}, - "target_label": "node", - }, - }, - }, - }, - }, - } - clusterReceivers = append(clusterReceivers, "prometheus/kubelet") - } + // NOTE: Kubelet metrics are sourced via the kubeletstats receiver in the log + // DaemonSet (see buildLogCollectorConfig). The DaemonSet placement lets each + // pod hit its own node's kubelet on localhost, which is required because + // kubelet TLS certs are SAN-bound to the node name. The metrics+events + // Deployment runs as a single pod and cannot reach every node's kubelet + // directly, so the kubelet receiver intentionally lives elsewhere. + _ = cm.Kubelet if cm.KubeState != nil && cm.KubeState.Enabled { ksNamespaces := cm.KubeState.Namespaces