Skip to content

Commit

Permalink
OVN LB: add support for SCTP protocol (#2331)
Browse files Browse the repository at this point in the history
* OVN LB: add support for SCTP protocol

* add e2e cases for sctp connectivity

* fix lb vips cleanup
  • Loading branch information
zhangzujian committed Feb 14, 2023
1 parent 354fd40 commit f7156c9
Show file tree
Hide file tree
Showing 15 changed files with 384 additions and 492 deletions.
2 changes: 2 additions & 0 deletions dist/images/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ RUN cd /usr/src/ && git clone -b branch-22.03 --depth=1 https://github.com/ovn-o
curl -s https://github.com/kubeovn/ovn/commit/2b7226d936a60140574827a6a34560e0b332e876.patch | git apply && \
# fix reaching resubmit limit in underlay
curl -s https://github.com/kubeovn/ovn/commit/44b44df9ceb37d526594126c6f8737fafb53269c.patch | git apply && \
# do not remove LB if vips is empty
curl -s https://github.com/kubeovn/ovn/commit/45a4a22161e42f17f21baee9106a45964dfd3a1b.patch | git apply && \
sed -i 's/OVN/ovn/g' debian/changelog && \
rm -rf .git && \
./boot.sh && \
Expand Down
4 changes: 4 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,10 @@ spec:
type: string
udpSessionLoadBalancer:
type: string
sctpLoadBalancer:
type: string
sctpSessionLoadBalancer:
type: string
type: object
type: object
served: true
Expand Down
24 changes: 13 additions & 11 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,17 +395,19 @@ type VpcStatus struct {
// +patchStrategy=merge
Conditions []VpcCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

Standby bool `json:"standby"`
Default bool `json:"default"`
DefaultLogicalSwitch string `json:"defaultLogicalSwitch"`
Router string `json:"router"`
TcpLoadBalancer string `json:"tcpLoadBalancer"`
UdpLoadBalancer string `json:"udpLoadBalancer"`
TcpSessionLoadBalancer string `json:"tcpSessionLoadBalancer"`
UdpSessionLoadBalancer string `json:"udpSessionLoadBalancer"`
Subnets []string `json:"subnets"`
VpcPeerings []string `json:"vpcPeerings"`
EnableExternal bool `json:"enableExternal"`
Standby bool `json:"standby"`
Default bool `json:"default"`
DefaultLogicalSwitch string `json:"defaultLogicalSwitch"`
Router string `json:"router"`
TcpLoadBalancer string `json:"tcpLoadBalancer"`
UdpLoadBalancer string `json:"udpLoadBalancer"`
SctpLoadBalancer string `json:"sctpLoadBalancer"`
TcpSessionLoadBalancer string `json:"tcpSessionLoadBalancer"`
UdpSessionLoadBalancer string `json:"udpSessionLoadBalancer"`
SctpSessionLoadBalancer string `json:"sctpSessionLoadBalancer"`
Subnets []string `json:"subnets"`
VpcPeerings []string `json:"vpcPeerings"`
EnableExternal bool `json:"enableExternal"`
}

// Condition describes the state of an object at a certain point.
Expand Down
124 changes: 65 additions & 59 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ type Configuration struct {

ServiceClusterIPRange string

ClusterTcpLoadBalancer string
ClusterUdpLoadBalancer string
ClusterTcpSessionLoadBalancer string
ClusterUdpSessionLoadBalancer string
ClusterTcpLoadBalancer string
ClusterUdpLoadBalancer string
ClusterSctpLoadBalancer string
ClusterTcpSessionLoadBalancer string
ClusterUdpSessionLoadBalancer string
ClusterSctpSessionLoadBalancer string

PodName string
PodNamespace string
Expand Down Expand Up @@ -122,10 +124,12 @@ func ParseFlags() (*Configuration, error) {

argServiceClusterIPRange = pflag.String("service-cluster-ip-range", "10.96.0.0/12", "The kubernetes service cluster ip range")

argClusterTcpLoadBalancer = pflag.String("cluster-tcp-loadbalancer", "cluster-tcp-loadbalancer", "The name for cluster tcp loadbalancer")
argClusterUdpLoadBalancer = pflag.String("cluster-udp-loadbalancer", "cluster-udp-loadbalancer", "The name for cluster udp loadbalancer")
argClusterTcpSessionLoadBalancer = pflag.String("cluster-tcp-session-loadbalancer", "cluster-tcp-session-loadbalancer", "The name for cluster tcp session loadbalancer")
argClusterUdpSessionLoadBalancer = pflag.String("cluster-udp-session-loadbalancer", "cluster-udp-session-loadbalancer", "The name for cluster udp session loadbalancer")
argClusterTcpLoadBalancer = pflag.String("cluster-tcp-loadbalancer", "cluster-tcp-loadbalancer", "The name for cluster tcp loadbalancer")
argClusterUdpLoadBalancer = pflag.String("cluster-udp-loadbalancer", "cluster-udp-loadbalancer", "The name for cluster udp loadbalancer")
argClusterSctpLoadBalancer = pflag.String("cluster-sctp-loadbalancer", "cluster-sctp-loadbalancer", "The name for cluster sctp loadbalancer")
argClusterTcpSessionLoadBalancer = pflag.String("cluster-tcp-session-loadbalancer", "cluster-tcp-session-loadbalancer", "The name for cluster tcp session loadbalancer")
argClusterUdpSessionLoadBalancer = pflag.String("cluster-udp-session-loadbalancer", "cluster-udp-session-loadbalancer", "The name for cluster udp session loadbalancer")
argClusterSctpSessionLoadBalancer = pflag.String("cluster-sctp-session-loadbalancer", "cluster-sctp-session-loadbalancer", "The name for cluster sctp session loadbalancer")

argWorkerNum = pflag.Int("worker-num", 3, "The parallelism of each worker")
argEnablePprof = pflag.Bool("enable-pprof", false, "Enable pprof")
Expand Down Expand Up @@ -178,57 +182,59 @@ func ParseFlags() (*Configuration, error) {
pflag.Parse()

config := &Configuration{
OvnNbAddr: *argOvnNbAddr,
OvnSbAddr: *argOvnSbAddr,
OvnTimeout: *argOvnTimeout,
CustCrdRetryMinDelay: *argCustCrdRetryMinDelay,
CustCrdRetryMaxDelay: *argCustCrdRetryMaxDelay,
KubeConfigFile: *argKubeConfigFile,
DefaultLogicalSwitch: *argDefaultLogicalSwitch,
DefaultCIDR: *argDefaultCIDR,
DefaultGateway: *argDefaultGateway,
DefaultGatewayCheck: *argDefaultGatewayCheck,
DefaultLogicalGateway: *argDefaultLogicalGateway,
DefaultU2OInterconnection: *argDefaultU2OInterconnection,
DefaultExcludeIps: *argDefaultExcludeIps,
ClusterRouter: *argClusterRouter,
NodeSwitch: *argNodeSwitch,
NodeSwitchCIDR: *argNodeSwitchCIDR,
NodeSwitchGateway: *argNodeSwitchGateway,
ServiceClusterIPRange: *argServiceClusterIPRange,
ClusterTcpLoadBalancer: *argClusterTcpLoadBalancer,
ClusterUdpLoadBalancer: *argClusterUdpLoadBalancer,
ClusterTcpSessionLoadBalancer: *argClusterTcpSessionLoadBalancer,
ClusterUdpSessionLoadBalancer: *argClusterUdpSessionLoadBalancer,
WorkerNum: *argWorkerNum,
EnablePprof: *argEnablePprof,
PprofPort: *argPprofPort,
NetworkType: *argNetworkType,
DefaultVlanID: *argDefaultVlanID,
LsDnatModDlDst: *argLsDnatModDlDst,
DefaultProviderName: *argDefaultProviderName,
DefaultHostInterface: *argDefaultInterfaceName,
DefaultExchangeLinkName: *argDefaultExchangeLinkName,
DefaultVlanName: *argDefaultVlanName,
PodName: os.Getenv("POD_NAME"),
PodNamespace: os.Getenv("KUBE_NAMESPACE"),
PodNicType: *argPodNicType,
PodDefaultFipType: *argPodDefaultFipType,
EnableLb: *argEnableLb,
EnableNP: *argEnableNP,
EnableEipSnat: *argEnableEipSnat,
EnableExternalVpc: *argEnableExternalVpc,
ExternalGatewayConfigNS: *argExternalGatewayConfigNS,
ExternalGatewaySwitch: *argExternalGatewaySwitch,
ExternalGatewayNet: *argExternalGatewayNet,
ExternalGatewayVlanID: *argExternalGatewayVlanID,
EnableEcmp: *argEnableEcmp,
EnableKeepVmIP: *argKeepVmIP,
NodePgProbeTime: *argNodePgProbeTime,
GCInterval: *argGCInterval,
InspectInterval: *argInspectInterval,
EnableLbSvc: *argEnableLbSvc,
EnableMetrics: *argEnableMetrics,
OvnNbAddr: *argOvnNbAddr,
OvnSbAddr: *argOvnSbAddr,
OvnTimeout: *argOvnTimeout,
CustCrdRetryMinDelay: *argCustCrdRetryMinDelay,
CustCrdRetryMaxDelay: *argCustCrdRetryMaxDelay,
KubeConfigFile: *argKubeConfigFile,
DefaultLogicalSwitch: *argDefaultLogicalSwitch,
DefaultCIDR: *argDefaultCIDR,
DefaultGateway: *argDefaultGateway,
DefaultGatewayCheck: *argDefaultGatewayCheck,
DefaultLogicalGateway: *argDefaultLogicalGateway,
DefaultU2OInterconnection: *argDefaultU2OInterconnection,
DefaultExcludeIps: *argDefaultExcludeIps,
ClusterRouter: *argClusterRouter,
NodeSwitch: *argNodeSwitch,
NodeSwitchCIDR: *argNodeSwitchCIDR,
NodeSwitchGateway: *argNodeSwitchGateway,
ServiceClusterIPRange: *argServiceClusterIPRange,
ClusterTcpLoadBalancer: *argClusterTcpLoadBalancer,
ClusterUdpLoadBalancer: *argClusterUdpLoadBalancer,
ClusterSctpLoadBalancer: *argClusterSctpLoadBalancer,
ClusterTcpSessionLoadBalancer: *argClusterTcpSessionLoadBalancer,
ClusterUdpSessionLoadBalancer: *argClusterUdpSessionLoadBalancer,
ClusterSctpSessionLoadBalancer: *argClusterSctpSessionLoadBalancer,
WorkerNum: *argWorkerNum,
EnablePprof: *argEnablePprof,
PprofPort: *argPprofPort,
NetworkType: *argNetworkType,
DefaultVlanID: *argDefaultVlanID,
LsDnatModDlDst: *argLsDnatModDlDst,
DefaultProviderName: *argDefaultProviderName,
DefaultHostInterface: *argDefaultInterfaceName,
DefaultExchangeLinkName: *argDefaultExchangeLinkName,
DefaultVlanName: *argDefaultVlanName,
PodName: os.Getenv("POD_NAME"),
PodNamespace: os.Getenv("KUBE_NAMESPACE"),
PodNicType: *argPodNicType,
PodDefaultFipType: *argPodDefaultFipType,
EnableLb: *argEnableLb,
EnableNP: *argEnableNP,
EnableEipSnat: *argEnableEipSnat,
EnableExternalVpc: *argEnableExternalVpc,
ExternalGatewayConfigNS: *argExternalGatewayConfigNS,
ExternalGatewaySwitch: *argExternalGatewaySwitch,
ExternalGatewayNet: *argExternalGatewayNet,
ExternalGatewayVlanID: *argExternalGatewayVlanID,
EnableEcmp: *argEnableEcmp,
EnableKeepVmIP: *argKeepVmIP,
NodePgProbeTime: *argNodePgProbeTime,
GCInterval: *argGCInterval,
InspectInterval: *argInspectInterval,
EnableLbSvc: *argEnableLbSvc,
EnableMetrics: *argEnableMetrics,
}

if config.NetworkType == util.NetworkTypeVlan && config.DefaultHostInterface == "" {
Expand Down
65 changes: 29 additions & 36 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,8 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
var LbIPs []string
if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
LbIPs = []string{vip}
} else {
LbIPs = svc.Spec.ClusterIPs
if len(LbIPs) == 0 && svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != v1.ClusterIPNone {
LbIPs = []string{svc.Spec.ClusterIP}
}
if len(LbIPs) == 0 || LbIPs[0] == v1.ClusterIPNone {
return nil
}
} else if LbIPs = util.ServiceClusterIPs(*svc); len(LbIPs) == 0 {
return nil
}

pods, err := c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector())
Expand Down Expand Up @@ -175,43 +169,42 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
}
}

tcpLb, udpLb := vpc.Status.TcpLoadBalancer, vpc.Status.UdpLoadBalancer
tcpLb, udpLb, sctpLb := vpc.Status.TcpLoadBalancer, vpc.Status.UdpLoadBalancer, vpc.Status.SctpLoadBalancer
oldTcpLb, oldUdpLb, oldSctpLb := vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
tcpLb, udpLb = vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer
tcpLb, udpLb, sctpLb, oldTcpLb, oldUdpLb, oldSctpLb = oldTcpLb, oldUdpLb, oldSctpLb, tcpLb, udpLb, sctpLb
}

