From 05d2c86074a21d482945b9994d85e3b66de0480d Mon Sep 17 00:00:00 2001 From: Kevin Pollet Date: Wed, 1 May 2024 06:38:03 +0200 Subject: [PATCH] Set Gateway HTTPRoute status Co-authored-by: Romain --- integration/k8s_conformance_test.go | 53 +- pkg/provider/kubernetes/gateway/client.go | 44 +- .../kubernetes/gateway/client_mock_test.go | 254 ---- .../mixed/with_namespace_selector.yml | 11 - pkg/provider/kubernetes/gateway/kubernetes.go | 1222 +++++++++-------- .../kubernetes/gateway/kubernetes_test.go | 294 +++- 6 files changed, 995 insertions(+), 883 deletions(-) delete mode 100644 pkg/provider/kubernetes/gateway/client_mock_test.go diff --git a/integration/k8s_conformance_test.go b/integration/k8s_conformance_test.go index 537227e72..01dd4856b 100644 --- a/integration/k8s_conformance_test.go +++ b/integration/k8s_conformance_test.go @@ -195,40 +195,28 @@ func (s *K8sConformanceSuite) TestK8sGatewayAPIConformance() { LatestObservedGenerationSet: 5 * time.Second, RequiredConsecutiveSuccesses: 0, }, - SupportedFeatures: sets.New[ksuite.SupportedFeature](). - Insert(ksuite.GatewayCoreFeatures.UnsortedList()...). - Insert(ksuite.ReferenceGrantCoreFeatures.UnsortedList()...), EnableAllSupportedFeatures: false, RunTest: *k8sConformanceRunTest, // Until the feature are all supported, following tests are skipped. SkipTests: []string{ - "HTTPExactPathMatching", - "HTTPRouteHostnameIntersection", - "HTTPRouteListenerHostnameMatching", - "HTTPRouteRequestHeaderModifier", - "GatewayClassObservedGenerationBump", - "HTTPRouteInvalidNonExistentBackendRef", - "GatewayWithAttachedRoutes", - "HTTPRouteCrossNamespace", - "HTTPRouteDisallowedKind", - "HTTPRouteInvalidReferenceGrant", - "HTTPRouteObservedGenerationBump", - "TLSRouteSimpleSameNamespace", - "TLSRouteInvalidReferenceGrant", - "HTTPRouteInvalidCrossNamespaceParentRef", - "HTTPRouteInvalidParentRefNotMatchingSectionName", - "GatewayModifyListeners", - "GatewayInvalidTLSConfiguration", - "HTTPRouteInvalidCrossNamespaceBackendRef", - "HTTPRouteMatchingAcrossRoutes", - "HTTPRoutePartiallyInvalidViaInvalidReferenceGrant", - "HTTPRouteRedirectHostAndStatus", - "HTTPRouteInvalidBackendRefUnknownKind", - "HTTPRoutePathMatchOrder", - "HTTPRouteSimpleSameNamespace", - "HTTPRouteMatching", - "HTTPRouteHeaderMatching", - "HTTPRouteReferenceGrant", + tests.GatewayClassObservedGenerationBump.ShortName, + tests.GatewayWithAttachedRoutes.ShortName, + tests.GatewayModifyListeners.ShortName, + tests.GatewayInvalidTLSConfiguration.ShortName, + tests.HTTPRouteHostnameIntersection.ShortName, + tests.HTTPRouteListenerHostnameMatching.ShortName, + tests.HTTPRouteInvalidNonExistentBackendRef.ShortName, + tests.HTTPRouteInvalidReferenceGrant.ShortName, + tests.HTTPRouteInvalidCrossNamespaceParentRef.ShortName, + tests.HTTPRouteInvalidParentRefNotMatchingSectionName.ShortName, + tests.HTTPRouteInvalidCrossNamespaceBackendRef.ShortName, + tests.HTTPRouteMatchingAcrossRoutes.ShortName, + tests.HTTPRoutePartiallyInvalidViaInvalidReferenceGrant.ShortName, + tests.HTTPRouteRedirectHostAndStatus.ShortName, + tests.HTTPRouteInvalidBackendRefUnknownKind.ShortName, + tests.HTTPRoutePathMatchOrder.ShortName, + tests.HTTPRouteHeaderMatching.ShortName, + tests.HTTPRouteReferenceGrant.ShortName, }, } @@ -241,10 +229,7 @@ func (s *K8sConformanceSuite) TestK8sGatewayAPIConformance() { Version: version.Version, Contact: []string{"@traefik/maintainers"}, }, - ConformanceProfiles: sets.New[ksuite.ConformanceProfileName]( - ksuite.HTTPConformanceProfileName, - ksuite.TLSConformanceProfileName, - ), + ConformanceProfiles: sets.New(ksuite.HTTPConformanceProfileName), }) require.NoError(s.T(), err) diff --git a/pkg/provider/kubernetes/gateway/client.go b/pkg/provider/kubernetes/gateway/client.go index 05ae47f44..06809147c 100644 --- a/pkg/provider/kubernetes/gateway/client.go +++ b/pkg/provider/kubernetes/gateway/client.go @@ -14,6 +14,7 @@ import ( kerror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + ktypes "k8s.io/apimachinery/pkg/types" kinformers "k8s.io/client-go/informers" kclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -51,6 +52,7 @@ type Client interface { GetGatewayClasses() ([]*gatev1.GatewayClass, error) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error + UpdateHTTPRouteStatus(ctx context.Context, gateway *gatev1.Gateway, nsName ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error GetGateways() []*gatev1.Gateway GetHTTPRoutes(namespaces []string) ([]*gatev1.HTTPRoute, error) GetTCPRoutes(namespaces []string) ([]*gatev1alpha2.TCPRoute, error) @@ -384,7 +386,7 @@ func (c *clientWrapper) GetGateways() []*gatev1.Gateway { var result []*gatev1.Gateway for ns, factory := range c.factoriesGateway { - gateways, err := factory.Gateway().V1().Gateways().Lister().List(labels.Everything()) + gateways, err := factory.Gateway().V1().Gateways().Lister().Gateways(ns).List(labels.Everything()) if err != nil { log.Error().Err(err).Msgf("Failed to list Gateways in namespace %s", ns) continue @@ -453,6 +455,46 @@ func (c *clientWrapper) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStat return nil } +func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, gateway *gatev1.Gateway, nsName ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error { + if !c.isWatchedNamespace(nsName.Namespace) { + return fmt.Errorf("updating HTTPRoute status %s/%s: namespace is not within watched namespaces", nsName.Namespace, nsName.Name) + } + + route, err := c.factoriesGateway[c.lookupNamespace(nsName.Namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(nsName.Namespace).Get(nsName.Name) + if err != nil { + return fmt.Errorf("getting HTTPRoute %s/%s: %w", nsName.Namespace, nsName.Name, err) + } + + var statuses []gatev1.RouteParentStatus + for _, status := range route.Status.Parents { + if status.ControllerName != controllerName { + statuses = append(statuses, status) + continue + } + if status.ParentRef.Namespace != nil && string(*status.ParentRef.Namespace) != gateway.Namespace { + statuses = append(statuses, status) + continue + } + if string(status.ParentRef.Name) != gateway.Name { + statuses = append(statuses, status) + continue + } + } + statuses = append(statuses, status.Parents...) + + route = route.DeepCopy() + route.Status = gatev1.HTTPRouteStatus{ + RouteStatus: gatev1.RouteStatus{ + Parents: statuses, + }, + } + + if _, err := c.csGateway.GatewayV1().HTTPRoutes(nsName.Namespace).UpdateStatus(ctx, route, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("updating HTTPRoute %s/%s status: %w", nsName.Namespace, nsName.Name, err) + } + return nil +} + func statusEquals(oldStatus, newStatus gatev1.GatewayStatus) bool { if len(oldStatus.Listeners) != len(newStatus.Listeners) { return false diff --git a/pkg/provider/kubernetes/gateway/client_mock_test.go b/pkg/provider/kubernetes/gateway/client_mock_test.go deleted file mode 100644 index b19dd9609..000000000 --- a/pkg/provider/kubernetes/gateway/client_mock_test.go +++ /dev/null @@ -1,254 +0,0 @@ -package gateway - -import ( - "fmt" - "os" - "path/filepath" - - "github.com/traefik/traefik/v3/pkg/provider/kubernetes/k8s" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - kscheme "k8s.io/client-go/kubernetes/scheme" - gatev1 "sigs.k8s.io/gateway-api/apis/v1" - gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" - gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" -) - -var _ Client = (*clientMock)(nil) - -func init() { - // required by k8s.MustParseYaml - err := gatev1alpha2.AddToScheme(kscheme.Scheme) - if err != nil { - panic(err) - } - - err = gatev1beta1.AddToScheme(kscheme.Scheme) - if err != nil { - panic(err) - } - - err = gatev1.AddToScheme(kscheme.Scheme) - if err != nil { - panic(err) - } -} - -type clientMock struct { - services []*corev1.Service - secrets []*corev1.Secret - endpoints []*corev1.Endpoints - namespaces []*corev1.Namespace - - apiServiceError error - apiSecretError error - apiEndpointsError error - - gatewayClasses []*gatev1.GatewayClass - gateways []*gatev1.Gateway - httpRoutes []*gatev1.HTTPRoute - tcpRoutes []*gatev1alpha2.TCPRoute - tlsRoutes []*gatev1alpha2.TLSRoute - referenceGrants []*gatev1beta1.ReferenceGrant - - watchChan chan interface{} -} - -func newClientMock(paths ...string) clientMock { - var c clientMock - - for _, path := range paths { - yamlContent, err := os.ReadFile(filepath.FromSlash("./fixtures/" + path)) - if err != nil { - panic(err) - } - - k8sObjects := k8s.MustParseYaml(yamlContent) - for _, obj := range k8sObjects { - switch o := obj.(type) { - case *corev1.Service: - c.services = append(c.services, o) - case *corev1.Secret: - c.secrets = append(c.secrets, o) - case *corev1.Namespace: - c.namespaces = append(c.namespaces, o) - case *corev1.Endpoints: - c.endpoints = append(c.endpoints, o) - case *gatev1.GatewayClass: - c.gatewayClasses = append(c.gatewayClasses, o) - case *gatev1.Gateway: - c.gateways = append(c.gateways, o) - case *gatev1.HTTPRoute: - c.httpRoutes = append(c.httpRoutes, o) - case *gatev1alpha2.TCPRoute: - c.tcpRoutes = append(c.tcpRoutes, o) - case *gatev1alpha2.TLSRoute: - c.tlsRoutes = append(c.tlsRoutes, o) - case *gatev1beta1.ReferenceGrant: - c.referenceGrants = append(c.referenceGrants, o) - default: - panic(fmt.Sprintf("Unknown runtime object %+v %T", o, o)) - } - } - } - - return c -} - -func (c clientMock) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error { - for _, g := range c.gateways { - if g.Name == gateway.Name { - if !statusEquals(g.Status, gatewayStatus) { - g.Status = gatewayStatus - return nil - } - return fmt.Errorf("cannot update gateway %v", gateway.Name) - } - } - return nil -} - -func (c clientMock) UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error { - for _, gc := range c.gatewayClasses { - if gc.Name == gatewayClass.Name { - for _, c := range gc.Status.Conditions { - if c.Type == condition.Type && c.Status != condition.Status { - c.Status = condition.Status - c.LastTransitionTime = condition.LastTransitionTime - c.Message = condition.Message - c.Reason = condition.Reason - } - } - } - } - return nil -} - -func (c clientMock) UpdateGatewayStatusConditions(gateway *gatev1.Gateway, condition metav1.Condition) error { - for _, g := range c.gatewayClasses { - if g.Name == gateway.Name { - for _, c := range g.Status.Conditions { - if c.Type == condition.Type && (c.Status != condition.Status || c.Reason != condition.Reason) { - c.Status = condition.Status - c.LastTransitionTime = condition.LastTransitionTime - c.Message = condition.Message - c.Reason = condition.Reason - } - } - } - } - return nil -} - -func (c clientMock) GetGatewayClasses() ([]*gatev1.GatewayClass, error) { - return c.gatewayClasses, nil -} - -func (c clientMock) GetGateways() []*gatev1.Gateway { - return c.gateways -} - -func inNamespace(m metav1.ObjectMeta, s string) bool { - return s == metav1.NamespaceAll || m.Namespace == s -} - -func (c clientMock) GetNamespaces(selector labels.Selector) ([]string, error) { - var ns []string - for _, namespace := range c.namespaces { - if selector.Matches(labels.Set(namespace.Labels)) { - ns = append(ns, namespace.Name) - } - } - return ns, nil -} - -func (c clientMock) GetHTTPRoutes(namespaces []string) ([]*gatev1.HTTPRoute, error) { - var httpRoutes []*gatev1.HTTPRoute - for _, namespace := range namespaces { - for _, httpRoute := range c.httpRoutes { - if inNamespace(httpRoute.ObjectMeta, namespace) { - httpRoutes = append(httpRoutes, httpRoute) - } - } - } - return httpRoutes, nil -} - -func (c clientMock) GetTCPRoutes(namespaces []string) ([]*gatev1alpha2.TCPRoute, error) { - var tcpRoutes []*gatev1alpha2.TCPRoute - for _, namespace := range namespaces { - for _, tcpRoute := range c.tcpRoutes { - if inNamespace(tcpRoute.ObjectMeta, namespace) { - tcpRoutes = append(tcpRoutes, tcpRoute) - } - } - } - return tcpRoutes, nil -} - -func (c clientMock) GetTLSRoutes(namespaces []string) ([]*gatev1alpha2.TLSRoute, error) { - var tlsRoutes []*gatev1alpha2.TLSRoute - for _, namespace := range namespaces { - for _, tlsRoute := range c.tlsRoutes { - if inNamespace(tlsRoute.ObjectMeta, namespace) { - tlsRoutes = append(tlsRoutes, tlsRoute) - } - } - } - return tlsRoutes, nil -} - -func (c clientMock) GetReferenceGrants(namespace string) ([]*gatev1beta1.ReferenceGrant, error) { - var referenceGrants []*gatev1beta1.ReferenceGrant - for _, referenceGrant := range c.referenceGrants { - if inNamespace(referenceGrant.ObjectMeta, namespace) { - referenceGrants = append(referenceGrants, referenceGrant) - } - } - return referenceGrants, nil -} - -func (c clientMock) GetService(namespace, name string) (*corev1.Service, bool, error) { - if c.apiServiceError != nil { - return nil, false, c.apiServiceError - } - - for _, service := range c.services { - if inNamespace(service.ObjectMeta, namespace) && service.Name == name { - return service, true, nil - } - } - return nil, false, c.apiServiceError -} - -func (c clientMock) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) { - if c.apiEndpointsError != nil { - return nil, false, c.apiEndpointsError - } - - for _, endpoints := range c.endpoints { - if inNamespace(endpoints.ObjectMeta, namespace) && endpoints.Name == name { - return endpoints, true, nil - } - } - - return &corev1.Endpoints{}, false, nil -} - -func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, error) { - if c.apiSecretError != nil { - return nil, false, c.apiSecretError - } - - for _, secret := range c.secrets { - if inNamespace(secret.ObjectMeta, namespace) && secret.Name == name { - return secret, true, nil - } - } - return nil, false, nil -} - -func (c clientMock) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) { - return c.watchChan, nil -} diff --git a/pkg/provider/kubernetes/gateway/fixtures/mixed/with_namespace_selector.yml b/pkg/provider/kubernetes/gateway/fixtures/mixed/with_namespace_selector.yml index eb16d4727..1178ad391 100644 --- a/pkg/provider/kubernetes/gateway/fixtures/mixed/with_namespace_selector.yml +++ b/pkg/provider/kubernetes/gateway/fixtures/mixed/with_namespace_selector.yml @@ -17,17 +17,6 @@ data: tls.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0= tls.key: LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCi0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0= ---- -apiVersion: v1 -kind: Secret -metadata: - name: supersecret - namespace: default - -data: - tls.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0= - tls.key: LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCi0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0= - --- kind: GatewayClass apiVersion: gateway.networking.k8s.io/v1 diff --git a/pkg/provider/kubernetes/gateway/kubernetes.go b/pkg/provider/kubernetes/gateway/kubernetes.go index 590f840b4..e4fee7308 100644 --- a/pkg/provider/kubernetes/gateway/kubernetes.go +++ b/pkg/provider/kubernetes/gateway/kubernetes.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "regexp" + "slices" "sort" "strconv" "strings" @@ -31,14 +32,15 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" - "k8s.io/utils/strings/slices" gatev1 "sigs.k8s.io/gateway-api/apis/v1" gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) const ( - providerName = "kubernetesgateway" + providerName = "kubernetesgateway" + controllerName = "traefik.io/gateway-controller" groupCore = "core" @@ -72,6 +74,12 @@ type Provider struct { routerTransform k8s.RouterTransform } +// Entrypoint defines the available entry points. +type Entrypoint struct { + Address string + HasHTTPTLSConf bool +} + // StatusAddress holds the Gateway Status address configuration. type StatusAddress struct { IP string `description:"IP used to set Kubernetes Gateway status address." json:"ip,omitempty" toml:"ip,omitempty" yaml:"ip,omitempty"` @@ -137,12 +145,6 @@ func (p *Provider) applyRouterTransform(ctx context.Context, rt *dynamic.Router, } } -// Entrypoint defines the available entry points. -type Entrypoint struct { - Address string - HasHTTPTLSConf bool -} - func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) { // Label selector validation _, err := labels.Parse(p.LabelSelector) @@ -286,7 +288,7 @@ func (p *Provider) loadConfigurationFromGateway(ctx context.Context, client Clie } for _, gatewayClass := range gatewayClasses { - if gatewayClass.Spec.ControllerName == "traefik.io/gateway-controller" { + if gatewayClass.Spec.ControllerName == controllerName { gatewayClassNames[gatewayClass.Name] = struct{}{} err := client.UpdateGatewayClassStatus(gatewayClass, metav1.Condition{ @@ -355,6 +357,11 @@ func (p *Provider) loadConfigurationFromGateway(ctx context.Context, client Clie } func (p *Provider) createGatewayConf(ctx context.Context, client Client, gateway *gatev1.Gateway) (*dynamic.Configuration, error) { + addresses, err := p.gatewayAddresses(client) + if err != nil { + return nil, fmt.Errorf("get Gateway status addresses: %w", err) + } + conf := &dynamic.Configuration{ HTTP: &dynamic.HTTPConfiguration{ Routers: map[string]*dynamic.Router{}, @@ -380,35 +387,43 @@ func (p *Provider) createGatewayConf(ctx context.Context, client Client, gateway // GatewayReasonListenersNotValid is used when one or more // Listeners have an invalid or unsupported configuration // and cannot be configured on the Gateway. - listenerStatuses := p.fillGatewayConf(ctx, client, gateway, conf, tlsConfigs) - - addresses, err := p.gatewayAddresses(client) - if err != nil { - return nil, fmt.Errorf("get Gateway status addresses: %w", err) - } - - gatewayStatus, errG := p.makeGatewayStatus(gateway, listenerStatuses, addresses) - - err = client.UpdateGatewayStatus(gateway, gatewayStatus) - if err != nil { - return nil, fmt.Errorf("an error occurred while updating gateway status: %w", err) - } - - if errG != nil { - return nil, fmt.Errorf("an error occurred while creating gateway status: %w", errG) - } + listenerStatuses, httpRouteParentStatuses := p.fillGatewayConf(ctx, client, gateway, conf, tlsConfigs) if len(tlsConfigs) > 0 { conf.TLS.Certificates = append(conf.TLS.Certificates, getTLSConfig(tlsConfigs)...) } + httpRouteStatuses := makeHTTPRouteStatuses(gateway.Namespace, httpRouteParentStatuses) + for nsName, status := range httpRouteStatuses { + if err := client.UpdateHTTPRouteStatus(ctx, gateway, nsName, status); err != nil { + log.Error(). + Err(err). + Str("namespace", nsName.Namespace). + Str("name", nsName.Name). + Msg("Unable to update HTTPRoute status") + } + } + + gatewayStatus, errG := p.makeGatewayStatus(gateway, listenerStatuses, addresses) + if err = client.UpdateGatewayStatus(gateway, gatewayStatus); err != nil { + log.Error(). + Err(err). + Str("namespace", gateway.Namespace). + Str("name", gateway.Name). + Msg("Unable to update Gateway status") + } + if errG != nil { + return nil, fmt.Errorf("creating gateway status: %w", errG) + } + return conf, nil } -func (p *Provider) fillGatewayConf(ctx context.Context, client Client, gateway *gatev1.Gateway, conf *dynamic.Configuration, tlsConfigs map[string]*tls.CertAndStores) []gatev1.ListenerStatus { +func (p *Provider) fillGatewayConf(ctx context.Context, client Client, gateway *gatev1.Gateway, conf *dynamic.Configuration, tlsConfigs map[string]*tls.CertAndStores) ([]gatev1.ListenerStatus, map[ktypes.NamespacedName][]gatev1.RouteParentStatus) { logger := log.Ctx(ctx) - listenerStatuses := make([]gatev1.ListenerStatus, len(gateway.Spec.Listeners)) allocatedListeners := make(map[string]struct{}) + listenerStatuses := make([]gatev1.ListenerStatus, len(gateway.Spec.Listeners)) + httpRouteParentStatuses := make(map[ktypes.NamespacedName][]gatev1.RouteParentStatus) for i, listener := range gateway.Spec.Listeners { listenerStatuses[i] = gatev1.ListenerStatus{ @@ -625,7 +640,12 @@ func (p *Provider) fillGatewayConf(ctx context.Context, client Client, gateway * for _, routeKind := range routeKinds { switch routeKind.Kind { case kindHTTPRoute: - listenerStatuses[i].Conditions = append(listenerStatuses[i].Conditions, p.gatewayHTTPRouteToHTTPConf(ctx, ep, listener, gateway, client, conf)...) + listenerConditions, routeStatuses := p.gatewayHTTPRouteToHTTPConf(ctx, ep, listener, gateway, client, conf) + listenerStatuses[i].Conditions = append(listenerStatuses[i].Conditions, listenerConditions...) + for nsName, status := range routeStatuses { + httpRouteParentStatuses[nsName] = append(httpRouteParentStatuses[nsName], status) + } + case kindTCPRoute: listenerStatuses[i].Conditions = append(listenerStatuses[i].Conditions, gatewayTCPRouteToTCPConf(ctx, ep, listener, gateway, client, conf)...) case kindTLSRoute: @@ -634,7 +654,7 @@ func (p *Provider) fillGatewayConf(ctx context.Context, client Client, gateway * } } - return listenerStatuses + return listenerStatuses, httpRouteParentStatuses } func (p *Provider) makeGatewayStatus(gateway *gatev1.Gateway, listenerStatuses []gatev1.ListenerStatus, addresses []gatev1.GatewayStatusAddress) (gatev1.GatewayStatus, error) { @@ -786,6 +806,543 @@ func (p *Provider) entryPointName(port gatev1.PortNumber, protocol gatev1.Protoc return "", fmt.Errorf("no matching entryPoint for port %d and protocol %q", port, protocol) } +func (p *Provider) gatewayHTTPRouteToHTTPConf(ctx context.Context, ep string, listener gatev1.Listener, gateway *gatev1.Gateway, client Client, conf *dynamic.Configuration) ([]metav1.Condition, map[ktypes.NamespacedName]gatev1.RouteParentStatus) { + // Should not happen due to validation. + if listener.AllowedRoutes == nil { + return nil, nil + } + + namespaces, err := getRouteBindingSelectorNamespace(client, gateway.Namespace, listener.AllowedRoutes.Namespaces) + if err != nil { + // update "ResolvedRefs" status true with "InvalidRoutesRef" reason + return []metav1.Condition{{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: "InvalidRouteNamespacesSelector", // Should never happen as the selector is validated by kubernetes + Message: fmt.Sprintf("Invalid route namespaces selector: %v", err), + }}, nil + } + + routes, err := client.GetHTTPRoutes(namespaces) + if err != nil { + // update "ResolvedRefs" status true with "RefNotPermitted" reason + return []metav1.Condition{{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.ListenerReasonRefNotPermitted), + Message: fmt.Sprintf("Cannot fetch HTTPRoutes: %v", err), + }}, nil + } + + if len(routes) == 0 { + log.Ctx(ctx).Debug().Msg("No HTTPRoutes found") + return nil, nil + } + + var listenerConditions []metav1.Condition + routeStatuses := map[ktypes.NamespacedName]gatev1.RouteParentStatus{} + for _, route := range routes { + parentRef, ok := shouldAttach(gateway, listener, route.Namespace, route.Spec.CommonRouteSpec) + if !ok { + continue + } + + hostnames := matchingHostnames(listener, route.Spec.Hostnames) + if len(hostnames) == 0 && listener.Hostname != nil && *listener.Hostname != "" && len(route.Spec.Hostnames) > 0 { + // TODO update the corresponding route parent status + // https://gateway-api.sigs.k8s.io/v1alpha2/references/spec/#gateway.networking.k8s.io/v1alpha2.TLSRoute + continue + } + + hostRule, err := hostRule(hostnames) + if err != nil { + listenerConditions = append(listenerConditions, metav1.Condition{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: "InvalidRouteHostname", // TODO check the spec if a proper reason is introduced at some point + Message: fmt.Sprintf("Skipping HTTPRoute %s: invalid hostname: %v", route.Name, err), + }) + continue + } + + for _, routeRule := range route.Spec.Rules { + rule, err := extractRule(routeRule, hostRule) + if err != nil { + // update "ResolvedRefs" status true with "UnsupportedPathOrHeaderType" reason + listenerConditions = append(listenerConditions, metav1.Condition{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: "UnsupportedPathOrHeaderType", // TODO check the spec if a proper reason is introduced at some point + Message: fmt.Sprintf("Skipping HTTPRoute %s: cannot generate rule: %v", route.Name, err), + }) + continue + } + + router := dynamic.Router{ + Rule: rule, + RuleSyntax: "v3", + EntryPoints: []string{ep}, + } + + if listener.Protocol == gatev1.HTTPSProtocolType && listener.TLS != nil { + // TODO support let's encrypt + router.TLS = &dynamic.RouterTLSConfig{} + } + + // Adding the gateway desc and the entryPoint desc prevents overlapping of routers build from the same routes. + routerName := route.Name + "-" + gateway.Name + "-" + ep + routerKey, err := makeRouterKey(router.Rule, makeID(route.Namespace, routerName)) + if err != nil { + // update "ResolvedRefs" status true with "DroppedRoutes" reason + listenerConditions = append(listenerConditions, metav1.Condition{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: "InvalidRouterKey", // Should never happen + Message: fmt.Sprintf("Skipping HTTPRoute %s: cannot make router's key with rule %s: %v", route.Name, router.Rule, err), + }) + + // TODO update the RouteStatus condition / deduplicate conditions on listener + continue + } + + middlewares, err := p.loadMiddlewares(listener, route.Namespace, routerKey, routeRule.Filters) + if err != nil { + // update "ResolvedRefs" status true with "InvalidFilters" reason + listenerConditions = append(listenerConditions, metav1.Condition{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: "InvalidFilters", // TODO check the spec if a proper reason is introduced at some point + Message: fmt.Sprintf("Cannot load HTTPRoute filter %s/%s: %v", route.Namespace, route.Name, err), + }) + + // TODO update the RouteStatus condition / deduplicate conditions on listener + continue + } + + for middlewareName, middleware := range middlewares { + // If the middleware is not defined in the return of the loadMiddlewares function, it means we just need a reference to that middleware. + if middleware != nil { + conf.HTTP.Middlewares[middlewareName] = middleware + } + + router.Middlewares = append(router.Middlewares, middlewareName) + } + + if len(routeRule.BackendRefs) == 0 { + continue + } + + // Traefik internal service can be used only if there is only one BackendRef service reference. + if len(routeRule.BackendRefs) == 1 && isInternalService(routeRule.BackendRefs[0].BackendRef) { + router.Service = string(routeRule.BackendRefs[0].Name) + } else { + wrrService, subServices, err := p.loadServices(client, route.Namespace, routeRule.BackendRefs) + if err != nil { + // update "ResolvedRefs" status true with "DroppedRoutes" reason + listenerConditions = append(listenerConditions, metav1.Condition{ + Type: string(gatev1.ListenerConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + Reason: "InvalidBackendRefs", // TODO check the spec if a proper reason is introduced at some point + Message: fmt.Sprintf("Cannot load HTTPRoute service %s/%s: %v", route.Namespace, route.Name, err), + }) + + // TODO update the RouteStatus condition / deduplicate conditions on listener + continue + } + + for svcName, svc := range subServices { + if svc != nil { + conf.HTTP.Services[svcName] = svc + } + } + + serviceName := provider.Normalize(routerKey + "-wrr") + conf.HTTP.Services[serviceName] = wrrService + + router.Service = serviceName + } + + rt := &router + p.applyRouterTransform(ctx, rt, route) + + routerKey = provider.Normalize(routerKey) + conf.HTTP.Routers[routerKey] = rt + } + + routeStatuses[ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}] = gatev1.RouteParentStatus{ + ParentRef: parentRef, + ControllerName: controllerName, + Conditions: []metav1.Condition{ + { + Type: string(gatev1.RouteConditionAccepted), + Status: metav1.ConditionTrue, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonAccepted), + }, + { + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionTrue, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteConditionResolvedRefs), + }, + }, + } + } + + return listenerConditions, routeStatuses +} + +// loadServices is generating a WRR service, even when there is only one target. +func (p *Provider) loadServices(client Client, namespace string, backendRefs []gatev1.HTTPBackendRef) (*dynamic.Service, map[string]*dynamic.Service, error) { + services := map[string]*dynamic.Service{} + + wrrSvc := &dynamic.Service{ + Weighted: &dynamic.WeightedRoundRobin{ + Services: []dynamic.WRRService{}, + }, + } + + for _, backendRef := range backendRefs { + if backendRef.Group == nil || backendRef.Kind == nil { + // Should not happen as this is validated by kubernetes + continue + } + + if isInternalService(backendRef.BackendRef) { + return nil, nil, fmt.Errorf("traefik internal service %s is not allowed in a WRR loadbalancer", backendRef.BackendRef.Name) + } + + weight := int(ptr.Deref(backendRef.Weight, 1)) + + if *backendRef.Group != "" && *backendRef.Group != groupCore && *backendRef.Kind != "Service" { + if backendRef.Namespace != nil && string(*backendRef.Namespace) != namespace { + // TODO: support backend reference grant. + return nil, nil, fmt.Errorf("unsupported HTTPBackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) + } + + name, service, err := p.loadHTTPBackendRef(namespace, backendRef) + if err != nil { + return nil, nil, fmt.Errorf("unable to load HTTPBackendRef %s/%s/%s: %w", *backendRef.Group, *backendRef.Kind, backendRef.Name, err) + } + + services[name] = service + wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.WRRService{Name: name, Weight: &weight}) + continue + } + + lb := &dynamic.ServersLoadBalancer{} + lb.SetDefaults() + + svc := dynamic.Service{LoadBalancer: lb} + + // TODO support cross namespace through ReferencePolicy + service, exists, err := client.GetService(namespace, string(backendRef.Name)) + if err != nil { + return nil, nil, err + } + + if !exists { + return nil, nil, errors.New("service not found") + } + + if len(service.Spec.Ports) > 1 && backendRef.Port == nil { + // If the port is unspecified and the backend is a Service + // object consisting of multiple port definitions, the route + // must be dropped from the Gateway. The controller should + // raise the "ResolvedRefs" condition on the Gateway with the + // "DroppedRoutes" reason. The gateway status for this route + // should be updated with a condition that describes the error + // more specifically. + log.Error().Msg("A multiple ports Kubernetes Service cannot be used if unspecified backendRef.Port") + continue + } + + var portSpec corev1.ServicePort + var match bool + + for _, p := range service.Spec.Ports { + if backendRef.Port == nil || p.Port == int32(*backendRef.Port) { + portSpec = p + match = true + break + } + } + + if !match { + return nil, nil, errors.New("service port not found") + } + + endpoints, endpointsExists, endpointsErr := client.GetEndpoints(namespace, string(backendRef.Name)) + if endpointsErr != nil { + return nil, nil, endpointsErr + } + + if !endpointsExists { + return nil, nil, errors.New("endpoints not found") + } + + if len(endpoints.Subsets) == 0 { + return nil, nil, errors.New("subset not found") + } + + var port int32 + var portStr string + for _, subset := range endpoints.Subsets { + for _, p := range subset.Ports { + if portSpec.Name == p.Name { + port = p.Port + break + } + } + + if port == 0 { + return nil, nil, errors.New("cannot define a port") + } + + protocol := getProtocol(portSpec) + + portStr = strconv.FormatInt(int64(port), 10) + for _, addr := range subset.Addresses { + svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.Server{ + URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(addr.IP, portStr)), + }) + } + } + + serviceName := provider.Normalize(makeID(service.Namespace, service.Name) + "-" + portStr) + services[serviceName] = &svc + + wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.WRRService{Name: serviceName, Weight: &weight}) + } + + if len(wrrSvc.Weighted.Services) == 0 { + return nil, nil, errors.New("no service has been created") + } + + return wrrSvc, services, nil +} + +func (p *Provider) loadHTTPBackendRef(namespace string, backendRef gatev1.HTTPBackendRef) (string, *dynamic.Service, error) { + // Support for cross-provider references (e.g: api@internal). + // This provides the same behavior as for IngressRoutes. + if *backendRef.Kind == "TraefikService" && strings.Contains(string(backendRef.Name), "@") { + return string(backendRef.Name), nil, nil + } + + backendFunc, ok := p.groupKindBackendFuncs[string(*backendRef.Group)][string(*backendRef.Kind)] + if !ok { + return "", nil, fmt.Errorf("unsupported HTTPBackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) + } + if backendFunc == nil { + return "", nil, fmt.Errorf("undefined backendFunc for HTTPBackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) + } + + return backendFunc(string(backendRef.Name), namespace) +} + +func (p *Provider) loadMiddlewares(listener gatev1.Listener, namespace string, prefix string, filters []gatev1.HTTPRouteFilter) (map[string]*dynamic.Middleware, error) { + middlewares := make(map[string]*dynamic.Middleware) + + // The spec allows for an empty string in which case we should use the + // scheme of the request which in this case is the listener scheme. + var listenerScheme string + switch listener.Protocol { + case gatev1.HTTPProtocolType: + listenerScheme = "http" + case gatev1.HTTPSProtocolType: + listenerScheme = "https" + default: + return nil, fmt.Errorf("invalid listener protocol %s", listener.Protocol) + } + + for i, filter := range filters { + var middleware *dynamic.Middleware + switch filter.Type { + case gatev1.HTTPRouteFilterRequestRedirect: + var err error + middleware, err = createRedirectRegexMiddleware(listenerScheme, filter.RequestRedirect) + if err != nil { + return nil, fmt.Errorf("creating RedirectRegex middleware: %w", err) + } + + middlewareName := provider.Normalize(fmt.Sprintf("%s-%s-%d", prefix, strings.ToLower(string(filter.Type)), i)) + middlewares[middlewareName] = middleware + case gatev1.HTTPRouteFilterExtensionRef: + name, middleware, err := p.loadHTTPRouteFilterExtensionRef(namespace, filter.ExtensionRef) + if err != nil { + return nil, fmt.Errorf("unsupported filter %s: %w", filter.Type, err) + } + + middlewares[name] = middleware + + case gatev1.HTTPRouteFilterRequestHeaderModifier: + middlewareName := provider.Normalize(fmt.Sprintf("%s-%s-%d", prefix, strings.ToLower(string(filter.Type)), i)) + middlewares[middlewareName] = createRequestHeaderModifier(filter.RequestHeaderModifier) + + default: + // As per the spec: + // https://gateway-api.sigs.k8s.io/api-types/httproute/#filters-optional + // In all cases where incompatible or unsupported filters are + // specified, implementations MUST add a warning condition to + // status. + return nil, fmt.Errorf("unsupported filter %s", filter.Type) + } + } + + return middlewares, nil +} + +func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRef *gatev1.LocalObjectReference) (string, *dynamic.Middleware, error) { + if extensionRef == nil { + return "", nil, errors.New("filter extension ref undefined") + } + + filterFunc, ok := p.groupKindFilterFuncs[string(extensionRef.Group)][string(extensionRef.Kind)] + if !ok { + return "", nil, fmt.Errorf("unsupported filter extension ref %s/%s/%s", extensionRef.Group, extensionRef.Kind, extensionRef.Name) + } + if filterFunc == nil { + return "", nil, fmt.Errorf("undefined filterFunc for filter extension ref %s/%s/%s", extensionRef.Group, extensionRef.Kind, extensionRef.Name) + } + + return filterFunc(string(extensionRef.Name), namespace) +} + +// loadTCPServices is generating a WRR service, even when there is only one target. +func loadTCPServices(client Client, namespace string, backendRefs []gatev1.BackendRef) (*dynamic.TCPService, map[string]*dynamic.TCPService, error) { + services := map[string]*dynamic.TCPService{} + + wrrSvc := &dynamic.TCPService{ + Weighted: &dynamic.TCPWeightedRoundRobin{ + Services: []dynamic.TCPWRRService{}, + }, + } + + for _, backendRef := range backendRefs { + if backendRef.Group == nil || backendRef.Kind == nil { + // Should not happen as this is validated by kubernetes + continue + } + + if isInternalService(backendRef) { + return nil, nil, fmt.Errorf("traefik internal service %s is not allowed in a WRR loadbalancer", backendRef.Name) + } + + weight := int(ptr.Deref(backendRef.Weight, 1)) + + if isTraefikService(backendRef) { + wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.TCPWRRService{Name: string(backendRef.Name), Weight: &weight}) + continue + } + + if *backendRef.Group != "" && *backendRef.Group != groupCore && *backendRef.Kind != "Service" { + return nil, nil, fmt.Errorf("unsupported BackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) + } + + svc := dynamic.TCPService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{}, + } + + service, exists, err := client.GetService(namespace, string(backendRef.Name)) + if err != nil { + return nil, nil, err + } + + if !exists { + return nil, nil, errors.New("service not found") + } + + if len(service.Spec.Ports) > 1 && backendRef.Port == nil { + // If the port is unspecified and the backend is a Service + // object consisting of multiple port definitions, the route + // must be dropped from the Gateway. The controller should + // raise the "ResolvedRefs" condition on the Gateway with the + // "DroppedRoutes" reason. The gateway status for this route + // should be updated with a condition that describes the error + // more specifically. + log.Error().Msg("A multiple ports Kubernetes Service cannot be used if unspecified backendRef.Port") + continue + } + + var portSpec corev1.ServicePort + var match bool + + for _, p := range service.Spec.Ports { + if backendRef.Port == nil || p.Port == int32(*backendRef.Port) { + portSpec = p + match = true + break + } + } + + if !match { + return nil, nil, errors.New("service port not found") + } + + endpoints, endpointsExists, endpointsErr := client.GetEndpoints(namespace, string(backendRef.Name)) + if endpointsErr != nil { + return nil, nil, endpointsErr + } + + if !endpointsExists { + return nil, nil, errors.New("endpoints not found") + } + + if len(endpoints.Subsets) == 0 { + return nil, nil, errors.New("subset not found") + } + + var port int32 + var portStr string + for _, subset := range endpoints.Subsets { + for _, p := range subset.Ports { + if portSpec.Name == p.Name { + port = p.Port + break + } + } + + if port == 0 { + return nil, nil, errors.New("cannot define a port") + } + + portStr = strconv.FormatInt(int64(port), 10) + for _, addr := range subset.Addresses { + svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.TCPServer{ + Address: net.JoinHostPort(addr.IP, portStr), + }) + } + } + + serviceName := provider.Normalize(makeID(service.Namespace, service.Name) + "-" + portStr) + services[serviceName] = &svc + + wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.TCPWRRService{Name: serviceName, Weight: &weight}) + } + + if len(wrrSvc.Weighted.Services) == 0 { + return nil, nil, errors.New("no service has been created") + } + + return wrrSvc, services, nil +} + func supportedRouteKinds(protocol gatev1.ProtocolType, experimentalChannel bool) ([]gatev1.RouteGroupKind, []metav1.Condition) { group := gatev1.Group(gatev1.GroupName) @@ -873,184 +1430,6 @@ func getAllowedRouteKinds(gateway *gatev1.Gateway, listener gatev1.Listener, sup return routeKinds, conditions } -func (p *Provider) gatewayHTTPRouteToHTTPConf(ctx context.Context, ep string, listener gatev1.Listener, gateway *gatev1.Gateway, client Client, conf *dynamic.Configuration) []metav1.Condition { - if listener.AllowedRoutes == nil { - // Should not happen due to validation. - return nil - } - - namespaces, err := getRouteBindingSelectorNamespace(client, gateway.Namespace, listener.AllowedRoutes.Namespaces) - if err != nil { - // update "ResolvedRefs" status true with "InvalidRoutesRef" reason - return []metav1.Condition{{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: "InvalidRouteNamespacesSelector", // Should never happen as the selector is validated by kubernetes - Message: fmt.Sprintf("Invalid route namespaces selector: %v", err), - }} - } - - routes, err := client.GetHTTPRoutes(namespaces) - if err != nil { - // update "ResolvedRefs" status true with "RefNotPermitted" reason - return []metav1.Condition{{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: string(gatev1.ListenerReasonRefNotPermitted), - Message: fmt.Sprintf("Cannot fetch HTTPRoutes: %v", err), - }} - } - - if len(routes) == 0 { - log.Ctx(ctx).Debug().Msg("No HTTPRoutes found") - return nil - } - - var conditions []metav1.Condition - for _, route := range routes { - if !shouldAttach(gateway, listener, route.Namespace, route.Spec.CommonRouteSpec) { - continue - } - - hostnames := matchingHostnames(listener, route.Spec.Hostnames) - if len(hostnames) == 0 && listener.Hostname != nil && *listener.Hostname != "" && len(route.Spec.Hostnames) > 0 { - // TODO update the corresponding route parent status - // https://gateway-api.sigs.k8s.io/v1alpha2/references/spec/#gateway.networking.k8s.io/v1alpha2.TLSRoute - continue - } - - hostRule, err := hostRule(hostnames) - if err != nil { - conditions = append(conditions, metav1.Condition{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: "InvalidRouteHostname", // TODO check the spec if a proper reason is introduced at some point - Message: fmt.Sprintf("Skipping HTTPRoute %s: invalid hostname: %v", route.Name, err), - }) - continue - } - - for _, routeRule := range route.Spec.Rules { - rule, err := extractRule(routeRule, hostRule) - if err != nil { - // update "ResolvedRefs" status true with "UnsupportedPathOrHeaderType" reason - conditions = append(conditions, metav1.Condition{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: "UnsupportedPathOrHeaderType", // TODO check the spec if a proper reason is introduced at some point - Message: fmt.Sprintf("Skipping HTTPRoute %s: cannot generate rule: %v", route.Name, err), - }) - } - - router := dynamic.Router{ - Rule: rule, - RuleSyntax: "v3", - EntryPoints: []string{ep}, - } - - if listener.Protocol == gatev1.HTTPSProtocolType && listener.TLS != nil { - // TODO support let's encrypt - router.TLS = &dynamic.RouterTLSConfig{} - } - - // Adding the gateway desc and the entryPoint desc prevents overlapping of routers build from the same routes. - routerName := route.Name + "-" + gateway.Name + "-" + ep - routerKey, err := makeRouterKey(router.Rule, makeID(route.Namespace, routerName)) - if err != nil { - // update "ResolvedRefs" status true with "DroppedRoutes" reason - conditions = append(conditions, metav1.Condition{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: "InvalidRouterKey", // Should never happen - Message: fmt.Sprintf("Skipping HTTPRoute %s: cannot make router's key with rule %s: %v", route.Name, router.Rule, err), - }) - - // TODO update the RouteStatus condition / deduplicate conditions on listener - continue - } - - middlewares, err := p.loadMiddlewares(listener, route.Namespace, routerKey, routeRule.Filters) - if err != nil { - // update "ResolvedRefs" status true with "InvalidFilters" reason - conditions = append(conditions, metav1.Condition{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: "InvalidFilters", // TODO check the spec if a proper reason is introduced at some point - Message: fmt.Sprintf("Cannot load HTTPRoute filter %s/%s: %v", route.Namespace, route.Name, err), - }) - - // TODO update the RouteStatus condition / deduplicate conditions on listener - continue - } - - for middlewareName, middleware := range middlewares { - // If the middleware is not defined in the return of the loadMiddlewares function, it means we just need a reference to that middleware. - if middleware != nil { - conf.HTTP.Middlewares[middlewareName] = middleware - } - - router.Middlewares = append(router.Middlewares, middlewareName) - } - - if len(routeRule.BackendRefs) == 0 { - continue - } - - // Traefik internal service can be used only if there is only one BackendRef service reference. - if len(routeRule.BackendRefs) == 1 && isInternalService(routeRule.BackendRefs[0].BackendRef) { - router.Service = string(routeRule.BackendRefs[0].Name) - } else { - wrrService, subServices, err := p.loadServices(client, route.Namespace, routeRule.BackendRefs) - if err != nil { - // update "ResolvedRefs" status true with "DroppedRoutes" reason - conditions = append(conditions, metav1.Condition{ - Type: string(gatev1.ListenerConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Reason: "InvalidBackendRefs", // TODO check the spec if a proper reason is introduced at some point - Message: fmt.Sprintf("Cannot load HTTPRoute service %s/%s: %v", route.Namespace, route.Name, err), - }) - - // TODO update the RouteStatus condition / deduplicate conditions on listener - continue - } - - for svcName, svc := range subServices { - if svc != nil { - conf.HTTP.Services[svcName] = svc - } - } - - serviceName := provider.Normalize(routerKey + "-wrr") - conf.HTTP.Services[serviceName] = wrrService - - router.Service = serviceName - } - - rt := &router - p.applyRouterTransform(ctx, rt, route) - - routerKey = provider.Normalize(routerKey) - conf.HTTP.Routers[routerKey] = rt - } - } - - return conditions -} - func gatewayTCPRouteToTCPConf(ctx context.Context, ep string, listener gatev1.Listener, gateway *gatev1.Gateway, client Client, conf *dynamic.Configuration) []metav1.Condition { if listener.AllowedRoutes == nil { // Should not happen due to validation. @@ -1090,7 +1469,7 @@ func gatewayTCPRouteToTCPConf(ctx context.Context, ep string, listener gatev1.Li var conditions []metav1.Condition for _, route := range routes { - if !shouldAttach(gateway, listener, route.Namespace, route.Spec.CommonRouteSpec) { + if _, ok := shouldAttach(gateway, listener, route.Namespace, route.Spec.CommonRouteSpec); !ok { continue } @@ -1225,7 +1604,7 @@ func gatewayTLSRouteToTCPConf(ctx context.Context, ep string, listener gatev1.Li var conditions []metav1.Condition for _, route := range routes { - if !shouldAttach(gateway, listener, route.Namespace, route.Spec.CommonRouteSpec) { + if _, ok := shouldAttach(gateway, listener, route.Namespace, route.Spec.CommonRouteSpec); !ok { continue } @@ -1392,7 +1771,7 @@ func matchingHostnames(listener gatev1.Listener, hostnames []gatev1.Hostname) [] return matches } -func shouldAttach(gateway *gatev1.Gateway, listener gatev1.Listener, routeNamespace string, routeSpec gatev1.CommonRouteSpec) bool { +func shouldAttach(gateway *gatev1.Gateway, listener gatev1.Listener, routeNamespace string, routeSpec gatev1.CommonRouteSpec) (gatev1.ParentReference, bool) { for _, parentRef := range routeSpec.ParentRefs { if parentRef.Group == nil || *parentRef.Group != gatev1.GroupName { continue @@ -1412,11 +1791,11 @@ func shouldAttach(gateway *gatev1.Gateway, listener gatev1.Listener, routeNamesp } if namespace == gateway.Namespace && string(parentRef.Name) == gateway.Name { - return true + return parentRef, true } } - return false + return gatev1.ParentReference{}, false } func getRouteBindingSelectorNamespace(client Client, gatewayNamespace string, routeNamespaces *gatev1.RouteNamespaces) ([]string, error) { @@ -1529,14 +1908,13 @@ func extractRule(routeRule gatev1.HTTPRouteRule, hostRule string) (string, error var matchRules []string if match.Path != nil && match.Path.Type != nil && match.Path.Value != nil { - // TODO handle other path types switch *match.Path.Type { case gatev1.PathMatchExact: matchRules = append(matchRules, fmt.Sprintf("Path(`%s`)", *match.Path.Value)) case gatev1.PathMatchPathPrefix: - matchRules = append(matchRules, fmt.Sprintf("PathPrefix(`%s`)", *match.Path.Value)) + matchRules = append(matchRules, buildPathMatchPathPrefixRule(*match.Path.Value)) default: - return "", fmt.Errorf("unsupported path match %s", *match.Path.Type) + return "", fmt.Errorf("unsupported path match type %s", *match.Path.Type) } } @@ -1595,6 +1973,15 @@ func extractHeaderRules(headers []gatev1.HTTPHeaderMatch) ([]string, error) { return headerRules, nil } +func buildPathMatchPathPrefixRule(path string) string { + if path == "/" { + return "PathPrefix(`/`)" + } + + path = strings.TrimSuffix(path, "/") + return fmt.Sprintf("(Path(`%[1]s`) || PathPrefix(`%[1]s/`))", path) +} + func makeRouterKey(rule, name string) (string, error) { h := sha256.New() if _, err := h.Write([]byte(rule)); err != nil { @@ -1687,341 +2074,6 @@ func getCertificateBlocks(secret *corev1.Secret, namespace, secretName string) ( return cert, key, nil } -// loadServices is generating a WRR service, even when there is only one target. -func (p *Provider) loadServices(client Client, namespace string, backendRefs []gatev1.HTTPBackendRef) (*dynamic.Service, map[string]*dynamic.Service, error) { - services := map[string]*dynamic.Service{} - - wrrSvc := &dynamic.Service{ - Weighted: &dynamic.WeightedRoundRobin{ - Services: []dynamic.WRRService{}, - }, - } - - for _, backendRef := range backendRefs { - if backendRef.Group == nil || backendRef.Kind == nil { - // Should not happen as this is validated by kubernetes - continue - } - - if isInternalService(backendRef.BackendRef) { - return nil, nil, fmt.Errorf("traefik internal service %s is not allowed in a WRR loadbalancer", backendRef.BackendRef.Name) - } - - weight := int(ptr.Deref(backendRef.Weight, 1)) - - if *backendRef.Group != "" && *backendRef.Group != groupCore && *backendRef.Kind != "Service" { - if backendRef.Namespace != nil && string(*backendRef.Namespace) != namespace { - // TODO: support backend reference grant. - return nil, nil, fmt.Errorf("unsupported HTTPBackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) - } - - name, service, err := p.loadHTTPBackendRef(namespace, backendRef) - if err != nil { - return nil, nil, fmt.Errorf("unable to load HTTPBackendRef %s/%s/%s: %w", *backendRef.Group, *backendRef.Kind, backendRef.Name, err) - } - - services[name] = service - wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.WRRService{Name: name, Weight: &weight}) - continue - } - - lb := &dynamic.ServersLoadBalancer{} - lb.SetDefaults() - - svc := dynamic.Service{LoadBalancer: lb} - - // TODO support cross namespace through ReferencePolicy - service, exists, err := client.GetService(namespace, string(backendRef.Name)) - if err != nil { - return nil, nil, err - } - - if !exists { - return nil, nil, errors.New("service not found") - } - - if len(service.Spec.Ports) > 1 && backendRef.Port == nil { - // If the port is unspecified and the backend is a Service - // object consisting of multiple port definitions, the route - // must be dropped from the Gateway. The controller should - // raise the "ResolvedRefs" condition on the Gateway with the - // "DroppedRoutes" reason. The gateway status for this route - // should be updated with a condition that describes the error - // more specifically. - log.Error().Msg("A multiple ports Kubernetes Service cannot be used if unspecified backendRef.Port") - continue - } - - var portSpec corev1.ServicePort - var match bool - - for _, p := range service.Spec.Ports { - if backendRef.Port == nil || p.Port == int32(*backendRef.Port) { - portSpec = p - match = true - break - } - } - - if !match { - return nil, nil, errors.New("service port not found") - } - - endpoints, endpointsExists, endpointsErr := client.GetEndpoints(namespace, string(backendRef.Name)) - if endpointsErr != nil { - return nil, nil, endpointsErr - } - - if !endpointsExists { - return nil, nil, errors.New("endpoints not found") - } - - if len(endpoints.Subsets) == 0 { - return nil, nil, errors.New("subset not found") - } - - var port int32 - var portStr string - for _, subset := range endpoints.Subsets { - for _, p := range subset.Ports { - if portSpec.Name == p.Name { - port = p.Port - break - } - } - - if port == 0 { - return nil, nil, errors.New("cannot define a port") - } - - protocol := getProtocol(portSpec) - - portStr = strconv.FormatInt(int64(port), 10) - for _, addr := range subset.Addresses { - svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.Server{ - URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(addr.IP, portStr)), - }) - } - } - - serviceName := provider.Normalize(makeID(service.Namespace, service.Name) + "-" + portStr) - services[serviceName] = &svc - - wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.WRRService{Name: serviceName, Weight: &weight}) - } - - if len(wrrSvc.Weighted.Services) == 0 { - return nil, nil, errors.New("no service has been created") - } - - return wrrSvc, services, nil -} - -func (p *Provider) loadHTTPBackendRef(namespace string, backendRef gatev1.HTTPBackendRef) (string, *dynamic.Service, error) { - // Support for cross-provider references (e.g: api@internal). - // This provides the same behavior as for IngressRoutes. - if *backendRef.Kind == "TraefikService" && strings.Contains(string(backendRef.Name), "@") { - return string(backendRef.Name), nil, nil - } - - backendFunc, ok := p.groupKindBackendFuncs[string(*backendRef.Group)][string(*backendRef.Kind)] - if !ok { - return "", nil, fmt.Errorf("unsupported HTTPBackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) - } - if backendFunc == nil { - return "", nil, fmt.Errorf("undefined backendFunc for HTTPBackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) - } - - return backendFunc(string(backendRef.Name), namespace) -} - -// loadTCPServices is generating a WRR service, even when there is only one target. -func loadTCPServices(client Client, namespace string, backendRefs []gatev1.BackendRef) (*dynamic.TCPService, map[string]*dynamic.TCPService, error) { - services := map[string]*dynamic.TCPService{} - - wrrSvc := &dynamic.TCPService{ - Weighted: &dynamic.TCPWeightedRoundRobin{ - Services: []dynamic.TCPWRRService{}, - }, - } - - for _, backendRef := range backendRefs { - if backendRef.Group == nil || backendRef.Kind == nil { - // Should not happen as this is validated by kubernetes - continue - } - - if isInternalService(backendRef) { - return nil, nil, fmt.Errorf("traefik internal service %s is not allowed in a WRR loadbalancer", backendRef.Name) - } - - weight := int(ptr.Deref(backendRef.Weight, 1)) - - if isTraefikService(backendRef) { - wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.TCPWRRService{Name: string(backendRef.Name), Weight: &weight}) - continue - } - - if *backendRef.Group != "" && *backendRef.Group != groupCore && *backendRef.Kind != "Service" { - return nil, nil, fmt.Errorf("unsupported BackendRef %s/%s/%s", *backendRef.Group, *backendRef.Kind, backendRef.Name) - } - - svc := dynamic.TCPService{ - LoadBalancer: &dynamic.TCPServersLoadBalancer{}, - } - - service, exists, err := client.GetService(namespace, string(backendRef.Name)) - if err != nil { - return nil, nil, err - } - - if !exists { - return nil, nil, errors.New("service not found") - } - - if len(service.Spec.Ports) > 1 && backendRef.Port == nil { - // If the port is unspecified and the backend is a Service - // object consisting of multiple port definitions, the route - // must be dropped from the Gateway. The controller should - // raise the "ResolvedRefs" condition on the Gateway with the - // "DroppedRoutes" reason. The gateway status for this route - // should be updated with a condition that describes the error - // more specifically. - log.Error().Msg("A multiple ports Kubernetes Service cannot be used if unspecified backendRef.Port") - continue - } - - var portSpec corev1.ServicePort - var match bool - - for _, p := range service.Spec.Ports { - if backendRef.Port == nil || p.Port == int32(*backendRef.Port) { - portSpec = p - match = true - break - } - } - - if !match { - return nil, nil, errors.New("service port not found") - } - - endpoints, endpointsExists, endpointsErr := client.GetEndpoints(namespace, string(backendRef.Name)) - if endpointsErr != nil { - return nil, nil, endpointsErr - } - - if !endpointsExists { - return nil, nil, errors.New("endpoints not found") - } - - if len(endpoints.Subsets) == 0 { - return nil, nil, errors.New("subset not found") - } - - var port int32 - var portStr string - for _, subset := range endpoints.Subsets { - for _, p := range subset.Ports { - if portSpec.Name == p.Name { - port = p.Port - break - } - } - - if port == 0 { - return nil, nil, errors.New("cannot define a port") - } - - portStr = strconv.FormatInt(int64(port), 10) - for _, addr := range subset.Addresses { - svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.TCPServer{ - Address: net.JoinHostPort(addr.IP, portStr), - }) - } - } - - serviceName := provider.Normalize(makeID(service.Namespace, service.Name) + "-" + portStr) - services[serviceName] = &svc - - wrrSvc.Weighted.Services = append(wrrSvc.Weighted.Services, dynamic.TCPWRRService{Name: serviceName, Weight: &weight}) - } - - if len(wrrSvc.Weighted.Services) == 0 { - return nil, nil, errors.New("no service has been created") - } - - return wrrSvc, services, nil -} - -func (p *Provider) loadMiddlewares(listener gatev1.Listener, namespace string, prefix string, filters []gatev1.HTTPRouteFilter) (map[string]*dynamic.Middleware, error) { - middlewares := make(map[string]*dynamic.Middleware) - - // The spec allows for an empty string in which case we should use the - // scheme of the request which in this case is the listener scheme. - var listenerScheme string - switch listener.Protocol { - case gatev1.HTTPProtocolType: - listenerScheme = "http" - case gatev1.HTTPSProtocolType: - listenerScheme = "https" - default: - return nil, fmt.Errorf("invalid listener protocol %s", listener.Protocol) - } - - for i, filter := range filters { - var middleware *dynamic.Middleware - switch filter.Type { - case gatev1.HTTPRouteFilterRequestRedirect: - var err error - middleware, err = createRedirectRegexMiddleware(listenerScheme, filter.RequestRedirect) - if err != nil { - return nil, fmt.Errorf("creating RedirectRegex middleware: %w", err) - } - - middlewareName := provider.Normalize(fmt.Sprintf("%s-%s-%d", prefix, strings.ToLower(string(filter.Type)), i)) - middlewares[middlewareName] = middleware - case gatev1.HTTPRouteFilterExtensionRef: - name, middleware, err := p.loadHTTPRouteFilterExtensionRef(namespace, filter.ExtensionRef) - if err != nil { - return nil, fmt.Errorf("unsupported filter %s: %w", filter.Type, err) - } - - middlewares[name] = middleware - - case gatev1.HTTPRouteFilterRequestHeaderModifier: - middlewareName := provider.Normalize(fmt.Sprintf("%s-%s-%d", prefix, strings.ToLower(string(filter.Type)), i)) - middlewares[middlewareName] = createRequestHeaderModifier(filter.RequestHeaderModifier) - - default: - // As per the spec: - // https://gateway-api.sigs.k8s.io/api-types/httproute/#filters-optional - // In all cases where incompatible or unsupported filters are - // specified, implementations MUST add a warning condition to - // status. - return nil, fmt.Errorf("unsupported filter %s", filter.Type) - } - } - - return middlewares, nil -} - -func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRef *gatev1.LocalObjectReference) (string, *dynamic.Middleware, error) { - if extensionRef == nil { - return "", nil, errors.New("filter extension ref undefined") - } - - filterFunc, ok := p.groupKindFilterFuncs[string(extensionRef.Group)][string(extensionRef.Kind)] - if !ok { - return "", nil, fmt.Errorf("unsupported filter extension ref %s/%s/%s", extensionRef.Group, extensionRef.Kind, extensionRef.Name) - } - if filterFunc == nil { - return "", nil, fmt.Errorf("undefined filterFunc for filter extension ref %s/%s/%s", extensionRef.Group, extensionRef.Kind, extensionRef.Name) - } - - return filterFunc(string(extensionRef.Name), namespace) -} - // createRequestHeaderModifier does not enforce/check the configuration, // as the spec indicates that either the webhook or CEL (since v1.0 GA Release) should enforce that. func createRequestHeaderModifier(filter *gatev1.HTTPHeaderFilter) *dynamic.Middleware { @@ -2205,3 +2257,71 @@ func kindToString(p *gatev1.Kind) string { } return string(*p) } + +func makeHTTPRouteStatuses(gwNs string, routeParentStatuses map[ktypes.NamespacedName][]gatev1.RouteParentStatus) map[ktypes.NamespacedName]gatev1.HTTPRouteStatus { + res := map[ktypes.NamespacedName]gatev1.HTTPRouteStatus{} + + for nsName, parentStatuses := range routeParentStatuses { + var httpRouteStatus gatev1.HTTPRouteStatus + for _, parentStatus := range parentStatuses { + exists := slices.ContainsFunc(httpRouteStatus.Parents, func(status gatev1.RouteParentStatus) bool { + return parentRefEquals(gwNs, parentStatus.ParentRef, status.ParentRef) + }) + if !exists { + httpRouteStatus.Parents = append(httpRouteStatus.Parents, parentStatus) + } + } + + res[nsName] = httpRouteStatus + } + + return res +} + +func parentRefEquals(gwNs string, p1, p2 gatev1.ParentReference) bool { + if !pointerEquals(p1.Group, p2.Group) { + return false + } + + if !pointerEquals(p1.Kind, p2.Kind) { + return false + } + + if !pointerEquals(p1.SectionName, p2.SectionName) { + return false + } + + if p1.Name != p2.Name { + return false + } + + p1Ns := gwNs + if p1.Namespace != nil { + p1Ns = string(*p1.Namespace) + } + + p2Ns := gwNs + if p2.Namespace != nil { + p2Ns = string(*p2.Namespace) + } + + return p1Ns == p2Ns +} + +func pointerEquals[T comparable](p1, p2 *T) bool { + if p1 == nil && p2 == nil { + return true + } + + var val1 T + if p1 != nil { + val1 = *p1 + } + + var val2 T + if p2 != nil { + val2 = *p2 + } + + return val1 == val2 +} diff --git a/pkg/provider/kubernetes/gateway/kubernetes_test.go b/pkg/provider/kubernetes/gateway/kubernetes_test.go index bfc9e3072..c428828b2 100644 --- a/pkg/provider/kubernetes/gateway/kubernetes_test.go +++ b/pkg/provider/kubernetes/gateway/kubernetes_test.go @@ -3,6 +3,8 @@ package gateway import ( "context" "errors" + "os" + "path/filepath" "testing" "time" @@ -12,16 +14,35 @@ import ( "github.com/traefik/traefik/v3/pkg/config/dynamic" "github.com/traefik/traefik/v3/pkg/provider" traefikv1alpha1 "github.com/traefik/traefik/v3/pkg/provider/kubernetes/crd/traefikio/v1alpha1" + "github.com/traefik/traefik/v3/pkg/provider/kubernetes/k8s" "github.com/traefik/traefik/v3/pkg/tls" "github.com/traefik/traefik/v3/pkg/types" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubefake "k8s.io/client-go/kubernetes/fake" + kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" gatev1 "sigs.k8s.io/gateway-api/apis/v1" + gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + gatefake "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/fake" ) var _ provider.Provider = (*Provider)(nil) +func init() { + // required by k8s.MustParseYaml + if err := gatev1.AddToScheme(kscheme.Scheme); err != nil { + panic(err) + } + if err := gatev1beta1.AddToScheme(kscheme.Scheme); err != nil { + panic(err) + } + if err := gatev1alpha2.AddToScheme(kscheme.Scheme); err != nil { + panic(err) + } +} + func TestLoadHTTPRoutes(t *testing.T) { testCases := []struct { desc string @@ -1248,10 +1269,10 @@ func TestLoadHTTPRoutes(t *testing.T) { }, HTTP: &dynamic.HTTPConfiguration{ Routers: map[string]*dynamic.Router{ - "default-http-app-1-my-gateway-web-4a1b73e6f83804949a37": { + "default-http-app-1-my-gateway-web-6cf37fa71907768d925c": { EntryPoints: []string{"web"}, - Service: "default-http-app-1-my-gateway-web-4a1b73e6f83804949a37-wrr", - Rule: "Host(`foo.com`) && PathPrefix(`/bar`) && Header(`my-header`,`foo`) && Header(`my-header2`,`bar`)", + Service: "default-http-app-1-my-gateway-web-6cf37fa71907768d925c-wrr", + Rule: "Host(`foo.com`) && (Path(`/bar`) || PathPrefix(`/bar/`)) && Header(`my-header`,`foo`) && Header(`my-header2`,`bar`)", RuleSyntax: "v3", }, "default-http-app-1-my-gateway-web-aaba0f24fd26e1ca2276": { @@ -1263,7 +1284,7 @@ func TestLoadHTTPRoutes(t *testing.T) { }, Middlewares: map[string]*dynamic.Middleware{}, Services: map[string]*dynamic.Service{ - "default-http-app-1-my-gateway-web-4a1b73e6f83804949a37-wrr": { + "default-http-app-1-my-gateway-web-6cf37fa71907768d925c-wrr": { Weighted: &dynamic.WeightedRoundRobin{ Services: []dynamic.WRRService{ { @@ -1733,9 +1754,27 @@ func TestLoadHTTPRoutes(t *testing.T) { return } - p := Provider{EntryPoints: test.entryPoints, ExperimentalChannel: test.experimentalChannel} + p := Provider{ + EntryPoints: test.entryPoints, + ExperimentalChannel: test.experimentalChannel, + } - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock(test.paths...)) + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -1992,12 +2031,28 @@ func TestLoadHTTPRoutes_backendExtensionRef(t *testing.T) { } p := Provider{EntryPoints: test.entryPoints} + + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + for group, kindFuncs := range test.groupKindBackendFuncs { for kind, backendFunc := range kindFuncs { p.RegisterBackendFuncs(group, kind, backendFunc) } } - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock(test.paths...)) + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -2208,12 +2263,28 @@ func TestLoadHTTPRoutes_filterExtensionRef(t *testing.T) { } p := Provider{EntryPoints: test.entryPoints} + + k8sObjects, gwObjects := readResources(t, []string{"services.yml", "httproute/filter_extension_ref.yml"}) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + for group, kindFuncs := range test.groupKindFilterFuncs { for kind, filterFunc := range kindFuncs { p.RegisterFilterFuncs(group, kind, filterFunc) } } - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock([]string{"services.yml", "httproute/filter_extension_ref.yml"}...)) + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -2971,8 +3042,28 @@ func TestLoadTCPRoutes(t *testing.T) { return } - p := Provider{EntryPoints: test.entryPoints, ExperimentalChannel: true} - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock(test.paths...)) + p := Provider{ + EntryPoints: test.entryPoints, + ExperimentalChannel: true, + } + + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + client.experimentalChannel = true + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -4099,8 +4190,28 @@ func TestLoadTLSRoutes(t *testing.T) { return } - p := Provider{EntryPoints: test.entryPoints, ExperimentalChannel: true} - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock(test.paths...)) + p := Provider{ + EntryPoints: test.entryPoints, + ExperimentalChannel: true, + } + + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + client.experimentalChannel = true + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -5112,8 +5223,28 @@ func TestLoadMixedRoutes(t *testing.T) { return } - p := Provider{EntryPoints: test.entryPoints, ExperimentalChannel: test.experimentalChannel} - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock(test.paths...)) + p := Provider{ + EntryPoints: test.entryPoints, + ExperimentalChannel: test.experimentalChannel, + } + + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + client.experimentalChannel = test.experimentalChannel + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -5303,8 +5434,28 @@ func TestLoadRoutesWithReferenceGrants(t *testing.T) { return } - p := Provider{EntryPoints: test.entryPoints, ExperimentalChannel: test.experimentalChannel} - conf := p.loadConfigurationFromGateway(context.Background(), newClientMock(test.paths...)) + p := Provider{ + EntryPoints: test.entryPoints, + ExperimentalChannel: test.experimentalChannel, + } + + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + client.experimentalChannel = test.experimentalChannel + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + + conf := p.loadConfigurationFromGateway(context.Background(), client) assert.Equal(t, test.expected, conf) }) } @@ -5731,7 +5882,8 @@ func Test_shouldAttach(t *testing.T) { listener gatev1.Listener routeNamespace string routeSpec gatev1.CommonRouteSpec - expectedAttach bool + wantAttach bool + wantParentRef gatev1.ParentReference }{ { desc: "No ParentRefs", @@ -5748,7 +5900,7 @@ func Test_shouldAttach(t *testing.T) { routeSpec: gatev1.CommonRouteSpec{ ParentRefs: nil, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Unsupported Kind", @@ -5773,7 +5925,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Unsupported Group", @@ -5798,7 +5950,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Kind is nil", @@ -5822,7 +5974,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Group is nil", @@ -5846,7 +5998,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "SectionName does not match a listener desc", @@ -5871,7 +6023,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Namespace does not match the Gateway namespace", @@ -5896,7 +6048,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Route namespace does not match the Gateway namespace", @@ -5920,7 +6072,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Unsupported Kind", @@ -5945,7 +6097,7 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: false, + wantAttach: false, }, { desc: "Route namespace matches the Gateway namespace", @@ -5969,7 +6121,13 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: true, + wantAttach: true, + wantParentRef: gatev1.ParentReference{ + SectionName: sectionNamePtr("foo"), + Name: "gateway", + Kind: kindPtr("Gateway"), + Group: groupPtr(gatev1.GroupName), + }, }, { desc: "Namespace matches the Gateway namespace", @@ -5994,7 +6152,14 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: true, + wantAttach: true, + wantParentRef: gatev1.ParentReference{ + SectionName: sectionNamePtr("foo"), + Name: "gateway", + Namespace: namespacePtr("default"), + Kind: kindPtr("Gateway"), + Group: groupPtr(gatev1.GroupName), + }, }, { desc: "Only one ParentRef matches the Gateway", @@ -6024,7 +6189,13 @@ func Test_shouldAttach(t *testing.T) { }, }, }, - expectedAttach: true, + wantAttach: true, + wantParentRef: gatev1.ParentReference{ + Name: "gateway", + Namespace: namespacePtr("default"), + Kind: kindPtr("Gateway"), + Group: groupPtr(gatev1.GroupName), + }, }, } @@ -6032,8 +6203,9 @@ func Test_shouldAttach(t *testing.T) { t.Run(test.desc, func(t *testing.T) { t.Parallel() - got := shouldAttach(test.gateway, test.listener, test.routeNamespace, test.routeSpec) - assert.Equal(t, test.expectedAttach, got) + gotParentRef, gotAttach := shouldAttach(test.gateway, test.listener, test.routeNamespace, test.routeSpec) + assert.Equal(t, test.wantAttach, gotAttach) + assert.Equal(t, test.wantParentRef, gotParentRef) }) } } @@ -6627,7 +6799,22 @@ func Test_gatewayAddresses(t *testing.T) { p := Provider{StatusAddress: test.statusAddress} - got, err := p.gatewayAddresses(newClientMock(test.paths...)) + k8sObjects, gwObjects := readResources(t, test.paths) + + kubeClient := kubefake.NewSimpleClientset(k8sObjects...) + gwClient := newGatewaySimpleClientSet(t, gwObjects...) + + client := newClientImpl(kubeClient, gwClient) + + eventCh, err := client.WatchAll(nil, make(chan struct{})) + require.NoError(t, err) + + if len(k8sObjects) > 0 || len(gwObjects) > 0 { + // just wait for the first event + <-eventCh + } + + got, err := p.gatewayAddresses(client) test.wantErr(t, err) assert.Equal(t, test.want, got) @@ -6662,3 +6849,46 @@ func headerMatchTypePtr(h gatev1.HeaderMatchType) *gatev1.HeaderMatchType { retu func objectNamePtr(objectName gatev1.ObjectName) *gatev1.ObjectName { return &objectName } + +// We cannot use the gateway-api fake.NewSimpleClientset due to Gateway being pluralized as "gatewaies" instead of "gateways". +func newGatewaySimpleClientSet(t *testing.T, objects ...runtime.Object) *gatefake.Clientset { + t.Helper() + + client := gatefake.NewSimpleClientset(objects...) + for _, object := range objects { + gateway, ok := object.(*gatev1.Gateway) + if !ok { + continue + } + + _, err := client.GatewayV1().Gateways(gateway.Namespace).Create(context.Background(), gateway, metav1.CreateOptions{}) + require.NoError(t, err) + } + + return client +} + +func readResources(t *testing.T, paths []string) ([]runtime.Object, []runtime.Object) { + t.Helper() + + var k8sObjects []runtime.Object + var gwObjects []runtime.Object + for _, path := range paths { + yamlContent, err := os.ReadFile(filepath.FromSlash("./fixtures/" + path)) + if err != nil { + panic(err) + } + + objects := k8s.MustParseYaml(yamlContent) + for _, obj := range objects { + switch obj.GetObjectKind().GroupVersionKind().Group { + case "gateway.networking.k8s.io": + gwObjects = append(gwObjects, obj) + default: + k8sObjects = append(k8sObjects, obj) + } + } + } + + return k8sObjects, gwObjects +}