Skip to content

Commit

Permalink
Make service TTL configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
shaleman committed Mar 24, 2016
1 parent b4ac39a commit d554aca
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 85 deletions.
130 changes: 72 additions & 58 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,15 @@ func TestConsulLockAcquireRelease(t *testing.T) {
}

func testLockAcquireRelease(t *testing.T, dbclient API) {
lockTTL := uint64(10)

// Create a lock
lock1, err := dbclient.NewLock("master", "hostname1", 10)
lock1, err := dbclient.NewLock("master", "hostname1", lockTTL)
if err != nil {
t.Fatal(err)
}

lock2, err := dbclient.NewLock("master", "hostname2", 10)
lock2, err := dbclient.NewLock("master", "hostname2", lockTTL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -311,7 +313,7 @@ func testLockAcquireRelease(t *testing.T, dbclient API) {
}
}()

time.Sleep(time.Second * 2)
time.Sleep(time.Second)

log.Infof("2 timer. releasing Lock1")
// At this point, lock1 should be holding the lock
Expand All @@ -325,7 +327,7 @@ func testLockAcquireRelease(t *testing.T, dbclient API) {
t.Fatalf("Error releasing lock")
}

time.Sleep(time.Second * 2)
time.Sleep(time.Second)

log.Infof("4s timer. checking if lock2 is acquired")

Expand All @@ -352,14 +354,15 @@ func TestConsulLockAcquireTimeout(t *testing.T) {
}

func testLockAcquireTimeout(t *testing.T, dbClient API) {
log.Infof("\n=========================================================== \n")
lockTTL := uint64(10)

// Create a lock
lock1, err := dbClient.NewLock("master", "hostnamet1", 10)
lock1, err := dbClient.NewLock("master", "hostnamet1", lockTTL)
if err != nil {
t.Fatal(err)
}

lock2, err := dbClient.NewLock("master", "hostnamet2", 10)
lock2, err := dbClient.NewLock("master", "hostnamet2", lockTTL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -400,7 +403,7 @@ func testLockAcquireTimeout(t *testing.T, dbClient API) {
}
}()

time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)

log.Infof("5sec timer. releasing Lock1\n\n")
// At this point, lock1 should be holding the lock
Expand Down Expand Up @@ -431,13 +434,15 @@ func TestConsulLockAcquireKill(t *testing.T) {
}

func testLockAcquireKill(t *testing.T, dbclient API) {
lockTTL := uint64(10)

// Create a lock
lock1, err := dbclient.NewLock("master", "hostnamek1", 10)
lock1, err := dbclient.NewLock("master", "hostnamek1", lockTTL)
if err != nil {
t.Fatal(err)
}

lock2, err := dbclient.NewLock("master", "hostnamek2", 10)
lock2, err := dbclient.NewLock("master", "hostnamek2", lockTTL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -480,9 +485,9 @@ func testLockAcquireKill(t *testing.T, dbclient API) {
}
}()

time.Sleep(time.Second * 15)
time.Sleep(time.Second * 2)

log.Infof("20s timer. killing Lock1")
log.Infof("2s timer. killing Lock1")
// At this point, lock1 should be holding the lock
if !lock1.IsAcquired() {
t.Fatalf("Lock1 failed to acquire lock")
Expand All @@ -494,9 +499,9 @@ func testLockAcquireKill(t *testing.T, dbclient API) {
t.Fatalf("Error releasing lock")
}

time.Sleep(time.Second * 15)
time.Sleep(time.Second * time.Duration(lockTTL*2))

log.Infof("40s timer. checking if lock2 is acquired")
log.Infof("%ds timer. checking if lock2 is acquired", (lockTTL * 2))

// At this point, lock2 should be holding the lock
if !lock2.IsAcquired() {
Expand All @@ -522,14 +527,18 @@ func TestConsulServiceRegisterDeregister(t *testing.T) {
}

func testServiceRegisterDeregister(t *testing.T, dbClient API) {
srvTTL := 10

// Service info
service1Info := ServiceInfo{
ServiceName: "athena",
TTL: srvTTL,
HostAddr: "10.10.10.10",
Port: 4567,
}
service2Info := ServiceInfo{
ServiceName: "athena",
TTL: srvTTL,
HostAddr: "10.10.10.10",
Port: 4568,
}
Expand Down Expand Up @@ -557,7 +566,7 @@ func testServiceRegisterDeregister(t *testing.T, dbClient API) {
}

// Wait a while to make sure background refresh is working correctly
time.Sleep(serviceTTL * 2)
time.Sleep(time.Duration(srvTTL * 2))

resp, err = dbClient.GetService("athena")
if err != nil {
Expand Down Expand Up @@ -600,15 +609,17 @@ func TestConsulServiceRegisterMultiple(t *testing.T) {
}

func testServiceMultipleRegister(t *testing.T, dbClient API) {
srvTTL := 10
// Service info
service1Info := ServiceInfo{
ServiceName: "athena",
TTL: srvTTL,
HostAddr: "10.10.10.10",
Port: 4567,
}

// register it multiple times
for i := 0; i < 5; i++ {
for i := 0; i < 3; i++ {
if err := dbClient.RegisterService(service1Info); err != nil {
t.Fatalf("Fatal registering service. Err: %+v\n", err)
}
Expand Down Expand Up @@ -655,12 +666,21 @@ func TestConsulServiceWatch(t *testing.T) {
}

func testServiceWatch(t *testing.T, dbClient API) {
srvTTL := 10
service1Info := ServiceInfo{
ServiceName: "athena",
TTL: srvTTL,
HostAddr: "10.10.10.10",
Port: 4567,
}

service2Info := ServiceInfo{
ServiceName: "athena",
TTL: srvTTL,
HostAddr: "10.10.10.11",
Port: 4567,
}

// Create event channel
eventChan := make(chan WatchServiceEvent, 10)
stopChan := make(chan bool, 1)
Expand All @@ -676,55 +696,49 @@ func testServiceWatch(t *testing.T, dbClient API) {
}
log.Infof("Registered service: %+v", service1Info)

cnt := 1
regCount := 0
deregCount := 0
for {
select {
case srvEvent := <-eventChan:
log.Infof("\n----\nReceived event: %+v\n----", srvEvent)
if srvEvent.EventType == WatchServiceEventAdd {
regCount++
}
if srvEvent.EventType == WatchServiceEventDel {
deregCount++
}
case <-time.After(time.Millisecond * time.Duration(100)):
service2Info := ServiceInfo{
ServiceName: "athena",
HostAddr: "10.10.10.11",
Port: 4567,
}
if cnt == 1 {
// register it
if err := dbClient.RegisterService(service2Info); err != nil {
t.Fatalf("Fatal registering service. Err: %+v\n", err)
go func() {
for {
select {
case srvEvent := <-eventChan:
log.Infof("\n----\nReceived event: %+v\n----", srvEvent)
if srvEvent.EventType == WatchServiceEventAdd {
regCount++
}
log.Infof("Registered service: %+v", service2Info)
} else if cnt == 5 {
// deregister it
if err := dbClient.DeregisterService(service2Info); err != nil {
t.Fatalf("Fatal deregistering service. Err: %+v\n", err)
if srvEvent.EventType == WatchServiceEventDel {
deregCount++
}
log.Infof("Deregistered service: %+v", service2Info)
} else if cnt == 10 {
// Stop the watch
stopChan <- true
}
}
}()

if regCount != 2 {
t.Fatalf("Did not receive expected number of reg watch event for service")
}
time.Sleep(time.Millisecond * time.Duration(100))

if deregCount != 1 {
t.Fatalf("Did not receive expected number of dereg watch event for service")
}
// register it
if err := dbClient.RegisterService(service2Info); err != nil {
t.Fatalf("Fatal registering service. Err: %+v\n", err)
}
log.Infof("Registered service: %+v", service2Info)

// wait a little and exit
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * time.Duration(300))

return
}
cnt++
}
// deregister it
if err := dbClient.DeregisterService(service2Info); err != nil {
t.Fatalf("Fatal deregistering service. Err: %+v\n", err)
}
log.Infof("Deregistered service: %+v", service2Info)

time.Sleep(time.Millisecond * time.Duration(300))

// Stop the watch
stopChan <- true

if regCount != 2 {
t.Fatalf("Did not receive expected number of reg watch event for service")
}

if deregCount != 1 {
t.Fatalf("Did not receive expected number of dereg watch event for service")
}
}
14 changes: 7 additions & 7 deletions consulService.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@ package objdb
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

log "github.com/Sirupsen/logrus"
"github.com/hashicorp/consul/api"
)

var errKeyNotFound = errors.New("Key not found")
var sessionTTL = "30s"

// Service state
type consulServiceState struct {
ServiceName string // Name of the service
TTL string // Service TTL
HostAddr string // Host name or IP address where its running
Port int // Port number where its listening
SessionID string // session id assigned by consul
Expand Down Expand Up @@ -75,7 +74,7 @@ func (cp *ConsulClient) RegisterService(serviceInfo ServiceInfo) error {
Name: keyName,
Behavior: "delete",
LockDelay: 10 * time.Millisecond,
TTL: sessionTTL,
TTL: fmt.Sprintf("%ds", serviceInfo.TTL),
}

// Create consul session
Expand Down Expand Up @@ -116,11 +115,12 @@ func (cp *ConsulClient) RegisterService(serviceInfo ServiceInfo) error {

// Run refresh in background
stopChan := make(chan struct{})
go cp.client.Session().RenewPeriodic(sessionTTL, sessionID, nil, stopChan)
go cp.client.Session().RenewPeriodic(sessCfg.TTL, sessionID, nil, stopChan)

// Store it in DB
cp.serviceDb[keyName] = &consulServiceState{
ServiceName: serviceInfo.ServiceName,
TTL: sessCfg.TTL,
HostAddr: serviceInfo.HostAddr,
Port: serviceInfo.Port,
SessionID: sessionID,
Expand Down Expand Up @@ -148,7 +148,7 @@ func (cp *ConsulClient) WatchService(srvName string, eventCh chan WatchServiceEv

// Get current list of services
srvList, lastIdx, err := cp.getServiceInstances(keyName, 0)
if err != nil && err != errKeyNotFound {
if err != nil {
log.Errorf("Error getting service instances for (%s): Err: %v", srvName, err)
} else {
// for each instance trigger an add event
Expand All @@ -174,7 +174,7 @@ func (cp *ConsulClient) WatchService(srvName string, eventCh chan WatchServiceEv
}

srvList, lastIdx, err = cp.getServiceInstances(keyName, lastIdx)
if err != nil && err != errKeyNotFound {
if err != nil {
log.Errorf("Error getting service instances for (%s): Err: %v", srvName, err)
} else {
log.Debugf("Got consul srv list: {%+v}. Curr: {%+v}", srvList, currSrvMap)
Expand Down

0 comments on commit d554aca

Please sign in to comment.