Skip to content

Commit

Permalink
feat: vpc load balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
fanriming committed Jun 19, 2021
1 parent cfabf16 commit fde8991
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 112 deletions.
14 changes: 12 additions & 2 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,19 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
return nil
}

tcpLb, udpLb := c.config.ClusterTcpLoadBalancer, c.config.ClusterUdpLoadBalancer
vpcName := svc.Annotations[util.VpcAnnotation]
if vpcName == "" {
vpcName = util.DefaultVpc
}
vpc, err := c.vpcsLister.Get(vpcName)
if err != nil {
klog.Errorf("failed to get vpc %s of lb, %v", vpcName, err)
return err
}

tcpLb, udpLb := vpc.Status.TcpLoadBalancer, vpc.Status.UdpLoadBalancer
if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
tcpLb, udpLb = c.config.ClusterTcpSessionLoadBalancer, c.config.ClusterUdpSessionLoadBalancer
tcpLb, udpLb = vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer
}

for _, port := range svc.Spec.Ports {
Expand Down
136 changes: 96 additions & 40 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,65 +298,121 @@ func (c *Controller) gcLoadBalancer() error {
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterTcpSessionLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
vpcs, err := c.vpcsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get tcp session lb vips %v", err)
klog.Errorf("failed to list vpc, %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, tcpSessionVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpSessionLoadBalancer)
var vpcLbs []string
for _, vpc := range vpcs {
tcpLb, udpLb := vpc.Status.TcpLoadBalancer, vpc.Status.UdpLoadBalancer
tcpSessLb, udpSessLb := vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer
vpcLbs = append(vpcLbs, tcpLb, udpLb, tcpSessLb, udpSessLb)

if tcpLb != "" {
lbUuid, err := c.ovnClient.FindLoadbalancer(tcpLb)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp session lb, %v", vip, err)
klog.Errorf("failed to get tcp lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, tcpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, tcpLb)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb %s, %v", vip, tcpLb, err)
return err
}
}
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, udpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
if tcpSessLb != "" {
lbUuid, err := c.ovnClient.FindLoadbalancer(tcpSessLb)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
klog.Errorf("failed to get lb %v", err)
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get tcp session lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, tcpSessionVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, tcpSessLb)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp session lb %s, %v", vip, tcpSessLb, err)
return err
}
}
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpSessionLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp session lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, udpSessionVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpSessionLoadBalancer)
if udpLb != "" {
lbUuid, err := c.ovnClient.FindLoadbalancer(udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s from udp session lb, %v", vip, err)
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, udpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb %s, %v", vip, udpLb, err)
return err
}
}
}
}

if udpSessLb != "" {
lbUuid, err := c.ovnClient.FindLoadbalancer(udpSessLb)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp session lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, udpSessionVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, udpSessLb)
if err != nil {
klog.Errorf("failed to delete vip %s from udp session lb %s, %v", vip, udpSessLb, err)
return err
}
}
}
}
}

ovnLbs, err := c.ovnClient.ListLoadBalancer()
if err != nil {
klog.Errorf("failed to list load balancer, %v", err)
return err
}

klog.Infof("vpcLbs: %v", vpcLbs)
klog.Infof("ovnLbs: %v", ovnLbs)
for _, lb := range ovnLbs {
if util.ContainsString(vpcLbs, lb) {
continue
}
klog.Infof("start to destroy load balancer %s", lb)
if err := c.ovnClient.DeleteLoadBalancer(lb); err != nil {
return err
}
}
return nil
}

