Plan stuff. Log stuff.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

339 lines
6.9 KiB

5 years ago
  1. package bolt
  2. import (
  3. "bytes"
  4. "errors"
  5. "go.etcd.io/bbolt"
  6. )
  7. import "github.com/vmihailenco/msgpack/v4"
  8. var bnMap = []byte("map")
  9. var bnRev = []byte("rev")
  10. type index struct {
  11. bucketNames [][]byte
  12. }
  13. // newModelIndex creates an index with the naming convention of `_idx.Model.Field`.
  14. func newModelIndex(db *bbolt.DB, model, field string) (*index, error) {
  15. return newIndex(db, "_idx", model, field)
  16. }
  17. // newIndex creates a new index and ensures the bucket chain is in order.
  18. func newIndex(db *bbolt.DB, buckets ...string) (*index, error) {
  19. if len(buckets) == 0 {
  20. panic("no buckets")
  21. }
  22. bucketNames := make([][]byte, len(buckets))
  23. for i := range buckets {
  24. bucketNames[i] = []byte(buckets[i])
  25. }
  26. err := db.Update(func(tx *bbolt.Tx) error {
  27. bucket, err := tx.CreateBucketIfNotExists(bucketNames[0])
  28. if err != nil {
  29. return err
  30. }
  31. for _, bucketName := range bucketNames[1:] {
  32. bucket, err = bucket.CreateBucketIfNotExists(bucketName)
  33. if err != nil {
  34. return err
  35. }
  36. }
  37. _, err = bucket.CreateBucketIfNotExists(bnMap)
  38. if err != nil {
  39. return err
  40. }
  41. _, err = bucket.CreateBucketIfNotExists(bnRev)
  42. if err != nil {
  43. return err
  44. }
  45. return nil
  46. })
  47. return &index{bucketNames: bucketNames}, err
  48. }
  49. func (idx *index) Reset(tx *bbolt.Tx) error {
  50. rootBucket := tx.Bucket(idx.bucketNames[0])
  51. for _, name := range idx.bucketNames[1:] {
  52. rootBucket = rootBucket.Bucket(name)
  53. }
  54. err := rootBucket.DeleteBucket(bnRev)
  55. if err != nil {
  56. return err
  57. }
  58. err = rootBucket.DeleteBucket(bnMap)
  59. if err != nil {
  60. return err
  61. }
  62. _, err = rootBucket.CreateBucket(bnRev)
  63. if err != nil {
  64. return err
  65. }
  66. _, err = rootBucket.CreateBucket(bnMap)
  67. if err != nil {
  68. return err
  69. }
  70. return nil
  71. }
  72. func (idx *index) buckets(tx *bbolt.Tx) (mapBucket, revBucket *bbolt.Bucket) {
  73. rootBucket := tx.Bucket(idx.bucketNames[0])
  74. for _, name := range idx.bucketNames[1:] {
  75. rootBucket = rootBucket.Bucket(name)
  76. }
  77. return rootBucket.Bucket(bnMap), rootBucket.Bucket(bnRev)
  78. }
  79. func (idx *index) WithTx(tx *bbolt.Tx) *indexTx {
  80. mapBucket, revBucket := idx.buckets(tx)
  81. return &indexTx{
  82. tx: tx,
  83. mapBucket: mapBucket,
  84. revBucket: revBucket,
  85. }
  86. }
  87. type indexTx struct {
  88. tx *bbolt.Tx
  89. mapBucket *bbolt.Bucket
  90. revBucket *bbolt.Bucket
  91. }
  92. func (itx *indexTx) Get(value string) (ids [][]byte, err error) {
  93. entry := itx.mapBucket.Get(unsafeStringToBytes(value))
  94. if entry == nil {
  95. return
  96. }
  97. ids = make([][]byte, 0, 8)
  98. err = msgpack.Unmarshal(entry, &ids)
  99. return
  100. }
  101. // Reverse gets all values associated with the ID
  102. func (itx *indexTx) Reverse(id []byte) (values []string, err error) {
  103. value := itx.revBucket.Get(id)
  104. if value == nil {
  105. return
  106. }
  107. values = make([]string, 0, 8)
  108. err = msgpack.Unmarshal(value, &values)
  109. return
  110. }
  111. func (itx *indexTx) Before(value string) (ids [][]byte, err error) {
  112. cursor := itx.mapBucket.Cursor()
  113. ids = make([][]byte, 0, 16)
  114. valueBytes := unsafeStringToBytes(value)
  115. cursor.Seek(valueBytes)
  116. key, entryValue := cursor.Prev()
  117. if key == nil {
  118. return
  119. }
  120. entry := itx.mapBucket.Get([]byte(entryValue))
  121. if entry == nil {
  122. return
  123. }
  124. err = msgpack.Unmarshal(entry, &ids)
  125. return
  126. }
  127. func (itx *indexTx) Between(a, b string) (ids [][]byte, err error) {
  128. cursor := itx.mapBucket.Cursor()
  129. ids = make([][]byte, 0, 16)
  130. aBytes := unsafeStringToBytes(a)
  131. bBytes := unsafeStringToBytes(b)
  132. for key, value := cursor.Seek(aBytes); key != nil && bytes.Compare(key, bBytes) < 1; key, value = cursor.Next() {
  133. entry := itx.mapBucket.Get([]byte(value))
  134. if entry == nil {
  135. return
  136. }
  137. err = msgpack.Unmarshal(entry, &ids)
  138. if err != nil {
  139. return
  140. }
  141. }
  142. return
  143. }
  144. // Set sets the index to the given values. This removes any values not in the given list.
  145. func (itx *indexTx) Set(id []byte, values ...string) error {
  146. oldValues := make([]string, 0, len(values))
  147. newValues := make([]string, 0, len(values))
  148. // Check for duplicates
  149. for i, value := range values {
  150. for _, value2 := range values[i+1:] {
  151. if value == value2 {
  152. return errors.New("Duplicate value for index: " + value)
  153. }
  154. }
  155. }
  156. if value := itx.revBucket.Get(id); value != nil {
  157. // Existing ID
  158. existingValues := make([]string, 0, 16)
  159. err := msgpack.Unmarshal(value, &existingValues)
  160. if err != nil {
  161. return err
  162. }
  163. // Record new and old values if there are any.
  164. if len(values) > 0 {
  165. // Find old values
  166. OldValueLoop:
  167. for _, existingValue := range existingValues {
  168. for _, value := range values {
  169. if value == existingValue {
  170. continue OldValueLoop
  171. }
  172. }
  173. oldValues = append(oldValues, existingValue)
  174. }
  175. // Find new values
  176. NewValueLoop:
  177. for _, value := range values {
  178. if value == "" {
  179. continue
  180. }
  181. for _, existingValue := range existingValues {
  182. if value == existingValue {
  183. continue NewValueLoop
  184. }
  185. }
  186. newValues = append(newValues, value)
  187. }
  188. } else {
  189. // There aren't any values, all values should be considered old.
  190. oldValues = existingValues
  191. newValues = newValues[:0]
  192. }
  193. } else {
  194. // New ID, can be skipped if this is clearing the itx operation
  195. if len(values) == 0 {
  196. return nil
  197. }
  198. // Otherwise, all values are newValues.
  199. for _, value := range values {
  200. if value == "" {
  201. continue
  202. }
  203. newValues = append(newValues, value)
  204. }
  205. }
  206. // Put new reverse lookup entry
  207. if len(values) > 0 {
  208. revData, err := msgpack.Marshal(values)
  209. if err != nil {
  210. return err
  211. }
  212. err = itx.revBucket.Put(id, revData)
  213. if err != nil {
  214. return err
  215. }
  216. }
  217. // Remove old values
  218. for _, oldValue := range oldValues {
  219. ov := []byte(oldValue)
  220. value := itx.mapBucket.Get(ov)
  221. if value == nil {
  222. return errors.New("oldValue expected, but not found. itx probably corrupt")
  223. }
  224. ids := make([][]byte, 0, 8)
  225. err := msgpack.Unmarshal(value, &ids)
  226. if err != nil {
  227. return err
  228. }
  229. for i, existingId := range ids {
  230. if bytes.Equal(existingId, id) {
  231. ids = append(ids[:i], ids[i+1:]...)
  232. break
  233. }
  234. }
  235. if len(ids) == 0 {
  236. err = itx.mapBucket.Delete(ov)
  237. if err != nil {
  238. return err
  239. }
  240. } else {
  241. newValue, err := msgpack.Marshal(ids)
  242. if err != nil {
  243. return err
  244. }
  245. err = itx.mapBucket.Put(ov, newValue)
  246. if err != nil {
  247. return err
  248. }
  249. }
  250. }
  251. // Add new values
  252. for _, newValue := range newValues {
  253. nv := []byte(newValue)
  254. if existing := itx.mapBucket.Get(nv); existing != nil {
  255. ids := make([][]byte, 0, 8)
  256. err := msgpack.Unmarshal(existing, &ids)
  257. if err != nil {
  258. return err
  259. }
  260. newValue, err := msgpack.Marshal(append(ids, id))
  261. if err != nil {
  262. return err
  263. }
  264. err = itx.mapBucket.Put(nv, newValue)
  265. if err != nil {
  266. return err
  267. }
  268. } else {
  269. newValue, err := msgpack.Marshal([][]byte{id})
  270. if err != nil {
  271. return err
  272. }
  273. err = itx.mapBucket.Put(nv, newValue)
  274. if err != nil {
  275. return err
  276. }
  277. }
  278. }
  279. // If this will delete all values, then delete this entry
  280. if len(values) == 0 {
  281. err := itx.revBucket.Delete(id)
  282. if err != nil {
  283. return err
  284. }
  285. }
  286. return nil
  287. }