GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

87 lines
1.5 KiB

  1. package changes
  2. import (
  3. "context"
  4. "sync"
  5. "git.aiterp.net/rpdata/api/models"
  6. )
  7. var subMutex sync.Mutex
  8. var subList []*subscription
  9. type subscription struct {
  10. Keys map[string]bool
  11. Channel chan<- models.Change
  12. WildCard bool
  13. }
  14. // Subscribe subscribes to all changes.
  15. func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan models.Change {
  16. channel := make(chan models.Change, 64)
  17. sub := &subscription{
  18. Keys: make(map[string]bool, len(keys)),
  19. Channel: channel,
  20. WildCard: wildcard,
  21. }
  22. for _, key := range keys {
  23. sub.Keys[mapKey(key)] = true
  24. }
  25. subMutex.Lock()
  26. subList = append(subList, sub)
  27. subMutex.Unlock()
  28. go func() {
  29. <-ctx.Done()
  30. subMutex.Lock()
  31. for i := range subList {
  32. if subList[i] == sub {
  33. subList = append(subList[:i], subList[i+1:]...)
  34. }
  35. }
  36. subMutex.Unlock()
  37. }()
  38. return channel
  39. }
  40. func pushToSubscribers(change models.Change) {
  41. keys := make([]string, len(change.Keys))
  42. for i := range change.Keys {
  43. keys[i] = mapKey(change.Keys[i])
  44. }
  45. subMutex.Lock()
  46. SubLoop:
  47. for _, sub := range subList {
  48. if sub.WildCard && change.Listed {
  49. select {
  50. case sub.Channel <- change:
  51. default:
  52. }
  53. } else {
  54. for _, key := range keys {
  55. if sub.Keys[key] {
  56. select {
  57. case sub.Channel <- change:
  58. default:
  59. }
  60. continue SubLoop
  61. }
  62. }
  63. }
  64. }
  65. subMutex.Unlock()
  66. }
  67. func mapKey(ck models.ChangeKey) string {
  68. return ck.Model.String() + "." + ck.ID
  69. }
  70. func init() {
  71. subList = make([]*subscription, 0, 16)
  72. }