Skip to content

Commit

Permalink
add basic consul support
Browse files Browse the repository at this point in the history
  • Loading branch information
shaleman committed Jan 16, 2016
1 parent 34faf4d commit d33fd63
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 1 deletion.
104 changes: 103 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSetGetPerformance(t *testing.T) {
}
var retVal JsonObj

const testCount = 100
const testCount = 1000

log.Infof("Performing %d write tests", testCount)

Expand Down Expand Up @@ -113,6 +113,108 @@ func TestSetGetPerformance(t *testing.T) {
fmt.Printf("Set/Get/Del test successful\n")
}

func TestConsulClientSetGet(t *testing.T) {
// Get the consul plugin
consulClient := objdb.GetPlugin("consul")

// Initialize the consul client
if err := consulClient.Init([]string{}); err != nil {
log.Fatalf("Error initializing consul plugin. Err: %v", err)
}

setVal := JsonObj{
Value: "test1",
}

if err := consulClient.SetObj("/contiv.io/test", setVal); err != nil {
fmt.Printf("Fatal setting key. Err: %v\n", err)
t.Fatalf("Fatal setting key")
}

var retVal JsonObj

if err := consulClient.GetObj("/contiv.io/test", &retVal); err != nil {
fmt.Printf("Fatal getting key. Err: %v\n", err)
t.Fatalf("Fatal getting key")
}

if retVal.Value != "test1" {
fmt.Printf("Got invalid response: %+v\n", retVal)
t.Fatalf("Got invalid response")
}

if err := consulClient.DelObj("/contiv.io/test"); err != nil {
t.Fatalf("Fatal deleting test object. Err: %v", err)
}

fmt.Printf("Consul Set/Get/Del test successful\n")
}

func TestConsulSetGetPerformance(t *testing.T) {
// Set
setVal := JsonObj{
Value: "test1",
}
var retVal JsonObj

const testCount = 1000

// Get the consul plugin
consulClient := objdb.GetPlugin("consul")

// Initialize the consul client
if err := consulClient.Init([]string{}); err != nil {
log.Fatalf("Error initializing consul plugin. Err: %v", err)
}

log.Infof("Performing %d write tests", testCount)

startTime := time.Now()

for i := 0; i < testCount; i++ {
if err := consulClient.SetObj("/contiv.io/test"+strconv.Itoa(i), setVal); err != nil {
fmt.Printf("Fatal setting key. Err: %v\n", err)
t.Fatalf("Fatal setting key")
}
}

timeTook := time.Since(startTime).Nanoseconds() / 1000000
log.Infof("Write Test took %d milli seconds per write. %d ms total", timeTook/testCount, timeTook)

log.Infof("Performing %d read tests", testCount)

// Get test
startTime = time.Now()

for i := 0; i < testCount; i++ {
if err := consulClient.GetObj("/contiv.io/test"+strconv.Itoa(i), &retVal); err != nil {
fmt.Printf("Fatal getting key. Err: %v\n", err)
t.Fatalf("Fatal getting key")
}

if retVal.Value != "test1" {
fmt.Printf("Got invalid response: %+v\n", retVal)
t.Fatalf("Got invalid response")
}
}

timeTook = time.Since(startTime).Nanoseconds() / 1000000
log.Infof("Read Test took %d milli seconds per read. %d ms total", timeTook/testCount, timeTook)

startTime = time.Now()

for i := 0; i < testCount; i++ {
if err := consulClient.DelObj("/contiv.io/test" + strconv.Itoa(i)); err != nil {
t.Fatalf("Fatal deleting test object. Err: %v", err)
}
}

timeTook = time.Since(startTime).Nanoseconds() / 1000000
log.Infof("Delete Test took %d milli seconds per delete. %d ms total", timeTook/testCount, timeTook)

fmt.Printf("Set/Get/Del test successful\n")
}