Expand Down
112 changes: 67 additions & 45 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,66 +179,88 @@ func (c *Controller) initClusterRouter() error {

// InitLoadBalancer init the default tcp and udp cluster loadbalancer
func (c *Controller) initLoadBalancer() error {
tcpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
vpcs, err := c.vpcsLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to find tcp lb %v", err)
klog.Errorf("failed to list vpc, %v", err)
return err
}
if tcpLb == "" {
klog.Infof("init cluster tcp load balancer %s", c.config.ClusterTcpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpLoadBalancer, util.ProtocolTCP, "")

for _, vpc := range vpcs {
vpcLb := c.GenVpcLoadBalancer(vpc.Name)

tcpLb, err := c.ovnClient.FindLoadbalancer(vpcLb.TcpLoadBalancer)
if err != nil {
klog.Errorf("failed to crate cluster tcp load balancer %v", err)
return err
return fmt.Errorf("failed to find tcp lb %v", err)
}
if tcpLb == "" {
klog.Infof("init cluster tcp load balancer %s", vpcLb.TcpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(vpcLb.TcpLoadBalancer, util.ProtocolTCP, "")
if err != nil {
klog.Errorf("failed to crate cluster tcp load balancer %v", err)
return err
}
} else {
klog.Infof("tcp load balancer %s exists", tcpLb)
}
} else {
klog.Infof("tcp load balancer %s exists", tcpLb)
}

tcpSessionLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpSessionLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find tcp session lb %v", err)
}
if tcpSessionLb == "" {
klog.Infof("init cluster tcp session load balancer %s", c.config.ClusterTcpSessionLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpSessionLoadBalancer, util.ProtocolTCP, "ip_src")
tcpSessionLb, err := c.ovnClient.FindLoadbalancer(vpcLb.TcpSessLoadBalancer)
if err != nil {
klog.Errorf("failed to crate cluster tcp session load balancer %v", err)
return err
return fmt.Errorf("failed to find tcp session lb %v", err)
}
if tcpSessionLb == "" {
klog.Infof("init cluster tcp session load balancer %s", vpcLb.TcpSessLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(vpcLb.TcpSessLoadBalancer, util.ProtocolTCP, "ip_src")
if err != nil {
klog.Errorf("failed to crate cluster tcp session load balancer %v", err)
return err
}
} else {
klog.Infof("tcp session load balancer %s exists", vpcLb.TcpSessLoadBalancer)
}
} else {
klog.Infof("tcp session load balancer %s exists", tcpSessionLb)
}

udpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp lb %v", err)
}
if udpLb == "" {
klog.Infof("init cluster udp load balancer %s", c.config.ClusterUdpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpLoadBalancer, util.ProtocolUDP, "")
udpLb, err := c.ovnClient.FindLoadbalancer(vpcLb.UdpLoadBalancer)
if err != nil {
klog.Errorf("failed to crate cluster udp load balancer %v", err)
return err
return fmt.Errorf("failed to find udp lb %v", err)
}
if udpLb == "" {
klog.Infof("init cluster udp load balancer %s", vpcLb.UdpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(vpcLb.UdpLoadBalancer, util.ProtocolUDP, "")
if err != nil {
klog.Errorf("failed to crate cluster udp load balancer %v", err)
return err
}
} else {
klog.Infof("udp load balancer %s exists", udpLb)
}
} else {
klog.Infof("udp load balancer %s exists", udpLb)
}

udpSessionLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpSessionLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp session lb %v", err)
}
if udpSessionLb == "" {
klog.Infof("init cluster udp session load balancer %s", c.config.ClusterUdpSessionLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpSessionLoadBalancer, util.ProtocolUDP, "ip_src")
udpSessionLb, err := c.ovnClient.FindLoadbalancer(vpcLb.UdpSessLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp session lb %v", err)
}
if udpSessionLb == "" {
klog.Infof("init cluster udp session load balancer %s", vpcLb.UdpSessLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(vpcLb.UdpSessLoadBalancer, util.ProtocolUDP, "ip_src")
if err != nil {
klog.Errorf("failed to crate cluster udp session load balancer %v", err)
return err
}
} else {
klog.Infof("udp session load balancer %s exists", vpcLb.UdpSessLoadBalancer)
}

vpc.Status.TcpLoadBalancer = vpcLb.TcpLoadBalancer
vpc.Status.TcpSessionLoadBalancer = vpcLb.TcpSessLoadBalancer
vpc.Status.UdpLoadBalancer = vpcLb.UdpLoadBalancer
vpc.Status.UdpSessionLoadBalancer = vpcLb.UdpSessLoadBalancer
bytes, err := vpc.Status.Bytes()
if err != nil {
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Patch(context.Background(), vpc.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("failed to crate cluster udp session load balancer %v", err)
return err
}
} else {
klog.Infof("udp session load balancer %s exists", udpSessionLb)
}

return nil
}

Expand Down
23 changes: 16 additions & 7 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,21 @@ func (c *Controller) handleUpdateService(key string) error {

tcpVips := []string{}
udpVips := []string{}
tcpLb, udpLb := c.config.ClusterTcpLoadBalancer, c.config.ClusterUdpLoadBalancer
oTcpLb, oUdpLb := c.config.ClusterTcpSessionLoadBalancer, c.config.ClusterUdpSessionLoadBalancer
if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
tcpLb, udpLb = c.config.ClusterTcpSessionLoadBalancer, c.config.ClusterUdpSessionLoadBalancer
oTcpLb, oUdpLb = c.config.ClusterTcpLoadBalancer, c.config.ClusterUdpLoadBalancer
vpcName := svc.Annotations[util.VpcAnnotation]
if vpcName == "" {
vpcName = util.DefaultVpc
}
vpc, err := c.vpcsLister.Get(vpcName)
if err != nil {
klog.Errorf("failed to get vpc %s of lb, %v", vpcName, err)
return err
}

tcpLb, udpLb := vpc.Status.TcpLoadBalancer, vpc.Status.UdpLoadBalancer
oTcpLb, oUdpLb := vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer
if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
tcpLb, udpLb = vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpSessionLoadBalancer
oTcpLb, oUdpLb = vpc.Status.TcpLoadBalancer, vpc.Status.UdpLoadBalancer
}

for _, port := range svc.Spec.Ports {
Expand Down Expand Up @@ -254,7 +263,7 @@ func (c *Controller) handleUpdateService(key string) error {
return err
}
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to tcp lb", vip)
klog.Infof("add vip %s to tcp lb %s", vip, oTcpLb)
c.updateEndpointQueue.Add(key)
break
}
Expand Down Expand Up @@ -288,7 +297,7 @@ func (c *Controller) handleUpdateService(key string) error {
return err
}
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to udp lb", vip)
klog.Infof("add vip %s to udp lb %s", vip, oUdpLb)
c.updateEndpointQueue.Add(key)
break
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,13 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}
}

if subnet.Name != c.config.NodeSwitch {
if err := c.ovnClient.AddLbToLogicalSwitch(vpc.Status.TcpLoadBalancer, vpc.Status.TcpSessionLoadBalancer, vpc.Status.UdpLoadBalancer, vpc.Status.UdpSessionLoadBalancer, subnet.Name); err != nil {
c.patchSubnetStatus(subnet, "AddLbToLogicalSwitchFailed", err.Error())
return err
}
}

if err := c.reconcileSubnet(subnet); err != nil {
klog.Errorf("reconcile subnet for %s failed, %v", subnet.Name, err)
return err
Expand Down

0 comments on commit fde8991

Please sign in to comment.