/
agents.go
110 lines (93 loc) · 2.5 KB
/
agents.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package consul
import (
"crypto/tls"
"errors"
"math/rand"
"net/http"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/utils"
consulapi "github.com/hashicorp/consul/api"
)
type Agents interface {
GetAgent(agentAddress string) (agent *consulapi.Client, err error)
GetAnyAgent() (agent *Agent, err error)
RemoveAgent(agentAddress string)
}
type ConcurrentAgents struct {
agents map[string]*Agent
config *Config
lock sync.Mutex
client *http.Client
}
func NewAgents(config *Config) *ConcurrentAgents {
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: !config.SslVerify,
},
},
Timeout: config.Timeout.Duration,
}
return &ConcurrentAgents{
agents: make(map[string]*Agent),
config: config,
client: client,
}
}
func (a *ConcurrentAgents) GetAnyAgent() (*Agent, error) {
a.lock.Lock()
defer a.lock.Unlock()
if len(a.agents) > 0 {
ipAddress := a.getRandomAgentIPAddress()
return a.agents[ipAddress], nil
}
return nil, errors.New("No Consul client available in agents cache")
}
func (a *ConcurrentAgents) getRandomAgentIPAddress() string {
ipAddresses := []string{}
for ipAddress := range a.agents {
ipAddresses = append(ipAddresses, ipAddress)
}
idx := rand.Intn(len(a.agents))
return ipAddresses[idx]
}
func (a *ConcurrentAgents) RemoveAgent(agentAddress string) {
a.lock.Lock()
defer a.lock.Unlock()
if IP, err := utils.HostToIPv4(agentAddress); err != nil {
log.WithError(err).Error("Could not remove agent from cache")
} else {
ipAddress := IP.String()
log.WithField("Address", ipAddress).Info("Removing agent from cache")
delete(a.agents, ipAddress)
a.updateAgentsCacheSizeMetricValue()
}
}
func (a *ConcurrentAgents) GetAgent(agentAddress string) (*consulapi.Client, error) {
a.lock.Lock()
defer a.lock.Unlock()
IP, err := utils.HostToIPv4(agentAddress)
if err != nil {
return nil, err
}
ipAddress := IP.String()
if agent, ok := a.agents[ipAddress]; ok {
return agent.Client, nil
}
newAgent, err := a.createAgent(ipAddress)
if err != nil {
return nil, err
}
a.addAgent(ipAddress, newAgent)
return newAgent.Client, nil
}
func (a *ConcurrentAgents) addAgent(agentHost string, agent *Agent) {
a.agents[agentHost] = agent
a.updateAgentsCacheSizeMetricValue()
}
func (a *ConcurrentAgents) updateAgentsCacheSizeMetricValue() {
metrics.UpdateGauge("consul.agents.cache.size", int64(len(a.agents)))
}