Skip to content

Commit c87a271

Browse files
committed
refactoring: Improves the manual start of modules by removing default implementations.
1 parent abdedde commit c87a271

10 files changed

Lines changed: 344 additions & 221 deletions

File tree

lib/consumer/behaviour.ex

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
defmodule RabbitMQStream.Consumer.Behaviour do
2+
@doc """
3+
The callback that is invoked when a chunk is received.
4+
5+
The server sends us messages in chunks, which of each contains many messages. But if we are
6+
consuming from an offset inside of a chunk, or if we have enabled the `filter_value` parameter,
7+
some of those messages might not be passed to `handle_message/1` callback.
8+
9+
You can use use `handle_chunk/1` to access the whole chunk for some extra logic.
10+
11+
Implemeting `handle_chunk/1` doesn't prevent `handle_message/1` from being called.
12+
13+
Optionally if you implement `handle_chunk/2`, it also passes the current
14+
state of the consumer. It can be used to access the `private` field
15+
passed to `start_link/1`, or the `stream_name` itself.
16+
17+
"""
18+
@callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t()) :: term()
19+
@callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t(), state :: RabbitMQStream.Consumer.t()) :: term()
20+
21+
@doc """
22+
Callback invoked on each message received from the stream.
23+
24+
This is the main way of consuming messages, and it applies any filtering and decoding necessary
25+
to the message before being invoked.
26+
"""
27+
@callback handle_message(message :: binary() | term()) :: term()
28+
@callback handle_message(message :: binary() | term(), state :: RabbitMQStream.Consumer.t()) :: term()
29+
@callback handle_message(
30+
message :: binary() | term(),
31+
chunk :: RabbitMQStream.OsirisChunk.t(),
32+
state :: RabbitMQStream.Consumer.t()
33+
) ::
34+
term()
35+
36+
@doc """
37+
If the consumer has been defined with the 'single-active-consumer' parameter,
38+
this callback is invoked when the consumer is being upgraded to being the
39+
active one, or when downgraded to being an inactive one.
40+
41+
When the flag parameter is set to ':upgrade', it means that the consumer is being
42+
upgraded to active and it must return the offset for where it wants to start
43+
consuming from the stream.
44+
45+
When being downgraded, the offset returned by the callback is also sent
46+
to the server but, at the moment, is not being used in any way, and is only
47+
sent because the API requires. But this is actually a good moment to store
48+
the offset so that it can be retrieved by the other consumer that is being
49+
upgraded.
50+
"""
51+
@callback handle_update(consumer :: RabbitMQStream.Consumer.t(), action :: :upgrade | :downgrade) ::
52+
{:ok, RabbitMQStream.Connection.offset()} | {:error, any()}
53+
54+
@doc """
55+
Callback invoked on each message inside of a chunk.
56+
57+
It can be used to decode the message from a binary format into a Map,
58+
or to use GZIP to decompress the content.
59+
60+
You can also globally define a 'Serializer' module, that must implement
61+
the 'decode!/1' callback, at compile-time configuration so it is added
62+
to as the default callback.
63+
"""
64+
@callback decode!(message :: String.t()) :: term()
65+
66+
@doc """
67+
Callback invoked right before subscribing a consumer to the stream.
68+
Might be usefull for setup logic, like creating a stream if it doesn't yet exists.
69+
"""
70+
@callback before_start(RabbitMQStream.Consumer.opts(), RabbitMQStream.Consumer.t()) :: RabbitMQStream.Consumer.t()
71+
72+
@doc """
73+
Send a command to add the provided amount of credits to the consumer.
74+
75+
The credits are tracked by the Server, but it is also stored internally
76+
on the Consumer state, which then can be retreived by calling 'get_credits/0'.
77+
78+
Always returns :ok, and any errors when adding credits to a consumer are logged.
79+
"""
80+
@callback credit(amount :: non_neg_integer()) :: :ok
81+
82+
@doc """
83+
Returns the internally tracked amount of credits for the Consumer.
84+
"""
85+
@callback get_credits() :: non_neg_integer()
86+
87+
@doc """
88+
Persists the consumer's latests offset into the stream.
89+
90+
Be aware that it does not reset any tracking strategy.
91+
"""
92+
@callback store_offset() :: :ok
93+
94+
@doc """
95+
Computes the `filter_value` for a decoded messages, used for filtering incoming messages.
96+
97+
Must follow the same implementation you define at your `c:RabbitMQStream.Producer.filter_value/1`
98+
callback.
99+
100+
Required when passing either `:filter` or `:match_unfiltered` properties when declaring the consumer.
101+
102+
Since the server sends the data in chunks, each of which is guaranted to have at least one message
103+
that match the consumer filter, but it might have some that don't. We need declare this callback to
104+
do additional client side filtering, so that the `handle_message/1` callback only receives those
105+
messages it is really interested in.
106+
"""
107+
@callback filter_value(term()) :: binary() | nil
108+
109+
@optional_callbacks handle_chunk: 1,
110+
handle_chunk: 2,
111+
handle_message: 1,
112+
handle_message: 2,
113+
handle_message: 3,
114+
decode!: 1,
115+
handle_update: 2,
116+
before_start: 2,
117+
get_credits: 0,
118+
store_offset: 0,
119+
filter_value: 1,
120+
credit: 1
121+
end

