boltdbstorage.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package clustermanager
  2. import (
  3. bolt "github.com/etcd-io/bbolt"
  4. _ "k8s.io/klog"
  5. )
  6. type BoltDBClusterStorage struct {
  7. bucket []byte
  8. db *bolt.DB
  9. }
  10. func NewBoltDBClusterStorage(bucket string, db *bolt.DB) (ClusterStorage, error) {
  11. bucketKey := []byte(bucket)
  12. err := db.Update(func(tx *bolt.Tx) error {
  13. _, err := tx.CreateBucketIfNotExists(bucketKey)
  14. if err != nil {
  15. return err
  16. }
  17. return nil
  18. })
  19. if err != nil {
  20. return nil, err
  21. }
  22. return &BoltDBClusterStorage{
  23. bucket: bucketKey,
  24. db: db,
  25. }, nil
  26. }
  27. // Adds the entry if the key does not exist
  28. func (cs *BoltDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
  29. return cs.db.Update(func(tx *bolt.Tx) error {
  30. k := []byte(key)
  31. bucket := tx.Bucket(cs.bucket)
  32. if bucket.Get(k) != nil {
  33. return nil
  34. }
  35. return bucket.Put(k, cluster)
  36. })
  37. }
  38. // Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
  39. // value with the provided.
  40. func (cs *BoltDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
  41. return cs.db.Update(func(tx *bolt.Tx) error {
  42. bucket := tx.Bucket(cs.bucket)
  43. return bucket.Put([]byte(key), cluster)
  44. })
  45. }
  46. // Removes a key from the cluster storage
  47. func (cs *BoltDBClusterStorage) Remove(key string) error {
  48. return cs.db.Update(func(tx *bolt.Tx) error {
  49. bucket := tx.Bucket(cs.bucket)
  50. return bucket.Delete([]byte(key))
  51. })
  52. }
  53. // Retrieve all the keys stored
  54. func (cs *BoltDBClusterStorage) Each(handler func(string, []byte)) error {
  55. return cs.db.View(func(tx *bolt.Tx) error {
  56. bucket := tx.Bucket(cs.bucket)
  57. return bucket.ForEach(func(k, v []byte) error {
  58. // Allow the bytes to live outside transaction by copy
  59. key := make([]byte, len(k))
  60. value := make([]byte, len(v))
  61. copy(key, k)
  62. copy(value, v)
  63. handler(string(key), value)
  64. return nil
  65. })
  66. })
  67. }
  68. // Closes the backing storage
  69. func (cs *BoltDBClusterStorage) Close() error {
  70. return cs.db.Close()
  71. }