diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index a83a42fa92c4..61f96905991c 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -238,6 +238,7 @@ func (ss *StubServer) Stop() { for i := len(ss.cleanups) - 1; i >= 0; i-- { ss.cleanups[i]() } + ss.cleanups = nil } func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { diff --git a/xds/internal/balancer/clusterimpl/tests/balancer_test.go b/xds/internal/balancer/clusterimpl/tests/balancer_test.go index 2d3c49e04937..4a5c13b8e6b4 100644 --- a/xds/internal/balancer/clusterimpl/tests/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/tests/balancer_test.go @@ -25,20 +25,29 @@ import ( "testing" "time" + "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/fakeserver" + "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - "github.com/google/uuid" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3" + v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" @@ -170,3 +179,183 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) { t.Fatal("New LRS stream created when expected not to") } } + +// Tests whether load is reported correctly when using pickfirst with endpoints +// in multiple localities. +func (s) TestLoadReportingPickFirstMultiLocality(t *testing.T) { + // Create an xDS management server that serves ADS and LRS requests. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true}) + + // Create bootstrap configuration pointing to the above management server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create an xDS resolver with the above bootstrap configuration. + var resolverBuilder resolver.Builder + var err error + if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { + resolverBuilder, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + } + + // Start two server backends exposing the test service. + server1 := stubserver.StartTestService(t, nil) + defer server1.Stop() + + server2 := stubserver.StartTestService(t, nil) + defer server2.Stop() + + // Configure the xDS management server. + const serviceName = "my-test-xds-service" + routeConfigName := "route-" + serviceName + clusterName := "cluster-" + serviceName + endpointsName := "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{ + { + Name: clusterName, + ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, + EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + EdsConfig: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ + Ads: &v3corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: endpointsName, + }, + // Specify a custom load balancing policy to use pickfirst. + LoadBalancingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: testutils.MarshalAny(t, &v3pickfirstpb.PickFirst{}), + }, + }, + }, + }, + // Include a fake LRS server config pointing to self. + LrsServer: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{ + Self: &v3corepb.SelfConfigSource{}, + }, + }, + }, + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{ + {Port: testutils.ParsePort(t, server1.Address)}, + }, + Weight: 1, + }, + { + Backends: []e2e.BackendOptions{ + {Port: testutils.ParsePort(t, server2.Address)}, + }, + Weight: 2, + }, + }, + })}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + var peer peer.Peer + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Verify that the request was sent to server 1. + if got, want := peer.Addr.String(), server1.Address; got != want { + t.Errorf("peer.Addr = %q, want = %q", got, want) + } + + // Ensure that an LRS stream is created. + if _, err = mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil { + t.Fatalf("Failure when waiting for an LRS stream to be opened: %v", err) + } + + // Handle the initial LRS request from the xDS client. + if _, err = mgmtServer.LRSServer.LRSRequestChan.Receive(ctx); err != nil { + t.Fatalf("Failure waiting for initial LRS request: %v", err) + } + + resp := fakeserver.Response{ + Resp: &v3lrspb.LoadStatsResponse{ + SendAllClusters: true, + LoadReportingInterval: durationpb.New(10 * time.Millisecond), + }, + } + mgmtServer.LRSServer.LRSResponseChan <- &resp + + // Wait for load to be reported for locality of server 2. + // We (incorrectly) wait for load report for region-2 because presently + // pickfirst always reports load for the locality of the last address in the + // subconn. This will be fixed by ensuring there is only one address per + // subconn. + // TODO(#7339): Change region to region-1 once fixed. + if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, "region-2"); err != nil { + t.Fatalf("region-2 did not receive load due to error: %v", err) + } + + // Stop server 1 and send one more rpc. Now the request should go to server 2. + server1.Stop() + + // Wait for the balancer to pick up the server state change. + testutils.AwaitState(ctx, t, cc, connectivity.Idle) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Verify that the request was sent to server 2. + if got, want := peer.Addr.String(), server2.Address; got != want { + t.Errorf("peer.Addr = %q, want = %q", got, want) + } + + // Wait for load to be reported for locality of server 2. + if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, "region-2"); err != nil { + t.Fatalf("Server 2 did not receive load due to error: %v", err) + } +} + +// waitForSuccessfulLoadReport waits for a successful request to be reported for +// the specified locality region. +func waitForSuccessfulLoadReport(ctx context.Context, lrsServer *fakeserver.Server, region string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-lrsServer.LRSRequestChan.C: + loadStats := req.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest) + for _, load := range loadStats.ClusterStats { + for _, locality := range load.UpstreamLocalityStats { + if locality.TotalSuccessfulRequests > 0 && locality.Locality.Region == region { + return nil + } + } + } + } + } +}