Skip to content

Commit

Permalink
fix LB in dual stack cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Nov 2, 2021
1 parent 3773bed commit 1857130
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 43 deletions.
107 changes: 64 additions & 43 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

Expand Down Expand Up @@ -115,8 +114,11 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
return err
}

clusterIP := svc.Spec.ClusterIP
if clusterIP == "" || clusterIP == v1.ClusterIPNone {
clusterIPs := svc.Spec.ClusterIPs
if len(clusterIPs) == 0 && svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != v1.ClusterIPNone {
clusterIPs = []string{svc.Spec.ClusterIP}
}
if len(clusterIPs) == 0 {
return nil
}

Expand Down Expand Up @@ -177,51 +179,49 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
tcpLb, udpLb = vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer
}

for _, port := range svc.Spec.Ports {
var vip string
if util.CheckProtocol(clusterIP) == kubeovnv1.ProtocolIPv6 {
vip = fmt.Sprintf("[%s]:%d", clusterIP, port.Port)
} else {
vip = fmt.Sprintf("%s:%d", clusterIP, port.Port)
}

backends := getServicePortBackends(ep, port, clusterIP)
if port.Protocol == v1.ProtocolTCP {
// for performance reason delete lb with no backends
if len(backends) > 0 {
err = c.ovnClient.CreateLoadBalancerRule(tcpLb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to tcp lb, %v", vip, err)
return err
}
} else {
err = c.ovnClient.DeleteLoadBalancerVip(vip, tcpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at tcp lb, %v", vip, err)
return err
}
}
} else {
if len(backends) > 0 {
err = c.ovnClient.CreateLoadBalancerRule(udpLb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to udp lb, %v", vip, err)
return err
for _, clusterIP := range clusterIPs {
for _, port := range svc.Spec.Ports {
vip := util.JoinHostPort(clusterIP, port.Port)
backends := getServicePortBackends(ep, pods, port, clusterIP)
if port.Protocol == v1.ProtocolTCP {
// for performance reason delete lb with no backends
if len(backends) != 0 {
err = c.ovnClient.CreateLoadBalancerRule(tcpLb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to tcp lb, %v", vip, err)
return err
}
} else {
err = c.ovnClient.DeleteLoadBalancerVip(vip, tcpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at tcp lb, %v", vip, err)
return err
}
}
} else {
err = c.ovnClient.DeleteLoadBalancerVip(vip, udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at udp lb, %v", vip, err)
return err
if len(backends) != 0 {
err = c.ovnClient.CreateLoadBalancerRule(udpLb, vip, backends, string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to udp lb, %v", vip, err)
return err
}
} else {
err = c.ovnClient.DeleteLoadBalancerVip(vip, udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at udp lb, %v", vip, err)
return err
}
}
}
}
}

return nil
}

func getServicePortBackends(endpoints *v1.Endpoints, servicePort v1.ServicePort, serviceIP string) string {
func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP string) string {
backends := []string{}
protocol := util.CheckProtocol(serviceIP)
for _, subset := range endpoints.Subsets {
var targetPort int32
for _, port := range subset.Ports {
Expand All @@ -235,14 +235,35 @@ func getServicePortBackends(endpoints *v1.Endpoints, servicePort v1.ServicePort,
}

for _, address := range subset.Addresses {
if util.CheckProtocol(serviceIP) == util.CheckProtocol(address.IP) {
if util.CheckProtocol(address.IP) == kubeovnv1.ProtocolIPv6 {
backends = append(backends, fmt.Sprintf("[%s]:%d", address.IP, targetPort))
} else {
backends = append(backends, fmt.Sprintf("%s:%d", address.IP, targetPort))
if address.TargetRef == nil || address.TargetRef.Kind != "Pod" {
backends = append(backends, util.JoinHostPort(address.IP, targetPort))
continue
}

var ip string
for _, pod := range pods {
if pod.Name == address.TargetRef.Name {
podIPs := pod.Status.PodIPs
if len(podIPs) == 0 && pod.Status.PodIP != "" {
podIPs = []v1.PodIP{{IP: pod.Status.PodIP}}
}
for _, podIP := range podIPs {
if util.CheckProtocol(podIP.IP) == protocol {
ip = podIP.IP
break
}
}
break
}
}
if ip == "" {
ip = address.IP
}
if ip != "" {
backends = append(backends, util.JoinHostPort(ip, targetPort))
}
}
}

return strings.Join(backends, ",")
}
4 changes: 4 additions & 0 deletions pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,7 @@ func GatewayContains(gatewayNodeStr, gateway string) bool {
}
return false
}

func JoinHostPort(host string, port int32) string {
return net.JoinHostPort(host, strconv.FormatInt(int64(port), 10))
}

0 comments on commit 1857130

Please sign in to comment.