func TestLockAcquireRelease(t *testing.T) {
// Create a lock
lock1, err := client.NewLock("master", "hostname1", 10)
Expand Down
135 changes: 135 additions & 0 deletions plugins/consulClient/consulClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package consulClient

import (
"encoding/json"
"strings"
"sync"

"github.com/hashicorp/consul/api"
"github.com/netplugin.orig/core"

log "github.com/Sirupsen/logrus"
"github.com/contiv/objdb"
)

type ConsulPlugin struct {
client *api.Client // consul client
consulConfig api.Config

mutex *sync.Mutex
}

var consulPlugin = &ConsulPlugin{mutex: new(sync.Mutex)}

// InitPlugin Register the plugin
func InitPlugin() {
objdb.RegisterPlugin("consul", consulPlugin)
}

// Init initializes the consul client
func (self *ConsulPlugin) Init(machines []string) error {
// default consul config
self.consulConfig = api.Config{Address: "127.0.0.1:8500"}

// Init consul client
client, err := api.NewClient(&self.consulConfig)
if err != nil {
log.Fatalf("Error initializing consul client")
return err
}

self.client = client

return nil
}

func processKey(inKey string) string {
//consul doesn't accepts keys starting with a '/', so trim the leading slash
return strings.TrimPrefix(inKey, "/")
}

func (self *ConsulPlugin) GetObj(key string, retVal interface{}) error {
key = processKey("/contiv.io/obj/" + processKey(key))

resp, _, err := self.client.KV().Get(key, nil)
if err != nil {
return err
}
// Consul returns success and a nil kv when a key is not found,
// translate it to 'Key not found' error
if resp == nil {
return core.Errorf("Key not found")
}

// Parse JSON response
if err := json.Unmarshal([]byte(resp.Value), retVal); err != nil {
log.Errorf("Error parsing object %s, Err %v", resp.Value, err)
return err
}

return nil
}

// ListDir returns a list of keys in a directory
func (self *ConsulPlugin) ListDir(key string) ([]string, error) {
key = processKey("/contiv.io/obj/" + processKey(key))

kvs, _, err := self.client.KV().List(key, nil)
if err != nil {
return nil, err
}
// Consul returns success and a nil kv when a key is not found,
// translate it to 'Key not found' error
if kvs == nil {
return nil, core.Errorf("Key not found")
}

var keys []string
for _, kv := range kvs {
keys = append(keys, kv.Key)
}

return keys, nil
}

// SetObj writes an object
func (self *ConsulPlugin) SetObj(key string, value interface{}) error {
key = processKey("/contiv.io/obj/" + processKey(key))

// JSON format the object
jsonVal, err := json.Marshal(value)
if err != nil {
log.Errorf("Json conversion error. Err %v", err)
return err
}

_, err = self.client.KV().Put(&api.KVPair{Key: key, Value: jsonVal}, nil)

return err
}

// DelObj deletes an object
func (self *ConsulPlugin) DelObj(key string) error {
key = processKey("/contiv.io/obj/" + processKey(key))
_, err := self.client.KV().Delete(key, nil)
return err
}

func (self *ConsulPlugin) GetLocalAddr() (string, error) {
return "", nil
}
func (self *ConsulPlugin) NewLock(name string, myId string, ttl uint64) (objdb.LockInterface, error) {
return nil, nil
}
func (self *ConsulPlugin) RegisterService(serviceInfo objdb.ServiceInfo) error {
return nil
}
func (self *ConsulPlugin) GetService(name string) ([]objdb.ServiceInfo, error) {
return nil, nil
}
func (self *ConsulPlugin) WatchService(name string, eventCh chan objdb.WatchServiceEvent, stopCh chan bool) error {
return nil
}
func (self *ConsulPlugin) DeregisterService(serviceInfo objdb.ServiceInfo) error {
return nil
}
2 changes: 2 additions & 0 deletions plugins/plugins.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package plugins

import (
"github.com/contiv/objdb/plugins/consulClient"
"github.com/contiv/objdb/plugins/etcdClient"
)

func Init() {
// Initialize all conf store plugins
etcdClient.InitPlugin()
consulClient.InitPlugin()
}

0 comments on commit d33fd63

Please sign in to comment.