boltdbstorage.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package clustermanager
  2. import (
  3. bolt "go.etcd.io/bbolt"
  4. _ "k8s.io/klog"
  5. )
  6. type BoltDBClusterStorage struct {
  7. bucket []byte
  8. db *bolt.DB
  9. }
  10. func NewBoltDBClusterStorage(bucket string, db *bolt.DB) (ClusterStorage, error) {
  11. bucketKey := []byte(bucket)
  12. err := db.Update(func(tx *bolt.Tx) error {
  13. _, err := tx.CreateBucketIfNotExists(bucketKey)
  14. if err != nil {
  15. return err
  16. }
  17. return nil
  18. })
  19. if err != nil {
  20. return nil, err
  21. }
  22. return &BoltDBClusterStorage{
  23. bucket: bucketKey,
  24. db: db,
  25. }, nil
  26. }
  27. // Adds the entry if the key does not exist
  28. func (cs *BoltDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
  29. return cs.db.Update(func(tx *bolt.Tx) error {
  30. k := []byte(key)
  31. bucket := tx.Bucket(cs.bucket)
  32. if bucket.Get(k) != nil {
  33. return nil
  34. }
  35. return bucket.Put(k, cluster)
  36. })
  37. }
  38. // Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
  39. // value with the provided.
  40. func (cs *BoltDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
  41. return cs.db.Update(func(tx *bolt.Tx) error {
  42. bucket := tx.Bucket(cs.bucket)
  43. return bucket.Put([]byte(key), cluster)
  44. })
  45. }
  46. // Removes a key from the cluster storage
  47. func (cs *BoltDBClusterStorage) Remove(key string) error {
  48. return cs.db.Update(func(tx *bolt.Tx) error {
  49. bucket := tx.Bucket(cs.bucket)
  50. return bucket.Delete([]byte(key))
  51. })
  52. }
  53. // Iterates through all key/values for the storage and calls the handler func. If a handler returns
  54. // an error, the iteration stops.
  55. func (cs *BoltDBClusterStorage) Each(handler func(string, []byte) error) error {
  56. return cs.db.View(func(tx *bolt.Tx) error {
  57. bucket := tx.Bucket(cs.bucket)
  58. return bucket.ForEach(func(k, v []byte) error {
  59. // Allow the bytes to live outside transaction by copy
  60. key := make([]byte, len(k))
  61. value := make([]byte, len(v))
  62. copy(key, k)
  63. copy(value, v)
  64. if err := handler(string(key), value); err != nil {
  65. return err
  66. }
  67. return nil
  68. })
  69. })
  70. }
  71. // Closes the backing storage
  72. func (cs *BoltDBClusterStorage) Close() error {
  73. return cs.db.Close()
  74. }