Skip to content

Commit a4caad0

Browse files
authored
Merge pull request #21 from emacsway/mediator
The mediator pattern implementation
2 parents 645bd5d + 2405a6a commit a4caad0

3 files changed

Lines changed: 416 additions & 4 deletions

File tree

grade/internal/infrastructure/seedwork/mediator/mediator.go

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package mediator
2+
3+
import (
4+
"context"
5+
"errors"
6+
"reflect"
7+
"sync"
8+
9+
"github.com/emacsway/grade/grade/internal/domain/seedwork/disposable"
10+
"github.com/hashicorp/go-multierror"
11+
)
12+
13+
var (
14+
ErrUnsuitableHandlerSignature = errors.New("passed handler has unsuitable signature")
15+
)
16+
17+
type Handler func(ctx context.Context, command any) (any, error)
18+
19+
func AsUntyped[T any](handler func(ctx context.Context, command T) (any, error)) Handler {
20+
return func(ctx context.Context, command any) (any, error) {
21+
if typedCommand, ok := command.(T); ok {
22+
return handler(ctx, typedCommand)
23+
}
24+
25+
return nil, ErrUnsuitableHandlerSignature
26+
}
27+
}
28+
29+
type RefUntypedMediator struct {
30+
hLock sync.RWMutex
31+
handlers map[reflect.Type]Handler
32+
33+
sLock sync.RWMutex
34+
subscribers map[reflect.Type]map[reflect.Value]Handler
35+
36+
pLock sync.RWMutex
37+
pipes []func(next Handler) Handler
38+
}
39+
40+
func NewRefUntypedMediator() *RefUntypedMediator {
41+
return &RefUntypedMediator{
42+
hLock: sync.RWMutex{},
43+
handlers: map[reflect.Type]Handler{},
44+
45+
sLock: sync.RWMutex{},
46+
subscribers: map[reflect.Type]map[reflect.Value]Handler{},
47+
48+
pLock: sync.RWMutex{},
49+
}
50+
}
51+
52+
func (m *RefUntypedMediator) AddPipe(pipe func(next Handler) Handler) {
53+
m.pLock.Lock()
54+
defer m.pLock.Unlock()
55+
56+
m.pipes = append(m.pipes, pipe)
57+
}
58+
59+
func (m *RefUntypedMediator) executeWithPipeline(handler Handler, ctx context.Context, command any) (any, error) {
60+
m.pLock.RLock()
61+
defer m.pLock.RUnlock()
62+
63+
current := func(ctx context.Context, command any) (any, error) {
64+
return handler(ctx, command)
65+
}
66+
67+
for ixd := range m.pipes {
68+
reverse := len(m.pipes) - 1 - ixd
69+
current = m.pipes[reverse](current)
70+
}
71+
72+
return current(ctx, command)
73+
}
74+
75+
func (m *RefUntypedMediator) Send(ctx context.Context, command any) (any, error) {
76+
m.hLock.RLock()
77+
defer m.hLock.RUnlock()
78+
79+
commandType := reflect.TypeOf(command)
80+
if handler, found := m.handlers[commandType]; found {
81+
return m.executeWithPipeline(handler, ctx, command)
82+
}
83+
84+
return nil, nil
85+
}
86+
87+
func (m *RefUntypedMediator) Register(command any, handler Handler) disposable.Disposable {
88+
m.hLock.Lock()
89+
defer m.hLock.Unlock()
90+
91+
commandType := reflect.TypeOf(command)
92+
m.handlers[commandType] = handler
93+
94+
return disposable.NewDisposable(func() {
95+
m.Unregister(command)
96+
})
97+
}
98+
99+
func (m *RefUntypedMediator) Unregister(command any) {
100+
m.hLock.Lock()
101+
defer m.hLock.Unlock()
102+
103+
commandType := reflect.TypeOf(command)
104+
delete(m.handlers, commandType)
105+
}
106+
107+
func (m *RefUntypedMediator) Subscribe(event any, handler Handler) disposable.Disposable {
108+
m.sLock.Lock()
109+
defer m.sLock.Unlock()
110+
111+
valueType := reflect.TypeOf(event)
112+
if _, found := m.subscribers[valueType]; !found {
113+
m.subscribers[valueType] = map[reflect.Value]Handler{}
114+
}
115+
116+
handlerValue := reflect.ValueOf(handler)
117+
m.subscribers[valueType][handlerValue] = handler
118+
119+
return disposable.NewDisposable(func() {
120+
m.Unsubscribe(event, handler)
121+
})
122+
}
123+
124+
func (m *RefUntypedMediator) Unsubscribe(event any, handler Handler) {
125+
m.sLock.Lock()
126+
defer m.sLock.Unlock()
127+
128+
eventType := reflect.TypeOf(event)
129+
handlerValue := reflect.ValueOf(handler)
130+
131+
delete(m.subscribers[eventType], handlerValue)
132+
}
133+
134+
func (m *RefUntypedMediator) Publish(ctx context.Context, event any) error {
135+
m.sLock.RLock()
136+
defer m.sLock.RUnlock()
137+
138+
var errs error
139+
eventType := reflect.TypeOf(event)
140+
for _, handler := range m.subscribers[eventType] {
141+
_, err := handler(ctx, event)
142+
errs = multierror.Append(errs, err)
143+
}
144+
145+
return errs
146+
}

0 commit comments

Comments
 (0)