lib/consumer/consumer.ex

Lines changed: 21 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ defmodule RabbitMQStream.Consumer do
194194
@opts unquote(opts)
195195
require Logger
196196

197-
@behaviour RabbitMQStream.Consumer
197+
@behaviour RabbitMQStream.Consumer.Behaviour
198198

199199
def start_link(opts \\ []) do
200200
unless !Keyword.has_key?(opts, :serializer) do
@@ -215,27 +215,22 @@ defmodule RabbitMQStream.Consumer do
215215
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
216216
end
217217

218+
def stop() do
219+
GenServer.stop(__MODULE__)
220+
end
221+
218222
def credit(amount) do
219-
GenServer.cast(__MODULE__, {:credit, amount})
223+
RabbitMQStream.Consumer.credit(__MODULE__, amount)
220224
end
221225

222226
def get_credits() do
223-
GenServer.call(__MODULE__, :get_credits)
227+
RabbitMQStream.Consumer.get_credits(__MODULE__)
224228
end
225229

226230
def store_offset() do
227-
GenServer.call(__MODULE__, :store_offset)
231+
RabbitMQStream.Consumer.store_offset(__MODULE__)
228232
end
229233

230-
def before_start(_opts, state), do: state
231-
232-
def handle_chunk(_chunk), do: nil
233-
def handle_chunk(chunk, _state), do: handle_chunk(chunk)
234-
235-
def handle_message(_message), do: nil
236-
def handle_message(message, _state), do: handle_message(message)
237-
def handle_message(message, _chunk, state), do: handle_message(message, state)
238-
239234
unquote(
240235
if serializer != nil do
241236
quote do
@@ -248,7 +243,7 @@ defmodule RabbitMQStream.Consumer do
248243
end
249244
)
250245

251-
defoverridable RabbitMQStream.Consumer
246+
defoverridable RabbitMQStream.Consumer.Behaviour
252247
end
253248
end
254249

@@ -266,121 +261,20 @@ defmodule RabbitMQStream.Consumer do
266261
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
267262
end
268263

