Cloudreve/pkg/filemanager/eventhub/subscriber.go

318 lines
6.8 KiB
Go

package eventhub
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/gofrs/uuid"
"github.com/samber/lo"
)
type Subscriber interface {
ID() string
Ch() chan *Event
Publish(evt Event)
Stop()
Buffer() []*Event
// Owner returns the owner of the subscriber.
Owner() (*ent.User, error)
// Online returns whether the subscriber is online.
Online() bool
// OfflineSince returns when the subscriber went offline.
// Returns zero time if the subscriber is online.
OfflineSince() time.Time
}
const (
debounceDelay = 5 * time.Second
userCacheTTL = 1 * time.Hour
offlineMaxAge = 14 * 24 * time.Hour // 14 days
)
type subscriber struct {
mu sync.Mutex
userClient inventory.UserClient
fsEventClient inventory.FsEventClient
id string
uid int
ch chan *Event
// Online status
online bool
offlineSince time.Time
// Debounce buffer for pending events
buffer []*Event
timer *time.Timer
// Owner info
ownerCached *ent.User
cachedAt time.Time
// Close signal
closed bool
closedCh chan struct{}
}
func newSubscriber(ctx context.Context, id string, userClient inventory.UserClient, fsEventClient inventory.FsEventClient) (*subscriber, error) {
user := inventory.UserFromContext(ctx)
if user == nil || inventory.IsAnonymousUser(user) {
return nil, errors.New("user not found")
}
return &subscriber{
id: id,
ch: make(chan *Event, bufSize),
userClient: userClient,
fsEventClient: fsEventClient,
ownerCached: user,
uid: user.ID,
cachedAt: time.Now(),
online: true,
closedCh: make(chan struct{}),
}, nil
}
func (s *subscriber) ID() string {
return s.id
}
func (s *subscriber) Ch() chan *Event {
return s.ch
}
func (s *subscriber) Online() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.online
}
func (s *subscriber) OfflineSince() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.offlineSince
}
func (s *subscriber) Owner() (*ent.User, error) {
s.mu.Lock()
defer s.mu.Unlock()
if time.Since(s.cachedAt) > userCacheTTL || s.ownerCached == nil {
user, err := s.userClient.GetLoginUserByID(context.Background(), s.uid)
if err != nil {
return nil, fmt.Errorf("failed to get login user: %w", err)
}
s.ownerCached = user
s.cachedAt = time.Now()
}
return s.ownerCached, nil
}
// Publish adds an event to the buffer and starts/resets the debounce timer.
// Events will be flushed to the channel after the debounce delay.
// If the subscriber is offline, events are kept in the buffer only.
func (s *subscriber) Publish(evt Event) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.publishLocked(evt)
}
// publishLocked adds an event to the buffer and manages the debounce timer.
// Caller must hold s.mu.
func (s *subscriber) publishLocked(evt Event) {
// Add event to buffer
s.buffer = append(s.buffer, &evt)
// Reset or start the debounce timer
if s.timer != nil {
s.timer.Stop()
}
s.timer = time.AfterFunc(debounceDelay, s.flush)
}
// flush sends all buffered events to the channel.
// Called by the debounce timer.
func (s *subscriber) flush() {
s.mu.Lock()
defer s.mu.Unlock()
s.flushLocked(context.Background())
}
// flushLocked sends all buffered events to the channel.
// Caller must hold s.mu.
func (s *subscriber) flushLocked(ctx context.Context) {
if len(s.buffer) == 0 || s.closed {
return
}
if !s.online {
_ = s.fsEventClient.Create(ctx, s.ownerCached.ID, uuid.FromStringOrNil(s.id), lo.Map(s.buffer, func(item *Event, index int) string {
res, _ := json.Marshal(item)
return string(res)
})...)
} else {
// TODO: implement event merging logic here
// For now, send all buffered events individually
debouncedEvents := DebounceEvents(s.buffer)
for _, evt := range debouncedEvents {
select {
case s.ch <- evt:
default:
// Non-blocking send; drop if subscriber is slow
}
}
}
// Clear the buffer
s.buffer = nil
s.timer = nil
}
// Stop cancels any pending debounce timer and flushes remaining events.
// Should be called before closing the subscriber.
func (s *subscriber) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.timer != nil {
s.timer.Stop()
s.timer = nil
}
// Flush any remaining events before stopping
s.flushLocked(context.Background())
}
// setOnline marks the subscriber as online and flushes any buffered events.
func (s *subscriber) setOnline(ctx context.Context) {
l := logging.FromContext(ctx)
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.online = true
s.ownerCached = nil
s.offlineSince = time.Time{}
// Retrieve events from inventory
events, err := s.fsEventClient.TakeBySubscriber(ctx, uuid.FromStringOrNil(s.id), s.uid)
if err != nil {
l.Error("Failed to get events from inventory: %s", err)
return
}
// Append events to buffer
for _, event := range events {
var eventParsed Event
err := json.Unmarshal([]byte(event.Event), &eventParsed)
if err != nil {
l.Error("Failed to unmarshal event: %s", err)
continue
}
s.buffer = append(s.buffer, &eventParsed)
}
// Flush buffered events if any
if len(s.buffer) > 0 {
if s.timer != nil {
s.timer.Stop()
}
s.timer = time.AfterFunc(debounceDelay, s.flush)
}
}
// setOffline marks the subscriber as offline.
func (s *subscriber) setOffline() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.online = false
s.offlineSince = time.Now()
// Stop the timer, events will be kept in buffer
if s.timer != nil {
s.timer.Stop()
s.timer = nil
}
// flush the buffer
s.flushLocked(context.Background())
}
// close permanently closes the subscriber.
func (s *subscriber) close() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.closed = true
if s.timer != nil {
s.timer.Stop()
s.timer = nil
}
// Delete the FsEvent
s.fsEventClient.DeleteBySubscriber(context.Background(), uuid.FromStringOrNil(s.id))
// Signal close and close the channel
close(s.closedCh)
close(s.ch)
s.buffer = nil
}
// isClosed returns whether the subscriber is closed.
func (s *subscriber) isClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed
}
// shouldExpire returns whether the subscriber should be expired (offline for too long).
func (s *subscriber) shouldExpire() bool {
s.mu.Lock()
defer s.mu.Unlock()
return !s.online && !s.offlineSince.IsZero() && time.Since(s.offlineSince) > offlineMaxAge
}
// Buffer returns a copy of the current buffered events.
// Useful for debugging or implementing custom merging logic.
func (s *subscriber) Buffer() []*Event {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.buffer) == 0 {
return nil
}
// Return a copy to avoid data races
buf := make([]*Event, len(s.buffer))
copy(buf, s.buffer)
return buf
}