Skip to content

Commit

Permalink
feat: support announce service ip
Browse files Browse the repository at this point in the history
  • Loading branch information
withlin committed May 7, 2021
1 parent 7a20fbf commit d176dac
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 37 deletions.
61 changes: 32 additions & 29 deletions pkg/speaker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ const (
)

type Configuration struct {
GrpcHost string
GrpcPort uint32
ClusterAs uint32
RouterId string
NeighborAddress string
NeighborAs uint32
AuthPassword string
HoldTime float64
BgpServer *gobgp.BgpServer
GrpcHost string
GrpcPort uint32
ClusterAs uint32
RouterId string
NeighborAddress string
NeighborAs uint32
AuthPassword string
HoldTime float64
BgpServer *gobgp.BgpServer
AnnounceClusterIP bool

KubeConfigFile string
KubeClient kubernetes.Interface
Expand All @@ -47,16 +48,17 @@ type Configuration struct {

func ParseFlags() (*Configuration, error) {
var (
argGrpcHost = pflag.String("grpc-host", "127.0.0.1", "The host address for grpc to listen, default: 127.0.0.1")
argGrpcPort = pflag.Uint32("grpc-port", DefaultBGPGrpcPort, "The port for grpc to listen, default:50051")
argClusterAs = pflag.Uint32("cluster-as", DefaultBGPClusterAs, "The as number of container network, default 65000")
argRouterId = pflag.String("router-id", "", "The address for the speaker to use as router id, default the node ip")
argNeighborAddress = pflag.String("neighbor-address", "", "The router address the speaker connects to.")
argNeighborAs = pflag.Uint32("neighbor-as", DefaultBGPNeighborAs, "The router as number, default 65001")
argAuthPassword = pflag.String("auth-password", "", "bgp peer auth password")
argHoldTime = pflag.Duration("holdtime", DefaultBGPHoldtime, "ovn-speaker goes down abnormally, the local saving time of BGP route will be affected.Holdtime must be in the range 3s to 65536s. (default 90s)")
argPprofPort = pflag.Uint32("pprof-port", DefaultPprofPort, "The port to get profiling data, default: 10667")
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
argAnnounceClusterIP = pflag.BoolP("announce-cluster-ip", "", false, "The Cluster IP of the service to announce to the BGP peers.")
argGrpcHost = pflag.String("grpc-host", "127.0.0.1", "The host address for grpc to listen, default: 127.0.0.1")
argGrpcPort = pflag.Uint32("grpc-port", DefaultBGPGrpcPort, "The port for grpc to listen, default:50051")
argClusterAs = pflag.Uint32("cluster-as", DefaultBGPClusterAs, "The as number of container network, default 65000")
argRouterId = pflag.String("router-id", "", "The address for the speaker to use as router id, default the node ip")
argNeighborAddress = pflag.String("neighbor-address", "", "The router address the speaker connects to.")
argNeighborAs = pflag.Uint32("neighbor-as", DefaultBGPNeighborAs, "The router as number, default 65001")
argAuthPassword = pflag.String("auth-password", "", "bgp peer auth password")
argHoldTime = pflag.Duration("holdtime", DefaultBGPHoldtime, "ovn-speaker goes down abnormally, the local saving time of BGP route will be affected.Holdtime must be in the range 3s to 65536s. (default 90s)")
argPprofPort = pflag.Uint32("pprof-port", DefaultPprofPort, "The port to get profiling data, default: 10667")
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
)

klogFlags := flag.NewFlagSet("klog", flag.ExitOnError)
Expand All @@ -83,16 +85,17 @@ func ParseFlags() (*Configuration, error) {
}

config := &Configuration{
GrpcHost: *argGrpcHost,
GrpcPort: *argGrpcPort,
ClusterAs: *argClusterAs,
RouterId: *argRouterId,
NeighborAddress: *argNeighborAddress,
NeighborAs: *argNeighborAs,
AuthPassword: *argAuthPassword,
HoldTime: ht,
PprofPort: *argPprofPort,
KubeConfigFile: *argKubeConfigFile,
AnnounceClusterIP: *argAnnounceClusterIP,
GrpcHost: *argGrpcHost,
GrpcPort: *argGrpcPort,
ClusterAs: *argClusterAs,
RouterId: *argRouterId,
NeighborAddress: *argNeighborAddress,
NeighborAs: *argNeighborAs,
AuthPassword: *argAuthPassword,
HoldTime: ht,
PprofPort: *argPprofPort,
KubeConfigFile: *argKubeConfigFile,
}

if config.RouterId == "" {
Expand Down
22 changes: 14 additions & 8 deletions pkg/speaker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ const controllerAgentName = "ovn-speaker"
type Controller struct {
config *Configuration

podsLister listerv1.PodLister
podsSynced cache.InformerSynced
subnetsLister kubeovnlister.SubnetLister
subnetSynced cache.InformerSynced
podsLister listerv1.PodLister
podsSynced cache.InformerSynced
subnetsLister kubeovnlister.SubnetLister
subnetSynced cache.InformerSynced
servicesLister listerv1.ServiceLister
servicesSynced cache.InformerSynced

informerFactory kubeinformers.SharedInformerFactory
kubeovnInformerFactory kubeovninformer.SharedInformerFactory
Expand All @@ -54,13 +56,17 @@ func NewController(config *Configuration) *Controller {

podInformer := informerFactory.Core().V1().Pods()
subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
serviceInformer := informerFactory.Core().V1().Services()

controller := &Controller{
config: config,

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
subnetsLister: subnetInformer.Lister(),
subnetSynced: subnetInformer.Informer().HasSynced,
podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
subnetsLister: subnetInformer.Lister(),
subnetSynced: subnetInformer.Informer().HasSynced,
servicesLister: serviceInformer.Lister(),
servicesSynced: serviceInformer.Informer().HasSynced,

informerFactory: informerFactory,
kubeovnInformerFactory: kubeovnInformerFactory,
Expand Down
21 changes: 21 additions & 0 deletions pkg/speaker/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func isPodAlive(p *v1.Pod) bool {
return true
}

func isClusterIP(svc *v1.Service) bool {
return svc.Spec.Type == "ClusterIP"
}

// TODO: ipv4 only, need ipv6/dualstack support later
func (c *Controller) syncSubnetRoutes() {
bgpExpected, bgpExists := []string{}, []string{}
Expand All @@ -47,6 +51,22 @@ func (c *Controller) syncSubnetRoutes() {
return
}

if c.config.AnnounceClusterIP {
services, err := c.servicesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list services, %v", err)
return
}
for _, svc := range services {

if isClusterIP(svc) && svc.Annotations[util.BgpAnnotation] == "true" && svc.Spec.ClusterIP != "None" &&
svc.Spec.ClusterIP != "" {
bgpExpected = append(bgpExpected, fmt.Sprintf("%s/32", svc.Spec.ClusterIP))
}

}
}

for _, subnet := range subnets {
if subnet.Status.IsReady() && subnet.Annotations != nil && subnet.Annotations[util.BgpAnnotation] == "true" {
bgpExpected = append(bgpExpected, subnet.Spec.CIDRBlock)
Expand All @@ -58,6 +78,7 @@ func (c *Controller) syncSubnetRoutes() {
bgpExpected = append(bgpExpected, fmt.Sprintf("%s/32", pod.Status.PodIP))
}
}

klog.V(5).Infof("expected routes %v", bgpExpected)
listPathRequest := &bgpapi.ListPathRequest{
TableType: bgpapi.TableType_GLOBAL,
Expand Down

0 comments on commit d176dac

Please sign in to comment.