269-
@doc """
270-
The callback that is invoked when a chunk is received.
271-
272-
The server sends us messages in chunks, which of each contains many messages. But if we are
273-
consuming from an offset inside of a chunk, or if we have enabled the `filter_value` parameter,
274-
some of those messages might not be passed to `handle_message/1` callback.
275-
276-
You can use use `handle_chunk/1` to access the whole chunk for some extra logic.
277-
278-
Implemeting `handle_chunk/1` doesn't prevent `handle_message/1` from being called.
279-
280-
Optionally if you implement `handle_chunk/2`, it also passes the current
281-
state of the consumer. It can be used to access the `private` field
282-
passed to `start_link/1`, or the `stream_name` itself.
283-
284-
"""
285-
@callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t()) :: term()
286-
@callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t(), state :: t()) :: term()
287-
288-
@doc """
289-
Callback invoked on each message received from the stream.
290-
291-
This is the main way of consuming messages, and it applies any filtering and decoding necessary
292-
to the message before being invoked.
293-
"""
294-
@callback handle_message(message :: binary() | term()) :: term()
295-
@callback handle_message(message :: binary() | term(), state :: t()) :: term()
296-
@callback handle_message(message :: binary() | term(), chunk :: RabbitMQStream.OsirisChunk.t(), state :: t()) ::
297-
term()
298-
299-
@doc """
300-
If the consumer has been defined with the 'single-active-consumer' parameter,
301-
this callback is invoked when the consumer is being upgraded to being the
302-
active one, or when downgraded to being an inactive one.
303-
304-
When the flag parameter is set to ':upgrade', it means that the consumer is being
305-
upgraded to active and it must return the offset for where it wants to start
306-
consuming from the stream.
307-
308-
When being downgraded, the offset returned by the callback is also sent
309-
to the server but, at the moment, is not being used in any way, and is only
310-
sent because the API requires. But this is actually a good moment to store
311-
the offset so that it can be retrieved by the other consumer that is being
312-
upgraded.
313-
"""
314-
@callback handle_update(consumer :: t(), action :: :upgrade | :downgrade) ::
315-
{:ok, RabbitMQStream.Connection.offset()} | {:error, any()}
316-
317-
@doc """
318-
Callback invoked on each message inside of a chunk.
319-
320-
It can be used to decode the message from a binary format into a Map,
321-
or to use GZIP to decompress the content.
322-
323-
You can also globally define a 'Serializer' module, that must implement
324-
the 'decode!/1' callback, at compile-time configuration so it is added
325-
to as the default callback.
326-
"""
327-
@callback decode!(message :: String.t()) :: term()
328-
329-
@doc """
330-
Callback invoked right before subscribing a consumer to the stream.
331-
Might be usefull for setup logic, like creating a stream if it doesn't yet exists.
332-
"""
333-
@callback before_start(opts(), t()) :: t()
334-
335-
@doc """
336-
Send a command to add the provided amount of credits to the consumer.
337-
338-
The credits are tracked by the Server, but it is also stored internally
339-
on the Consumer state, which then can be retreived by calling 'get_credits/0'.
340-
341-
Always returns :ok, and any errors when adding credits to a consumer are logged.
342-
"""
343-
@callback credit(amount :: non_neg_integer()) :: :ok
344-
345-
@doc """
346-
Returns the internally tracked amount of credits for the Consumer.
347-
"""
348-
@callback get_credits() :: non_neg_integer()
349-
350-
@doc """
351-
Persists the consumer's latests offset into the stream.
352-
353-
Be aware that it does not reset any tracking strategy.
354-
"""
355-
@callback store_offset() :: :ok
356-
357-
@doc """
358-
Computes the `filter_value` for a decoded messages, used for filtering incoming messages.
359-
360-
Must follow the same implementation you define at your `c:RabbitMQStream.Producer.filter_value/1`
361-
callback.
264+
@spec credit(GenServer.server(), amount :: non_neg_integer()) :: :ok
265+
def credit(server, amount) do
266+
GenServer.cast(server, {:credit, amount})
267+
end
362268

363-
Required when passing either `:filter` or `:match_unfiltered` properties when declaring the consumer.
269+
@spec get_credits(GenServer.server()) :: non_neg_integer()
270+
def get_credits(server) do
271+
GenServer.call(server, :get_credits)
272+
end
364273

365-
Since the server sends the data in chunks, each of which is guaranted to have at least one message
366-
that match the consumer filter, but it might have some that don't. We need declare this callback to
367-
do additional client side filtering, so that the `handle_message/1` callback only receives those
368-
messages it is really interested in.
369-
"""
370-
@callback filter_value(term()) :: binary() | nil
371-
372-
@optional_callbacks handle_chunk: 1,
373-
handle_chunk: 2,
374-
handle_message: 1,
375-
handle_message: 2,
376-
handle_message: 3,
377-
decode!: 1,
378-
handle_update: 2,
379-
before_start: 2,
380-
get_credits: 0,
381-
store_offset: 0,
382-
filter_value: 1,
383-
credit: 1
274+
@spec store_offset(GenServer.server()) :: :ok
275+
def store_offset(server) do
276+
GenServer.call(server, :store_offset)
277+
end
384278