for _, settingIP := range LbIPs {
for _, port := range svc.Spec.Ports {
var lb, oldLb string
switch port.Protocol {
case v1.ProtocolTCP:
lb, oldLb = tcpLb, oldTcpLb
case v1.ProtocolUDP:
lb, oldLb = udpLb, oldUdpLb
case v1.ProtocolSCTP:
lb, oldLb = sctpLb, oldSctpLb
}

vip := util.JoinHostPort(settingIP, port.Port)
backends := getServicePortBackends(ep, pods, port, settingIP)
if port.Protocol == v1.ProtocolTCP {
// for performance reason delete lb with no backends
if len(backends) != 0 {
err = c.ovnLegacyClient.CreateLoadBalancerRule(tcpLb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to tcp lb, %v", vip, err)
return err
}
} else {
err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, tcpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at tcp lb, %v", vip, err)
return err
}

// for performance reason delete lb with no backends
if len(backends) != 0 {
err = c.ovnLegacyClient.CreateLoadBalancerRule(lb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", vip, backends, lb, err)
return err
}
} else {
if len(backends) != 0 {
err = c.ovnLegacyClient.CreateLoadBalancerRule(udpLb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to udp lb, %v", vip, err)
return err
}
} else {
err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at udp lb, %v", vip, err)
return err
}
if err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, lb); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
return err
}
if err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, oldLb); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
return err
}
}
}
Expand Down

0 comments on commit f7156c9

Please sign in to comment.