Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 61 additions & 34 deletions internal/controller/parseableconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,15 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf
}
}

if !podLogsEnabled && !hasFiles {
kubeletstatsEnabled := config.Spec.Metrics != nil &&
config.Spec.Metrics.ClusterMetrics != nil &&
config.Spec.Metrics.ClusterMetrics.Kubelet != nil &&
config.Spec.Metrics.ClusterMetrics.Kubelet.Enabled &&
config.Spec.Metrics.ClusterMetrics.TargetDataset != ""

if !podLogsEnabled && !hasFiles && !kubeletstatsEnabled {
if err == nil {
logger.Info("Logs not configured, deleting log collector")
logger.Info("Logs and kubeletstats not configured, deleting log collector")
if delErr := r.Delete(ctx, existing); delErr != nil && !errors.IsNotFound(delErr) {
return fmt.Errorf("failed to delete log collector: %w", delErr)
}
Expand Down Expand Up @@ -517,6 +523,20 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf
spec["volumes"] = volumes
spec["volumeMounts"] = volumeMounts
}
// kubeletstats receiver needs the local node name to target the right kubelet API
// (https://${env:K8S_NODE_NAME}:10250). Injected via downward API.
if kubeletstatsEnabled {
spec["env"] = []interface{}{
map[string]interface{}{
"name": "K8S_NODE_NAME",
"valueFrom": map[string]interface{}{
"fieldRef": map[string]interface{}{
"fieldPath": "spec.nodeName",
},
},
},
}
}

if err == nil {
// Update existing
Expand Down Expand Up @@ -666,6 +686,38 @@ func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context,
}
}

// Kubeletstats — node, pod, and container CPU/memory/network/filesystem metrics scraped
// directly from each node's local kubelet at https://${K8S_NODE_NAME}:10250/stats/summary.
// Runs in the log DaemonSet so each pod talks to its own node (avoids cross-node hops and
// kubelet TLS SAN mismatches). Requires nodes/stats RBAC (granted in ensureCollectorRBAC).
if config.Spec.Metrics != nil &&
config.Spec.Metrics.ClusterMetrics != nil &&
config.Spec.Metrics.ClusterMetrics.Kubelet != nil &&
config.Spec.Metrics.ClusterMetrics.Kubelet.Enabled &&
config.Spec.Metrics.ClusterMetrics.TargetDataset != "" {

cm := config.Spec.Metrics.ClusterMetrics
receivers["kubeletstats"] = map[string]interface{}{
"collection_interval": "30s",
"auth_type": "serviceAccount",
"endpoint": "https://${env:K8S_NODE_NAME}:10250",
"insecure_skip_verify": true,
}
exporters["otlphttp/clustermetrics"] = map[string]interface{}{
"endpoint": endpoint,
"encoding": encoding,
"headers": r.buildExporterHeaders(basicAuth, "otel-metrics", cm.TargetDataset, tenantID, config.Spec.Target.Headers, nil),
}
// kubeletstats's /stats/summary response already carries k8s.namespace.name,
// k8s.pod.name, k8s.container.name, k8s.node.name as resource attributes,
// so the k8sattributes processor would be redundant here (matches sample YAML).
pipelines["metrics/kubeletstats"] = map[string]interface{}{
"receivers": []interface{}{"kubeletstats"},
"processors": []interface{}{"batch"},
"exporters": []interface{}{"otlphttp/clustermetrics"},
}
}

return map[string]interface{}{
"receivers": receivers,
"processors": processors,
Expand Down Expand Up @@ -821,38 +873,13 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig(
clusterReceivers = append(clusterReceivers, "k8s_cluster")
}

if cm.Kubelet != nil && cm.Kubelet.Enabled {
receivers["prometheus/kubelet"] = map[string]interface{}{
"config": map[string]interface{}{
"scrape_configs": []interface{}{
map[string]interface{}{
"job_name": "kubelet",
"scrape_interval": "30s",
"scheme": "https",
"bearer_token_file": "/var/run/secrets/kubernetes.io/serviceaccount/token",
"tls_config": map[string]interface{}{
"ca_file": "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
"insecure_skip_verify": true,
},
"kubernetes_sd_configs": []interface{}{
map[string]interface{}{"role": "node"},
},
"relabel_configs": []interface{}{
map[string]interface{}{
"target_label": "__metrics_path__",
"replacement": "/metrics",
},
map[string]interface{}{
"source_labels": []interface{}{"__meta_kubernetes_node_name"},
"target_label": "node",
},
},
},
},
},
}
clusterReceivers = append(clusterReceivers, "prometheus/kubelet")
}
// NOTE: Kubelet metrics are sourced via the kubeletstats receiver in the log
// DaemonSet (see buildLogCollectorConfig). The DaemonSet placement lets each
// pod hit its own node's kubelet on localhost, which is required because
// kubelet TLS certs are SAN-bound to the node name. The metrics+events
// Deployment runs as a single pod and cannot reach every node's kubelet
// directly, so the kubelet receiver intentionally lives elsewhere.
_ = cm.Kubelet

if cm.KubeState != nil && cm.KubeState.Enabled {
ksNamespaces := cm.KubeState.Namespaces
Expand Down