Skip to content

Commit

Permalink
add support of user-defined endpoints to SwitchLBRule (#2777)
Browse files Browse the repository at this point in the history
* add support of user-defined endpoints to SwitchLBRule

* Update pkg/controller/switch_lb_rule.go

Co-authored-by: 张祖建 <zhangzujian.7@gmail.com>

---------

Co-authored-by: 夜微澜 <qiutingjun@cmss.chinamobile.com>
Co-authored-by: 张祖建 <zhangzujian.7@gmail.com>
  • Loading branch information
3 people committed May 15, 2023
1 parent 74221a6 commit 781b47d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 49 deletions.
4 changes: 4 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ spec:
items:
type: string
type: array
endpoints:
items:
type: string
type: array
status:
type: object
properties:
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,7 @@ type SwitchLBRuleSpec struct {
Vip string `json:"vip"`
Namespace string `json:"namespace"`
Selector []string `json:"selector"`
Endpoints []string `json:"endpoints"`
SessionAffinity string `json:"sessionAffinity,omitempty"`
Ports []SlrPort `json:"ports"`
}
Expand Down
198 changes: 149 additions & 49 deletions pkg/controller/switch_lb_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type slrInfo struct {
IsRecreate bool
}

func genSvcName(name string) string {
func generateSvcName(name string) string {
return fmt.Sprintf("slr-%s", name)
}

Expand Down Expand Up @@ -127,36 +127,78 @@ func (c *Controller) runAddSwitchLBRuleWorker() {

func (c *Controller) handleAddOrUpdateSwitchLBRule(key string) error {
klog.V(3).Infof("handleAddOrUpdateSwitchLBRule %s", key)
slr, err := c.switchLBRuleLister.Get(key)
if err != nil {

var (
slr *kubeovnv1.SwitchLBRule
oldSvc *corev1.Service
oldEps *corev1.Endpoints
svcName string
needToCreateSvc, needToCreateEps bool
err error
)

if slr, err = c.switchLBRuleLister.Get(key); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

needToCreate := false
name := genSvcName(slr.Name)
oldSvc, err := c.servicesLister.Services(slr.Spec.Namespace).Get(name)
if err != nil {
svcName = generateSvcName(slr.Name)
if oldSvc, err = c.servicesLister.Services(slr.Spec.Namespace).Get(svcName); err != nil {
if k8serrors.IsNotFound(err) {
needToCreate = true
needToCreateSvc = true
needToCreateEps = true
} else {
klog.Errorf("failed to create service '%s', err: %v", name, err)
klog.Errorf("failed to create service '%s', err: %v", svcName, err)
return err
}
}

svc := genHeadlessService(slr, oldSvc)
if needToCreate {
if oldEps, err = c.config.KubeClient.CoreV1().Endpoints(slr.Spec.Namespace).Get(context.Background(), svcName, metav1.GetOptions{}); err != nil {
if k8serrors.IsNotFound(err) {
needToCreateEps = true
} else {
klog.Errorf("failed to get service endpoints '%s', err: %v", svcName, err)
return err
}
}

var (
eps *corev1.Endpoints
svc *corev1.Service
)

// user-defined endpoints used to work with the case of static ips which could not get by selector
if len(slr.Spec.Endpoints) > 0 {
eps = generateEndpoints(slr, oldEps)
if needToCreateEps {
if _, err = c.config.KubeClient.CoreV1().Endpoints(slr.Spec.Namespace).Create(context.Background(), eps, metav1.CreateOptions{}); err != nil {
err = fmt.Errorf("failed to create endpoints '%s', err: %v", eps, err)
klog.Error(err)
return err
}
} else {
if _, err = c.config.KubeClient.CoreV1().Endpoints(slr.Spec.Namespace).Update(context.Background(), eps, metav1.UpdateOptions{}); err != nil {
err = fmt.Errorf("failed to update endpoints '%s', err: %v", eps, err)
klog.Error(err)
return err
}
}
// avoid conflicts between selectors and user-defined endpoints
slr.Spec.Selector = nil
}

svc = generateHeadlessService(slr, oldSvc)
if needToCreateSvc {
if _, err = c.config.KubeClient.CoreV1().Services(slr.Spec.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}); err != nil {
err := fmt.Errorf("failed to create service '%s', err: %v", svc, err)
err = fmt.Errorf("failed to create service '%s', err: %v", svc, err)
klog.Error(err)
return err
}
} else {
if _, err = c.config.KubeClient.CoreV1().Services(slr.Spec.Namespace).Update(context.Background(), svc, metav1.UpdateOptions{}); err != nil {
err := fmt.Errorf("failed to update service '%s', err: %v", svc, err)
err = fmt.Errorf("failed to update service '%s', err: %v", svc, err)
klog.Error(err)
return err
}
Expand All @@ -172,21 +214,20 @@ func (c *Controller) handleAddOrUpdateSwitchLBRule(key string) error {
}
formatPorts = fmt.Sprintf("%s,%d/%s", formatPorts, port.Port, protocol)
}

newSlr.Status.Ports = strings.TrimPrefix(formatPorts, ",")

if _, err = c.config.KubeOvnClient.KubeovnV1().SwitchLBRules().UpdateStatus(context.Background(), newSlr, metav1.UpdateOptions{}); err != nil {
err := fmt.Errorf("failed to update switch lb rule status, %v", err)
err = fmt.Errorf("failed to update switch lb rule status, %v", err)
klog.Error(err)
return err
}

return nil
}

func (c *Controller) handleDelSwitchLBRule(info *slrInfo) error {
klog.V(3).Infof("handleDelSwitchLBRule %s", info.Name)

name := genSvcName(info.Name)
name := generateSvcName(info.Name)
err := c.config.KubeClient.CoreV1().Services(info.Namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -214,10 +255,23 @@ func (c *Controller) handleUpdateSwitchLBRule(info *slrInfo) error {
return nil
}

func genHeadlessService(slr *kubeovnv1.SwitchLBRule, oldSvc *corev1.Service) *corev1.Service {
name := genSvcName(slr.Name)
func generateHeadlessService(slr *kubeovnv1.SwitchLBRule, oldSvc *corev1.Service) *corev1.Service {
var (
name string
newSvc *corev1.Service
ports []corev1.ServicePort
selectors map[string]string
)

selectors = make(map[string]string, 0)
for _, s := range slr.Spec.Selector {
keyValue := strings.Split(strings.TrimSpace(s), ":")
if len(keyValue) != 2 {
continue
}
selectors[strings.TrimSpace(keyValue[0])] = strings.TrimSpace(keyValue[1])
}

var ports []corev1.ServicePort
for _, port := range slr.Spec.Ports {
ports = append(ports, corev1.ServicePort{
Name: port.Name,
Expand All @@ -230,39 +284,85 @@ func genHeadlessService(slr *kubeovnv1.SwitchLBRule, oldSvc *corev1.Service) *co
})
}

selectors := make(map[string]string)
for _, s := range slr.Spec.Selector {
keyValue := strings.Split(strings.TrimSpace(s), ":")
if len(keyValue) != 2 {
continue
name = generateSvcName(slr.Name)
if oldSvc != nil {
newSvc = oldSvc.DeepCopy()
newSvc.Name = name
newSvc.Namespace = slr.Spec.Namespace
newSvc.Annotations[util.SwitchLBRuleVipsAnnotation] = slr.Spec.Vip
newSvc.Spec.Ports = ports
newSvc.Spec.Selector = selectors
newSvc.Spec.SessionAffinity = corev1.ServiceAffinity(slr.Spec.SessionAffinity)
} else {
newSvc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: slr.Spec.Namespace,
Annotations: map[string]string{util.SwitchLBRuleVipsAnnotation: slr.Spec.Vip},
},
Spec: corev1.ServiceSpec{
Ports: ports,
Selector: selectors,
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
SessionAffinity: corev1.ServiceAffinity(slr.Spec.SessionAffinity),
},
}
selectors[strings.TrimSpace(keyValue[0])] = strings.TrimSpace(keyValue[1])

}
return newSvc
}

var resourceVersion string
annotations := map[string]string{}
if oldSvc != nil {
for k, v := range oldSvc.Annotations {
annotations[k] = v
}
resourceVersion = oldSvc.ResourceVersion
func generateEndpoints(slr *kubeovnv1.SwitchLBRule, oldEps *corev1.Endpoints) *corev1.Endpoints {
var (
name string
newEps *corev1.Endpoints
ports []corev1.EndpointPort
addrs []corev1.EndpointAddress
subsets []corev1.EndpointSubset
)

for _, port := range slr.Spec.Ports {
ports = append(
ports,
corev1.EndpointPort{
Name: port.Name,
Protocol: corev1.Protocol(port.Protocol),
Port: port.TargetPort,
},
)
}
annotations[util.SwitchLBRuleVipsAnnotation] = slr.Spec.Vip

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: slr.Spec.Namespace,
Annotations: annotations,
ResourceVersion: resourceVersion,
},
Spec: corev1.ServiceSpec{
Ports: ports,
Selector: selectors,
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
SessionAffinity: corev1.ServiceAffinity(slr.Spec.SessionAffinity),

for _, addr := range slr.Spec.Endpoints {
addrs = append(
addrs,
corev1.EndpointAddress{
IP: addr,
},
)
}

subsets = []corev1.EndpointSubset{
{
Addresses: addrs,
Ports: ports,
},
}
return svc

name = generateSvcName(slr.Name)
if oldEps != nil {
newEps = oldEps.DeepCopy()
newEps.Name = name
newEps.Namespace = slr.Spec.Namespace
newEps.Subsets = subsets
} else {
newEps = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: slr.Spec.Namespace,
},
Subsets: subsets,
}
}
return newEps
}

0 comments on commit 781b47d

Please sign in to comment.