GraphQL API and utilities for the rpdata project
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
1.5 KiB

  1. package changes
  2. import (
  3. "context"
  4. "sync"
  5. "git.aiterp.net/rpdata/api/models"
  6. )
  7. var subMutex sync.Mutex
  8. var subList []*subscription
  9. type subscription struct {
  10. Keys map[string]bool
  11. Channel chan<- models.Change
  12. WildCard bool
  13. }
  14. // Subscribe subscribes to all changes.
  15. func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan models.Change {
  16. channel := make(chan models.Change, 64)
  17. sub := &subscription{
  18. Keys: make(map[string]bool, len(keys)),
  19. Channel: channel,
  20. WildCard: wildcard,
  21. }
  22. for _, key := range keys {
  23. sub.Keys[mapKey(key)] = true
  24. }
  25. subMutex.Lock()
  26. subList = append(subList, sub)
  27. subMutex.Unlock()
  28. go func() {
  29. <-ctx.Done()
  30. subMutex.Lock()
  31. for i := range subList {
  32. if subList[i] == sub {
  33. subList = append(subList[:i], subList[i+1:]...)
  34. break
  35. }
  36. }
  37. subMutex.Unlock()
  38. }()
  39. return channel
  40. }
  41. func pushToSubscribers(change models.Change) {
  42. keys := make([]string, len(change.Keys))
  43. for i := range change.Keys {
  44. keys[i] = mapKey(change.Keys[i])
  45. }
  46. subMutex.Lock()
  47. SubLoop:
  48. for _, sub := range subList {
  49. if sub.WildCard && change.Listed {
  50. select {
  51. case sub.Channel <- change:
  52. default:
  53. }
  54. } else {
  55. for _, key := range keys {
  56. if sub.Keys[key] {
  57. select {
  58. case sub.Channel <- change:
  59. default:
  60. }
  61. continue SubLoop
  62. }
  63. }
  64. }
  65. }
  66. subMutex.Unlock()
  67. }
  68. func mapKey(ck models.ChangeKey) string {
  69. return ck.Model.String() + "." + ck.ID
  70. }
  71. func init() {
  72. subList = make([]*subscription, 0, 16)
  73. }