385279
defstruct [
386280
:offset_reference,

lib/consumer/lifecycle.ex

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ defmodule RabbitMQStream.Consumer.LifeCycle do
4949

5050
@impl true
5151
def handle_continue({:init, opts}, state) do
52-
state = apply(state.consumer_module, :before_start, [opts, state])
52+
state =
53+
if function_exported?(state.consumer_module, :before_start, 2) do
54+
apply(state.consumer_module, :before_start, [opts, state])
55+
else
56+
state
57+
end
5358

5459
last_offset =
5560
case RabbitMQStream.Connection.query_offset(state.connection, state.stream_name, state.offset_reference) do
@@ -118,13 +123,31 @@ defmodule RabbitMQStream.Consumer.LifeCycle do
118123

119124
@impl true
120125
def handle_info({:deliver, %DeliverData{osiris_chunk: %RabbitMQStream.OsirisChunk{} = chunk}}, state) do
121-
# We could wrap the application with a `try do` clause, but it would break the "let it crash" filosophy
122-
apply(state.consumer_module, :handle_chunk, [chunk, state])
126+
if function_exported?(state.consumer_module, :handle_chunk, 2) do
127+
apply(state.consumer_module, :handle_chunk, [chunk, state])
128+
end
129+
130+
for message <- Enum.slice(chunk.data_entries, (state.last_offset - chunk.chunk_id)..chunk.num_entries) do
131+
message =
132+
if function_exported?(state.consumer_module, :decode!, 1) do
133+
apply(state.consumer_module, :decode!, [message])
134+
else
135+
message
136+
end
137+
138+
if filtered?(message, state) do
139+
if function_exported?(state.consumer_module, :handle_message, 3) do
140+
apply(state.consumer_module, :handle_message, [message, chunk, state])
141+
end
123142

124-
for message <- Enum.slice(chunk.data_entries, (state.last_offset - chunk.chunk_id)..chunk.num_entries),
125-
decoded = apply(state.consumer_module, :decode!, [message]),
126-
filtered?(decoded, state) do
127-
apply(state.consumer_module, :handle_message, [decoded, chunk, state])
143+
if function_exported?(state.consumer_module, :handle_message, 2) do
144+
apply(state.consumer_module, :handle_message, [message, state])
145+
end
146+
147+
if function_exported?(state.consumer_module, :handle_message, 1) do
148+
apply(state.consumer_module, :handle_message, [message])
149+
end
150+
end
128151
end
129152

130153
# Based on the [Python implementation](https://github.com/qweeze/rstream/blob/a81176a5c7cf4accaee25ca7725bd7bd94bf0ce8/rstream/consumer.py#L327),
@@ -213,7 +236,12 @@ defmodule RabbitMQStream.Consumer.LifeCycle do
213236
match_unfiltered = state.properties[:match_unfiltered]
214237

215238
if filter != nil or match_unfiltered != nil do
216-
filter_value = apply(state.consumer_module, :filter_value, [decoded])
239+
filter_value =
240+
if function_exported?(state.consumer_module, :filter_value, 1) do
241+
apply(state.consumer_module, :filter_value, [decoded])
242+
else
243+
nil
244+
end
217245

218246
case {filter_value, filter, match_unfiltered} do
219247
{nil, _, true} ->

0 commit comments

Comments
 (0)