GraphQL API and utilities for the rpdata project
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
1.5 KiB

  1. package changes
  2. import (
  3. "context"
  4. "sync"
  5. "git.aiterp.net/rpdata/api/models"
  6. )
  7. var subMutex sync.Mutex
  8. var subList []*subscription
  9. type subscription struct {
  10. Keys map[string]bool
  11. Channel chan<- *models.Change
  12. WildCard bool
  13. }
  14. // Subscribe subscribes to all changes.
  15. func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan *models.Change {
  16. channel := make(chan *models.Change, 64)
  17. sub := &subscription{
  18. Keys: make(map[string]bool, len(keys)),
  19. Channel: channel,
  20. WildCard: wildcard,
  21. }
  22. for _, key := range keys {
  23. sub.Keys[mapKey(key)] = true
  24. }
  25. subMutex.Lock()
  26. subList = append(subList, sub)
  27. subMutex.Unlock()
  28. go func() {
  29. <-ctx.Done()
  30. subMutex.Lock()
  31. for i := range subList {
  32. if subList[i] == sub {
  33. subList = append(subList[:i], subList[i+1:]...)
  34. break
  35. }
  36. }
  37. subMutex.Unlock()
  38. }()
  39. return channel
  40. }
  41. func pushToSubscribers(change models.Change) {
  42. keys := make([]string, len(change.Keys))
  43. for i := range change.Keys {
  44. keys[i] = mapKey(change.Keys[i])
  45. }
  46. subMutex.Lock()
  47. SubLoop:
  48. for _, sub := range subList {
  49. changeCopy := change
  50. if sub.WildCard && change.Listed {
  51. select {
  52. case sub.Channel <- &changeCopy:
  53. default:
  54. }
  55. } else {
  56. for _, key := range keys {
  57. if sub.Keys[key] {
  58. select {
  59. case sub.Channel <- &changeCopy:
  60. default:
  61. }
  62. continue SubLoop
  63. }
  64. }
  65. }
  66. }
  67. subMutex.Unlock()
  68. }
  69. func mapKey(ck models.ChangeKey) string {
  70. return ck.Model.String() + "." + ck.ID
  71. }
  72. func init() {
  73. subList = make([]*subscription, 0, 16)
  74. }