boltdbstorage.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package clusters
  2. import (
  3. bolt "go.etcd.io/bbolt"
  4. )
  5. // BoltDBClusterStorage is a boltdb implementation of a database used to store cluster definitions
  6. type BoltDBClusterStorage struct {
  7. bucket []byte
  8. db *bolt.DB
  9. }
  10. // NewBoltDBClusterStorage creates a new boltdb backed ClusterStorage implementation
  11. func NewBoltDBClusterStorage(bucket string, db *bolt.DB) (ClusterStorage, error) {
  12. bucketKey := []byte(bucket)
  13. err := db.Update(func(tx *bolt.Tx) error {
  14. _, err := tx.CreateBucketIfNotExists(bucketKey)
  15. if err != nil {
  16. return err
  17. }
  18. return nil
  19. })
  20. if err != nil {
  21. return nil, err
  22. }
  23. return &BoltDBClusterStorage{
  24. bucket: bucketKey,
  25. db: db,
  26. }, nil
  27. }
  28. // AddIfNotExists Adds the entry if the key does not exist
  29. func (cs *BoltDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
  30. return cs.db.Update(func(tx *bolt.Tx) error {
  31. k := []byte(key)
  32. bucket := tx.Bucket(cs.bucket)
  33. if bucket.Get(k) != nil {
  34. return nil
  35. }
  36. return bucket.Put(k, cluster)
  37. })
  38. }
  39. // AddOrUpdate Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
  40. // value with the provided.
  41. func (cs *BoltDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
  42. return cs.db.Update(func(tx *bolt.Tx) error {
  43. bucket := tx.Bucket(cs.bucket)
  44. return bucket.Put([]byte(key), cluster)
  45. })
  46. }
  47. // Remove Removes a key from the cluster storage
  48. func (cs *BoltDBClusterStorage) Remove(key string) error {
  49. return cs.db.Update(func(tx *bolt.Tx) error {
  50. bucket := tx.Bucket(cs.bucket)
  51. return bucket.Delete([]byte(key))
  52. })
  53. }
  54. // Each Iterates through all key/values for the storage and calls the handler func. If a handler returns
  55. // an error, the iteration stops.
  56. func (cs *BoltDBClusterStorage) Each(handler func(string, []byte) error) error {
  57. return cs.db.View(func(tx *bolt.Tx) error {
  58. bucket := tx.Bucket(cs.bucket)
  59. return bucket.ForEach(func(k, v []byte) error {
  60. // Allow the bytes to live outside transaction by copy
  61. key := make([]byte, len(k))
  62. value := make([]byte, len(v))
  63. copy(key, k)
  64. copy(value, v)
  65. if err := handler(string(key), value); err != nil {
  66. return err
  67. }
  68. return nil
  69. })
  70. })
  71. }
  72. // Close Closes the backing storage
  73. func (cs *BoltDBClusterStorage) Close() error {
  74. return cs.db.Close()
  75. }