diff --git a/go.mod b/go.mod index f0f2c3ed..8ab53f73 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( golang.org/x/sys v0.46.0 golang.org/x/text v0.38.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20260622175928-b703f567277d - google.golang.org/grpc v1.81.1 + google.golang.org/grpc v1.82.0 google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af k8s.io/client-go v0.36.1 k8s.io/cri-api v0.36.1 diff --git a/go.sum b/go.sum index eff545b6..a2009a99 100644 --- a/go.sum +++ b/go.sum @@ -629,6 +629,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20260622175928-b703f567277d h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20260622175928-b703f567277d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= +google.golang.org/grpc v1.82.0 h1:vguDnZUPjE26w09A63VoxZPnvPjB5Riyc0mkXPFmAIU= +google.golang.org/grpc v1.82.0/go.mod h1:yzTZ1TB1Z3SG+LIYaI+WiE8D5+PZ3ArnrSp8zF3+/ZA= google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af h1:+5/Sw3GsDNlEmu7TfklWKPdQ0Ykja5VEmq2i817+jbI= google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index 326888ae..7e3dbaad 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -60,7 +60,7 @@ func Register(b Builder) { if !envconfig.CaseSensitiveBalancerRegistries { name = strings.ToLower(name) if name != b.Name() { - logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon. After 2 releases, we will enable the env var by default.", b.Name()) + logger.Warningf("Balancer registered with name %q. grpc-go has switched to case sensitive balancer registries. GRPC_GO_EXPERIMENTAL_CASE_SENSITIVE_BALANCER_REGISTRIES env variable will be removed in release v1.82.0", b.Name()) } } m[name] = b @@ -85,7 +85,7 @@ func Get(name string) Builder { if !envconfig.CaseSensitiveBalancerRegistries { lowerName := strings.ToLower(name) if lowerName != name { - logger.Warningf("Balancer retrieved for name %q. grpc-go will be switching to case sensitive balancer registries soon. After 2 releases, we will enable the env var by default.", name) + logger.Warningf("Balancer retrieved for name %q. grpc-go has switched to case sensitive balancer registries. GRPC_GO_EXPERIMENTAL_CASE_SENSITIVE_BALANCER_REGISTRIES env variable will be removed in release v1.82.0", name) } name = lowerName } diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go index 518a69d5..d48bc304 100644 --- a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go @@ -35,9 +35,9 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/pickfirst/internal" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/experimental/balancer/weight" expstats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal/balancer/weight" "google.golang.org/grpc/internal/envconfig" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/pretty" diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 4ec5f9cd..3af08e1a 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -173,10 +173,8 @@ func newJoinDialOption(opts ...DialOption) DialOption { // If this option is set to true every connection will release the buffer after // flushing the data on the wire. // -// # Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. +// Deprecated: shared write buffer is enabled by default. WithSharedWriteBuffer +// will be removed in a future release. func WithSharedWriteBuffer(val bool) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.SharedWriteBuffer = val @@ -229,6 +227,14 @@ func WithInitialConnWindowSize(s int32) DialOption { // WithStaticStreamWindowSize returns a DialOption which sets the initial // stream window size to the value provided and disables dynamic flow control. +// +// Note that this also disables dynamic flow control for the connection, +// falling back to a default static connection-level window of 64KB. To +// use a larger connection-level window, you must also use the +// [WithStaticConnWindowSize] DialOption. +// +// Most users should not configure static flow control windows unless +// operating in a memory-constrained environment. func WithStaticStreamWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialWindowSize = s @@ -239,6 +245,14 @@ func WithStaticStreamWindowSize(s int32) DialOption { // WithStaticConnWindowSize returns a DialOption which sets the initial // connection window size to the value provided and disables dynamic flow // control. +// +// Note that this also disables dynamic flow control for individual streams, +// falling back to a default static connection-level window of 64KB. To +// explicitly configure the stream-level window size, you must also use the +// [WithStaticStreamWindowSize] DialOption. +// +// Most users should not configure static flow control windows unless +// operating in a memory-constrained environment. func WithStaticConnWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialConnWindowSize = s diff --git a/vendor/google.golang.org/grpc/encoding/encoding.go b/vendor/google.golang.org/grpc/encoding/encoding.go index 296f38c3..bfa8b268 100644 --- a/vendor/google.golang.org/grpc/encoding/encoding.go +++ b/vendor/google.golang.org/grpc/encoding/encoding.go @@ -66,6 +66,9 @@ type Compressor interface { // Decompress reads data from r, decompresses it, and provides the // uncompressed data via the returned io.Reader. If an error occurs while // initializing the decompressor, that error is returned instead. + // + // The returned io.Reader may optionally implement io.ReadCloser, and if it + // does, gRPC will call Close() exactly once. Decompress(r io.Reader) (io.Reader, error) // Name is the name of the compression codec and is used to set the content // coding header. The result must be static; the result cannot change diff --git a/vendor/google.golang.org/grpc/encoding/gzip/gzip.go b/vendor/google.golang.org/grpc/encoding/gzip/gzip.go index 153e4dbf..65908d9a 100644 --- a/vendor/google.golang.org/grpc/encoding/gzip/gzip.go +++ b/vendor/google.golang.org/grpc/encoding/gzip/gzip.go @@ -81,6 +81,8 @@ func (z *writer) Close() error { return z.Writer.Close() } +var _ io.Closer = &reader{} + type reader struct { *gzip.Reader pool *sync.Pool @@ -102,14 +104,16 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { return z, nil } -func (z *reader) Read(p []byte) (n int, err error) { - n, err = z.Reader.Read(p) - if err == io.EOF { - z.pool.Put(z) - } +func (r *reader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) return n, err } +func (r *reader) Close() error { + defer r.pool.Put(r) + return r.Reader.Close() +} + func (c *compressor) Name() string { return Name } diff --git a/vendor/google.golang.org/grpc/internal/balancer/weight/weight.go b/vendor/google.golang.org/grpc/experimental/balancer/weight/weight.go similarity index 71% rename from vendor/google.golang.org/grpc/internal/balancer/weight/weight.go rename to vendor/google.golang.org/grpc/experimental/balancer/weight/weight.go index 11beb07d..beab9e07 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/weight/weight.go +++ b/vendor/google.golang.org/grpc/experimental/balancer/weight/weight.go @@ -16,23 +16,23 @@ * */ -// Package weight contains utilities to manage endpoint weights. Weights are -// used by LB policies such as ringhash to distribute load across multiple -// endpoints. +// Package weight contains utilities to manage endpoint weights. +// Weights may be used by LB policies to distribute load across +// multiple endpoints. +// +// # Experimental +// +// Notice: All APIs in this package are EXPERIMENTAL and may be changed +// or removed in a later release. package weight -import ( - "fmt" - - "google.golang.org/grpc/resolver" -) +import "google.golang.org/grpc/resolver" // attributeKey is the type used as the key to store EndpointInfo in the // Attributes field of resolver.Endpoint. type attributeKey struct{} -// EndpointInfo will be stored in the Attributes field of Endpoints in order to -// use the ringhash balancer. +// EndpointInfo will be stored in the Attributes field of Endpoints. type EndpointInfo struct { Weight uint32 } @@ -43,22 +43,16 @@ func (a EndpointInfo) Equal(o any) bool { return ok && oa.Weight == a.Weight } -// Set returns a copy of endpoint in which the Attributes field is updated with -// EndpointInfo. +// Set returns a copy of endpoint in which the Attributes field is +// updated with EndpointInfo. func Set(endpoint resolver.Endpoint, epInfo EndpointInfo) resolver.Endpoint { endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, epInfo) return endpoint } -// String returns a human-readable representation of EndpointInfo. -// This method is intended for logging, testing, and debugging purposes only. -// Do not rely on the output format, as it is not guaranteed to remain stable. -func (a EndpointInfo) String() string { - return fmt.Sprintf("Weight: %d", a.Weight) -} - -// FromEndpoint returns the EndpointInfo stored in the Attributes field of an -// endpoint. It returns an empty EndpointInfo if attribute is not found. +// FromEndpoint returns the EndpointInfo stored in the Attributes +// field of an endpoint. It returns an empty EndpointInfo if attribute +// is not found. func FromEndpoint(endpoint resolver.Endpoint) EndpointInfo { v := endpoint.Attributes.Value(attributeKey{}) ei, _ := v.(EndpointInfo) diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go index 9e10fdd2..537ba057 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.6.1 +// - protoc-gen-go-grpc v1.6.2 // - protoc v5.27.1 // source: grpc/health/v1/health.proto diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 8ca87a57..ba05b65d 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -59,6 +59,15 @@ var ( // unconditionally. XDSEndpointHashKeyBackwardCompat = boolFromEnv("GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT", false) + // LabelServerGoroutines controls setting [runtime/pprof.Labels] on the + // goroutines spawned by [grpc.Server] type. + // For now, this is limited to the goroutines spawned to handle incoming + // requests on the server. + // Set "GRPC_GO_SERVER_GOROUTINE_LABELS" to "grpc.method=true" to + // enable this grpc.method label, or "all" to enable all valid labels. + // This variable is a bit-field. + LabelServerGoroutines = goroutineLabelsFromEnv("GRPC_GO_SERVER_GOROUTINE_LABELS", 0) + // RingHashSetRequestHashKey is set if the ring hash balancer can get the // request hash header by setting the "requestHashHeader" field, according // to gRFC A76. It can be disabled by setting the environment variable @@ -78,12 +87,12 @@ var ( EnableDefaultPortForProxyTarget = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_DEFAULT_PORT_FOR_PROXY_TARGET", true) // CaseSensitiveBalancerRegistries is set if the balancer registry should be - // case-sensitive. This is disabled by default, but can be enabled by setting + // case-sensitive. This is enabled by default, but can be disabled by setting // the env variable "GRPC_GO_EXPERIMENTAL_CASE_SENSITIVE_BALANCER_REGISTRIES" - // to "true". + // to "false". // - // TODO: After 2 releases, we will enable the env var by default. - CaseSensitiveBalancerRegistries = boolFromEnv("GRPC_GO_EXPERIMENTAL_CASE_SENSITIVE_BALANCER_REGISTRIES", false) + // This env varible will be removed in release v1.82.0. + CaseSensitiveBalancerRegistries = boolFromEnv("GRPC_GO_EXPERIMENTAL_CASE_SENSITIVE_BALANCER_REGISTRIES", true) // XDSAuthorityRewrite indicates whether xDS authority rewriting is enabled. // This feature is defined in gRFC A81 and is enabled by setting the @@ -104,22 +113,6 @@ var ( // to "false". XDSRecoverPanicInResourceParsing = boolFromEnv("GRPC_GO_EXPERIMENTAL_XDS_RESOURCE_PANIC_RECOVERY", true) - // DisableStrictPathChecking indicates whether strict path checking is - // disabled. This feature can be disabled by setting the environment - // variable GRPC_GO_EXPERIMENTAL_DISABLE_STRICT_PATH_CHECKING to "true". - // - // When strict path checking is enabled, gRPC will reject requests with - // paths that do not conform to the gRPC over HTTP/2 specification found at - // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md. - // - // When disabled, gRPC will allow paths that do not contain a leading slash. - // Enabling strict path checking is recommended for security reasons, as it - // prevents potential path traversal vulnerabilities. - // - // A future release will remove this environment variable, enabling strict - // path checking behavior unconditionally. - DisableStrictPathChecking = boolFromEnv("GRPC_GO_EXPERIMENTAL_DISABLE_STRICT_PATH_CHECKING", false) - // EnablePriorityLBChildPolicyCache controls whether the priority balancer // should cache child balancers that are removed from the LB policy config, // for a period of 15 minutes. This is disabled by default, but can be @@ -127,6 +120,18 @@ var ( // GRPC_EXPERIMENTAL_ENABLE_PRIORITY_LB_CHILD_POLICY_CACHE to true. EnablePriorityLBChildPolicyCache = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_PRIORITY_LB_CHILD_POLICY_CACHE", false) + // Enable8KBDefaultHeaderListSize indicates that default maximum header list + // size is restricted to 8KB. This is disabled by default, but can be enabled + // by setting the environment variable + // "GRPC_GO_EXPERIMENTAL_ENABLE_8KB_DEFAULT_HEADER_LIST_SIZE" to "true". + // When disabled, the default maximum header list size of 16MB is used. + // + // When enabled, RPCs with a total size of headers exceeding 8KB will fail + // unless explicitly configured otherwise by the user. + // + // TODO: In release v1.82.0, env var will be enabled by default. + Enable8KBDefaultHeaderListSize = boolFromEnv("GRPC_GO_EXPERIMENTAL_ENABLE_8KB_DEFAULT_HEADER_LIST_SIZE", false) + // EnableHTTPFramerReadBufferPooling enables the use of the // readyreader.Reader interface to perform non-memory-pinning reads, // provided the underlying net.Conn supports it. This reduces memory usage @@ -160,3 +165,52 @@ func uint64FromEnv(envVar string, def, min, max uint64) uint64 { } return v } + +// GoroutineLabels is a bitfield indicating which goroutine labels are enabled. +type GoroutineLabels uint16 + +func goroutineLabelsFromEnv(envVar string, def GoroutineLabels) GoroutineLabels { + val := def + v := os.Getenv(envVar) + if strings.EqualFold(v, "all") { + return AllGoroutineLabels + } else if strings.EqualFold(v, "none") { + return 0 + } + for s := range strings.SplitSeq(v, ",") { + s = strings.TrimSpace(s) + if len(s) == 0 { + continue + } + pre, post, ok := strings.Cut(s, "=") + if !ok { + // no equals sign + continue + } + post = strings.TrimSpace(post) + pre = strings.TrimSpace(pre) + bitDesignator := GoroutineLabels(0) + switch { + case strings.EqualFold(pre, "grpc.method"): + bitDesignator = GoroutineLabelServerMethod + default: + continue + } + if strings.EqualFold(post, "true") { + val |= bitDesignator + } else if strings.EqualFold(post, "false") { + val &^= bitDesignator + } + } + return val +} + +const ( + // GoroutineLabelServerMethod sets the grpc.method label on new + // server-side gRPC streams. + GoroutineLabelServerMethod GoroutineLabels = 1 << iota +) + +// AllGoroutineLabels is an or'd together bitfield of all valid GoroutineLabels +// constant values (above). +const AllGoroutineLabels = GoroutineLabelServerMethod diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 333d8a0b..a2312f8e 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -89,4 +89,14 @@ var ( // filtered and prefix-propagated to the LRS server. For more details, see: // https://github.com/grpc/proposal/blob/master/A85-lrs-custom-metrics-changes.md XDSORCAToLRSPropEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION", false) + + // XDSClientExtProcEnabled indicates whether ExtProc filter is enabled on + // the client side. For more details, see: + // https://github.com/grpc/proposal/blob/master/A93-xds-ext-proc.md + XDSClientExtProcEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_EXT_PROC_ON_CLIENT", false) + + // GCPAuthenticationFilterEnabled enables the xDS GCP Authentication + // filter. For more details, see: + // https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md + GCPAuthenticationFilterEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_GCP_AUTHENTICATION_FILTER", false) ) diff --git a/vendor/google.golang.org/grpc/internal/grpcutil/encode_duration.go b/vendor/google.golang.org/grpc/internal/grpcutil/encode_duration.go index b25b0bae..1cc43fc6 100644 --- a/vendor/google.golang.org/grpc/internal/grpcutil/encode_duration.go +++ b/vendor/google.golang.org/grpc/internal/grpcutil/encode_duration.go @@ -39,7 +39,6 @@ func div(d, r time.Duration) int64 { // // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests func EncodeDuration(t time.Duration) string { - // TODO: This is simplistic and not bandwidth efficient. Improve it. if t <= 0 { return "0n" } diff --git a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go index 3db62cca..6320e9b5 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go +++ b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go @@ -106,14 +106,24 @@ type ClientStream interface { // ClientInterceptor is an interceptor for gRPC client streams. type ClientInterceptor interface { - // NewStream produces a ClientStream for an RPC which may optionally use - // the provided function to produce a stream for delegation. Note: - // RPCInfo.Context should not be used (will be nil). + // NewStream creates a ClientStream for an RPC. // - // done is invoked when the RPC is finished using its connection, or could - // not be assigned a connection. RPC operations may still occur on - // ClientStream after done is called, since the interceptor is invoked by - // application-layer operations. done must never be nil when called. + // Implementations must delegate stream creation to the provided newStream + // function. To intercept or override stream behavior, implementations + // may wrap the ClientStream returned by the delegate. + // + // Note: RPCInfo.Context is currently unused and will be nil. + // + // The done function is invoked when the RPC has finished using its + // underlying connection or if a connection could not be assigned. Because + // interceptors operate at the application layer, RPC operations may + // continue on the ClientStream even after done has been called. The + // caller must ensure done is non-nil. + // + // To ensure RPC completion notifications propagate through the entire + // interceptor chain, implementations must ensure that the done function + // passed to the delegate newStream invokes the done function passed to + // NewStream. NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error) // Close closes the interceptor. Once called, no new calls to NewStream are // accepted. Ongoing calls to NewStream are allowed to complete. diff --git a/vendor/google.golang.org/grpc/internal/stats/labels.go b/vendor/google.golang.org/grpc/internal/stats/labels.go index fd33af51..5ea898cb 100644 --- a/vendor/google.golang.org/grpc/internal/stats/labels.go +++ b/vendor/google.golang.org/grpc/internal/stats/labels.go @@ -19,24 +19,56 @@ // Package stats provides internal stats related functionality. package stats -import "context" +import ( + "context" + "maps" +) -// Labels are the labels for metrics. -type Labels struct { - // TelemetryLabels are the telemetry labels to record. - TelemetryLabels map[string]string +// LabelCallback is a function that is executed when telemetry +// label keys are updated. +type LabelCallback func(map[string]string) +type telemetryLabelCallbackKey struct{} + +// UpdateLabels executes registered telemetry callbacks with the update labels. Labels +// are copied before being processed by any callbacks to ensure mutations are not +// shared among derived contexts. +// +// It is the responsibility of the registrant to handle conflicts or label resets. +func UpdateLabels(ctx context.Context, update map[string]string) { + executeTelemetryLabelCallbacks(ctx, update) } -type labelsKey struct{} +// RegisterTelemetryLabelCallback registers a callback function that is executed whenever +// telemetry labels are updated. +func RegisterTelemetryLabelCallback(ctx context.Context, callback LabelCallback) context.Context { + if callback == nil { + return ctx + } + + callbacks, ok := ctx.Value(telemetryLabelCallbackKey{}).([]LabelCallback) + if !ok { + return context.WithValue(ctx, telemetryLabelCallbackKey{}, []LabelCallback{callback}) + } + return context.WithValue(ctx, telemetryLabelCallbackKey{}, append(append([]LabelCallback(nil), callbacks...), callback)) -// GetLabels returns the Labels stored in the context, or nil if there is one. -func GetLabels(ctx context.Context) *Labels { - labels, _ := ctx.Value(labelsKey{}).(*Labels) - return labels } -// SetLabels sets the Labels in the context. -func SetLabels(ctx context.Context, labels *Labels) context.Context { - // could also append - return context.WithValue(ctx, labelsKey{}, labels) +// executeTelemetryLabelCallback runs the registered callbacks in the order they were +// registered on the context with the provided labels. If no callbacks are registered +// it does nothing. +// +// To ensure callbacks do not mutate the state of the provided label map it is copied +// before execution. +func executeTelemetryLabelCallbacks(ctx context.Context, labels map[string]string) { + callbacks, ok := ctx.Value(telemetryLabelCallbackKey{}).([]LabelCallback) + if !ok { + return + } + + labelsCopy := map[string]string{} + maps.Copy(labelsCopy, labels) + for _, callback := range callbacks { + callback(labelsCopy) + } + } diff --git a/vendor/google.golang.org/grpc/internal/transport/client_stream.go b/vendor/google.golang.org/grpc/internal/transport/client_stream.go index cd8152ef..ad382b0f 100644 --- a/vendor/google.golang.org/grpc/internal/transport/client_stream.go +++ b/vendor/google.golang.org/grpc/internal/transport/client_stream.go @@ -19,6 +19,7 @@ package transport import ( + "fmt" "sync/atomic" "golang.org/x/net/http2" @@ -28,6 +29,12 @@ import ( "google.golang.org/grpc/status" ) +// nonGRPCDataMaxLen is the maximum length of nonGRPCDataBuf. +// +// NOTE: If changed this value, you MUST update the corresponding test in: +// - /test/end2end_test.go:TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData +const nonGRPCDataMaxLen = 1024 + // ClientStream implements streaming functionality for a gRPC client. type ClientStream struct { Stream // Embed for common stream functionality. @@ -46,7 +53,11 @@ type ClientStream struct { // headerValid indicates whether a valid header was received. Only // meaningful after headerChan is closed (always call waitOnHeader() before // reading its value). - headerValid bool + headerValid bool + + nonGRPCStatus *status.Status // the initial status from the non-gRPC response header, finalized with collected data before closing. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream @@ -54,6 +65,29 @@ type ClientStream struct { statsHandler stats.Handler // nil for internal streams (e.g., health check, ORCA) where telemetry is not supported. } +func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { + s.nonGRPCStatus = st + s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen) +} + +// finalizeNonGRPCStatus builds the terminal status by appending the collected +// response body to the original non-gRPC status message. +func (s *ClientStream) finalizeNonGRPCStatus() *status.Status { + msg := fmt.Sprintf("%s\ndata: %q", s.nonGRPCStatus.Message(), s.nonGRPCDataBuf) + return status.New(s.nonGRPCStatus.Code(), msg) +} + +// handleNonGRPCData collects non-gRPC body from the given data frame. +// It returns non-nil value when the stream should be closed with it. +func (s *ClientStream) handleNonGRPCData(f *parsedDataFrame) *status.Status { + n := min(f.data.Len(), nonGRPCDataMaxLen-len(s.nonGRPCDataBuf)) + s.nonGRPCDataBuf = append(s.nonGRPCDataBuf, f.data.ReadOnlyData()[0:n]...) + if len(s.nonGRPCDataBuf) >= nonGRPCDataMaxLen || f.StreamEnded() { + return s.finalizeNonGRPCStatus() + } + return nil +} + // Read reads an n byte message from the input stream. func (s *ClientStream) Read(n int) (mem.BufferSlice, error) { b, err := s.Stream.read(n) diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index 7efa5247..c5a76b70 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -956,39 +956,16 @@ func (l *loopyWriter) processData() (bool, error) { // from data is copied to h to make as big as the maximum possible HTTP2 frame // size. - if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame - // Client sends out empty data frame with endStream = true - if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil { - return false, err - } - str.itl.dequeue() // remove the empty data item from stream - reader.Close() - if str.itl.isEmpty() { - str.state = empty - } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. - if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { - return false, err - } - if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { - return false, err - } - } else { - l.activeStreams.enqueue(str) - } - return false, nil - } - + isEmpty := len(dataItem.h) == 0 && reader.Remaining() == 0 // Figure out the maximum size we can send maxSize := http2MaxFrameLen - if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. + strQuota := int(l.oiws) - str.bytesOutStanding + if strQuota <= 0 && !isEmpty { // stream-level flow control. str.state = waitingOnStreamQuota return false, nil - } else if maxSize > strQuota { - maxSize = strQuota - } - if maxSize > int(l.sendQuota) { // connection-level flow control. - maxSize = int(l.sendQuota) } + maxSize = min(maxSize, max(strQuota, 0)) + maxSize = min(maxSize, int(l.sendQuota)) // connection-level flow control. // Compute how much of the header and data we can send within quota and max frame length hSize := min(maxSize, len(dataItem.h)) dSize := min(maxSize-hSize, reader.Remaining()) @@ -1039,19 +1016,23 @@ func (l *loopyWriter) processData() (bool, error) { reader.Close() str.itl.dequeue() } + return false, l.updateStreamAfterWrite(str) +} + +func (l *loopyWriter) updateStreamAfterWrite(str *outStream) error { if str.itl.isEmpty() { str.state = empty - } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. + } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { - return false, err + return err } if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { - return false, err + return err } } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. str.state = waitingOnStreamQuota } else { // Otherwise add it back to the list of active streams. l.activeStreams.enqueue(str) } - return false, nil + return nil } diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go index 7cfbc963..98cef9ec 100644 --- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go +++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go @@ -115,7 +115,6 @@ func (f *trInFlow) getSize() uint32 { return atomic.LoadUint32(&f.effectiveWindowSize) } -// TODO(mmukhi): Simplify this code. // inFlow deals with inbound flow control type inFlow struct { mu sync.Mutex @@ -174,14 +173,14 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 { // onData is invoked when some data frame is received. It updates pendingData. func (f *inFlow) onData(n uint32) error { f.mu.Lock() + defer f.mu.Unlock() + f.pendingData += n if f.pendingData+f.pendingUpdate > f.limit+f.delta { limit := f.limit rcvd := f.pendingData + f.pendingUpdate - f.mu.Unlock() return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit) } - f.mu.Unlock() return nil } @@ -189,8 +188,9 @@ func (f *inFlow) onData(n uint32) error { // to be sent to the peer. func (f *inFlow) onRead(n uint32) uint32 { f.mu.Lock() + defer f.mu.Unlock() + if f.pendingData == 0 { - f.mu.Unlock() return 0 } f.pendingData -= n @@ -205,9 +205,7 @@ func (f *inFlow) onRead(n uint32) uint32 { if f.pendingUpdate >= f.limit/4 { wu := f.pendingUpdate f.pendingUpdate = 0 - f.mu.Unlock() return wu } - f.mu.Unlock() return 0 } diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index 7ab3422b..a8356c9a 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -479,8 +479,8 @@ func (ht *serverHandlerTransport) runStream() { func (ht *serverHandlerTransport) incrMsgRecv() {} -func (ht *serverHandlerTransport) Drain(string) { - panic("Drain() is not implemented") +func (ht *serverHandlerTransport) Drain(s string) { + ht.Close(errors.New(s)) } // mapRecvMsgError returns the non-nil err into the appropriate diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index d6bc6a6c..133f5d70 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -39,6 +39,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" icredentials "google.golang.org/grpc/internal/credentials" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcutil" @@ -318,7 +319,13 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } writeBufSize := opts.WriteBufferSize readBufSize := opts.ReadBufferSize + // The default header list size is moving from 16MB to 8KB. The 8KB limit + // is only used if Enable8KBDefaultHeaderListSize is true; otherwise, the + // old 16MB default is used. User-specified options always take precedence. maxHeaderListSize := defaultClientMaxHeaderListSize + if envconfig.Enable8KBDefaultHeaderListSize { + maxHeaderListSize = upcomingDefaultHeaderListSize + } if opts.MaxHeaderListSize != nil { maxHeaderListSize = *opts.MaxHeaderListSize } @@ -879,8 +886,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler s return false } } - if sz > int64(upcomingDefaultHeaderListSize) { - t.logger.Warningf("Header list size to send (%d bytes) is larger than the upcoming default limit (%d bytes). In a future release, this will be restricted to %d bytes.", sz, upcomingDefaultHeaderListSize, upcomingDefaultHeaderListSize) + if !envconfig.Enable8KBDefaultHeaderListSize && sz > int64(upcomingDefaultHeaderListSize) { + t.logger.Warningf("Header list size to send (%d bytes) is larger than the upcoming default limit (%d bytes). In release v1.82.0, GRPC_GO_EXPERIMENTAL_ENABLE_8KB_DEFAULT_HEADER_LIST_SIZE will be enabled by default, enforcing this limit.", sz, upcomingDefaultHeaderListSize) } return true } @@ -1224,6 +1231,23 @@ func (t *http2Client) handleData(f *parsedDataFrame) { t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) return } + + if s.nonGRPCStatus != nil { + // The frame should be handled as a non-gRPC response body + st := s.handleNonGRPCData(f) + if st != nil { + t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, true) + return + } + if w := s.fc.onRead(size); w > 0 { + t.controlBuf.put(&outgoingWindowUpdate{ + streamID: s.id, + increment: w, + }) + } + return + } + dataLen := f.data.Len() if f.Header().Flags.Has(http2.FlagDataPadded) { if w := s.fc.onRead(size - uint32(dataLen)); w > 0 { @@ -1468,6 +1492,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } + // If we are collecting non-gRPC response data and receive a trailing + // HEADERS frame with END_STREAM, finalize the buffered data and close + // the stream. + if s.nonGRPCStatus != nil { + if endStream { + st := s.finalizeNonGRPCStatus() + t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, true) + } + return + } + var ( // If a gRPC Response-Headers has already been received, then it means // that the peer is speaking gRPC and we are in gRPC mode. @@ -1568,7 +1603,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } se := status.New(grpcErrorCode, strings.Join(errs, "; ")) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + if endStream { + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, true) + return + } + + s.startNonGRPCDataCollection(se) return } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 3a8c36e4..1acd44be 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -38,11 +38,13 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcutil" "google.golang.org/grpc/internal/pretty" istatus "google.golang.org/grpc/internal/status" "google.golang.org/grpc/internal/syscall" + transportinternal "google.golang.org/grpc/internal/transport/internal" "google.golang.org/grpc/mem" "google.golang.org/grpc/codes" @@ -165,7 +167,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, } writeBufSize := config.WriteBufferSize readBufSize := config.ReadBufferSize + // The default header list size is moving from 16MB to 8KB. The 8KB limit + // is only used if Enable8KBDefaultHeaderListSize is true; otherwise, the + // old 16MB default is used. User-specified options always take precedence. maxHeaderListSize := defaultServerMaxHeaderListSize + if envconfig.Enable8KBDefaultHeaderListSize { + maxHeaderListSize = upcomingDefaultHeaderListSize + } if config.MaxHeaderListSize != nil { maxHeaderListSize = *config.MaxHeaderListSize } @@ -948,8 +956,8 @@ func (t *http2Server) checkForHeaderListSize(hf []hpack.HeaderField) bool { return false } } - if sz > int64(upcomingDefaultHeaderListSize) { - t.logger.Warningf("Header list size to send (%d bytes) is larger than the upcoming default limit (%d bytes). In a future release, this will be restricted to %d bytes.", sz, upcomingDefaultHeaderListSize, upcomingDefaultHeaderListSize) + if !envconfig.Enable8KBDefaultHeaderListSize && sz > int64(upcomingDefaultHeaderListSize) { + t.logger.Warningf("Header list size to send (%d bytes) is larger than the upcoming default limit (%d bytes). In release v1.82.0, GRPC_GO_EXPERIMENTAL_ENABLE_8KB_DEFAULT_HEADER_LIST_SIZE will be enabled by default, enforcing this limit.", sz, upcomingDefaultHeaderListSize) } return true } @@ -1441,14 +1449,14 @@ func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics { func (t *http2Server) incrMsgSent() { if channelz.IsOn() { t.channelz.SocketMetrics.MessagesSent.Add(1) - t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1) + t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(transportinternal.TimeNowFunc()) } } func (t *http2Server) incrMsgRecv() { if channelz.IsOn() { t.channelz.SocketMetrics.MessagesReceived.Add(1) - t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1) + t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(transportinternal.TimeNowFunc()) } } diff --git a/vendor/google.golang.org/grpc/internal/grpcutil/regex.go b/vendor/google.golang.org/grpc/internal/transport/internal/internal.go similarity index 59% rename from vendor/google.golang.org/grpc/internal/grpcutil/regex.go rename to vendor/google.golang.org/grpc/internal/transport/internal/internal.go index 7a092b2b..a7c7c7d5 100644 --- a/vendor/google.golang.org/grpc/internal/grpcutil/regex.go +++ b/vendor/google.golang.org/grpc/internal/transport/internal/internal.go @@ -1,6 +1,6 @@ /* * - * Copyright 2021 gRPC authors. + * Copyright 2026 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,10 @@ * */ -package grpcutil +// Package internal contains functionality internal to the transport package. +package internal -import "regexp" - -// FullMatchWithRegex returns whether the full text matches the regex provided. -func FullMatchWithRegex(re *regexp.Regexp, text string) bool { - if len(text) == 0 { - return re.MatchString(text) - } - re.Longest() - rem := re.FindString(text) - return len(rem) == len(text) -} +// TimeNowFunc is a variable that can be set to override the default behavior of +// getting the current time in nanoseconds. It is used in transport code to set +// channelz timestamps, and is exposed here for testing purposes. +var TimeNowFunc func() int64 diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index 1e224576..6dfae398 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/transport/internal" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/mem" "google.golang.org/grpc/metadata" @@ -46,6 +47,10 @@ import ( const logLevel = 2 +func init() { + internal.TimeNowFunc = func() int64 { return time.Now().UnixNano() } +} + // recvMsg represents the received msg from the transport. All transport // protocol specific info has been removed. type recvMsg struct { diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index ee7f7dea..52f4ea51 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -128,6 +128,16 @@ func NewGZIPDecompressor() Decompressor { } func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) { + return d.doWithMaxSize(r, math.MaxInt64) +} + +// doWithMaxSize behaves like Do but caps the size of the decompressed +// payload at maxMessageSize+1 bytes. The Decompressor interface does not +// allow extra parameters, so callers inside the package type-assert to +// *gzipDecompressor to invoke this method directly. The +1 byte makes it +// possible for the caller to detect that the limit was exceeded and +// return ResourceExhausted instead of materializing an unbounded payload. +func (d *gzipDecompressor) doWithMaxSize(r io.Reader, maxMessageSize int64) ([]byte, error) { var z *gzip.Reader switch maybeZ := d.pool.Get().(type) { case nil: @@ -148,7 +158,11 @@ func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) { z.Close() d.pool.Put(z) }() - return io.ReadAll(z) + var src io.Reader = z + if maxMessageSize < math.MaxInt64 { + src = io.LimitReader(z, maxMessageSize+1) + } + return io.ReadAll(src) } func (d *gzipDecompressor) Type() string { @@ -830,15 +844,15 @@ func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor, if compressor != nil { z, err := compressor.Compress(w) if err != nil { - return nil, 0, wrapErr(err) + return nil, compressionNone, wrapErr(err) } for _, b := range in { if _, err := z.Write(b.ReadOnlyData()); err != nil { - return nil, 0, wrapErr(err) + return nil, compressionNone, wrapErr(err) } } if err := z.Close(); err != nil { - return nil, 0, wrapErr(err) + return nil, compressionNone, wrapErr(err) } } else { // This is obviously really inefficient since it fully materializes the data, but @@ -848,7 +862,7 @@ func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor, buf := in.MaterializeToBuffer(pool) defer buf.Free() if err := cp.Do(w, buf.ReadOnlyData()); err != nil { - return nil, 0, wrapErr(err) + return nil, compressionNone, wrapErr(err) } } return out, compressionMade, nil @@ -971,7 +985,20 @@ func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveM func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompressor, maxReceiveMessageSize int, pool mem.BufferPool) (mem.BufferSlice, error) { if dc != nil { r := d.Reader() - uncompressed, err := dc.Do(r) + // For the built-in gzip decompressor, bound the decompressed output + // at maxReceiveMessageSize+1 so that a small but highly compressed + // payload (a "zip bomb") cannot expand to gigabytes in memory before + // the post-decompression size check below has a chance to fire. The + // Decompressor interface does not accept an extra size parameter, + // so we type-assert to invoke a size-aware helper. Third-party + // Decompressor implementations keep the original Do behavior. + var uncompressed []byte + var err error + if gd, ok := dc.(*gzipDecompressor); ok { + uncompressed, err = gd.doWithMaxSize(r, int64(maxReceiveMessageSize)) + } else { + uncompressed, err = dc.Do(r) + } if err != nil { r.Close() // ensure buffers are reused return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err) @@ -989,6 +1016,9 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompress r.Close() // ensure buffers are reused return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the message: %v", err) } + if closer, ok := dcReader.(io.Closer); ok { + defer closer.Close() + } // Read at most one byte more than the limit from the decompressor. // Unless the limit is MaxInt64, in which case, that's impossible, so diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 5229adf7..cf0a2067 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -28,6 +28,7 @@ import ( "net/http" "reflect" "runtime" + "runtime/pprof" "strings" "sync" "sync/atomic" @@ -150,8 +151,6 @@ type Server struct { serverWorkerChannel chan func() serverWorkerChannelClose func() - - strictPathCheckingLogEmitted atomic.Bool } type serverOptions struct { @@ -250,10 +249,8 @@ func newJoinServerOption(opts ...ServerOption) ServerOption { // If this option is set to true every connection will release the buffer after // flushing the data on the wire. // -// # Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. +// Deprecated: shared write buffer is enabled by default. SharedWriteBuffer +// will be removed in a future release. func SharedWriteBuffer(val bool) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.sharedWriteBuffer = val @@ -302,6 +299,14 @@ func InitialConnWindowSize(s int32) ServerOption { // window size to the value provided and disables dynamic flow control. // The lower bound for window size is 64K and any value smaller than that // will be ignored. +// +// Note that this also disables dynamic flow control for the connection, +// falling back to a default static connection-level window of 64KB. To +// use a larger connection-level window, you must also use the +// [StaticConnWindowSize] ServerOption. +// +// Most users should not configure static flow control windows unless +// operating in a memory-constrained environment. func StaticStreamWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialWindowSize = s @@ -313,6 +318,14 @@ func StaticStreamWindowSize(s int32) ServerOption { // window size to the value provided and disables dynamic flow control. // The lower bound for window size is 64K and any value smaller than that // will be ignored. +// +// Note that this also disables dynamic flow control for individual streams, +// falling back to a default static connection-level window of 64KB. To +// explicitly configure the stream-level window size, you must also use the +// [StaticStreamWindowSize] ServerOption. +// +// Most users should not configure static flow control windows unless +// operating in a memory-constrained environment. func StaticConnWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialConnWindowSize = s @@ -1787,6 +1800,12 @@ func (s *Server) handleMalformedMethodName(stream *transport.ServerStream, ti *t func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) { ctx := stream.Context() ctx = contextWithServer(ctx, s) + if envconfig.LabelServerGoroutines&envconfig.GoroutineLabelServerMethod != 0 { + // This method always runs in its own goroutine, so we can set a + // goroutine label without needing to restore a previous context. + ctx = pprof.WithLabels(ctx, pprof.Labels("grpc.method", stream.Method())) + pprof.SetGoroutineLabels(ctx) + } var ti *traceInfo if EnableTracing { tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) @@ -1803,28 +1822,11 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Ser } } - sm := stream.Method() - if sm == "" { + sm, found := strings.CutPrefix(stream.Method(), "/") + if !found { s.handleMalformedMethodName(stream, ti) return } - if sm[0] != '/' { - // TODO(easwars): Add a link to the CVE in the below log messages once - // published. - if envconfig.DisableStrictPathChecking { - if old := s.strictPathCheckingLogEmitted.Swap(true); !old { - channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream received malformed method name %q. Allowing it because the environment variable GRPC_GO_EXPERIMENTAL_DISABLE_STRICT_PATH_CHECKING is set to true, but this option will be removed in a future release.", sm) - } - } else { - if old := s.strictPathCheckingLogEmitted.Swap(true); !old { - channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream rejected malformed method name %q. To temporarily allow such requests, set the environment variable GRPC_GO_EXPERIMENTAL_DISABLE_STRICT_PATH_CHECKING to true. Note that this is not recommended as it may allow requests to bypass security policies.", sm) - } - s.handleMalformedMethodName(stream, ti) - return - } - } else { - sm = sm[1:] - } pos := strings.LastIndex(sm, "/") if pos == -1 { s.handleMalformedMethodName(stream, ti) diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 3ccfe515..cf114ef4 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.81.1" +const Version = "1.82.0" diff --git a/vendor/modules.txt b/vendor/modules.txt index 650d009e..af786278 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1140,7 +1140,7 @@ google.golang.org/genproto/googleapis/api/httpbody ## explicit; go 1.25.0 google.golang.org/genproto/googleapis/rpc/errdetails google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.81.1 +# google.golang.org/grpc v1.82.0 ## explicit; go 1.25.0 google.golang.org/grpc google.golang.org/grpc/attributes @@ -1162,6 +1162,7 @@ google.golang.org/grpc/encoding google.golang.org/grpc/encoding/gzip google.golang.org/grpc/encoding/internal google.golang.org/grpc/encoding/proto +google.golang.org/grpc/experimental/balancer/weight google.golang.org/grpc/experimental/stats google.golang.org/grpc/grpclog google.golang.org/grpc/grpclog/internal @@ -1169,7 +1170,6 @@ google.golang.org/grpc/health/grpc_health_v1 google.golang.org/grpc/internal google.golang.org/grpc/internal/backoff google.golang.org/grpc/internal/balancer/gracefulswitch -google.golang.org/grpc/internal/balancer/weight google.golang.org/grpc/internal/balancerload google.golang.org/grpc/internal/binarylog google.golang.org/grpc/internal/buffer @@ -1195,6 +1195,7 @@ google.golang.org/grpc/internal/stats google.golang.org/grpc/internal/status google.golang.org/grpc/internal/syscall google.golang.org/grpc/internal/transport +google.golang.org/grpc/internal/transport/internal google.golang.org/grpc/internal/transport/networktype google.golang.org/grpc/internal/transport/readyreader google.golang.org/grpc/keepalive