Skip to content

Commit 7a8d5fe

Browse files
committed
doc: Adding notes on using the lib on clustered enviroments
1 parent c87a271 commit 7a8d5fe

2 files changed

Lines changed: 56 additions & 1 deletion

File tree

lib/producer/producer.ex

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ defmodule RabbitMQStream.Producer do
3434
The RabbitMQStream.Producer accepts the following options:
3535
3636
* `:stream_name` - The name of the stream to publish to. Required.
37-
* `:reference_name` - The string which is used by the server to prevent [Duplicate Message](https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/). Defaults to `__MODULE__.Producer`.
37+
* `:reference_name` - The string which is used by the server to prevent [Duplicate Message](https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/). Defaults to `__MODULE__.Producer`. (If clustering in production, check notes and the end.)
3838
* `:connection` - The identifier for a `RabbitMQStream.Connection`.
3939
* `:serializer` - The module to use to decode the message. Defaults to `nil`, which means no encoding is done.
4040
@@ -111,6 +111,17 @@ defmodule RabbitMQStream.Producer do
111111
The default value for the `:serializer` is the module itself, unless a default is defined at a higher level of the
112112
configuration. If there is a `encode!/1` callback defined, it is always used
113113
114+
# Notes on Clustering
115+
116+
Be aware that the sequence tracking for each `:reference_name` is global. Meaning the if you are running
117+
your Elixir as a cluster of multiple nodes, and each having a process of a Producer with the same
118+
`:reference_name`, you may encounter issues with message de-duplication, where messages are being
119+
dropped because the sequence on each producer's state might not be up to date after another process
120+
with the same `:reference_name` produced a message.
121+
122+
There might be cases where you would want this behaviour. If not, be sure to declare a unique
123+
`:reference_name` for each process.
124+
114125
"""
115126

116127
defmacro __using__(opts) do

test/consumer_test.exs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ defmodule RabbitMQStreamTest.Consumer do
1010
use RabbitMQStream.Connection
1111
end
1212

13+
defmodule SupervisedConnection2 do
14+
use RabbitMQStream.Connection
15+
end
16+
1317
defmodule SupervisorProducer do
1418
use RabbitMQStream.Producer,
1519
connection: SupervisedConnection
@@ -52,6 +56,9 @@ defmodule RabbitMQStreamTest.Consumer do
5256
{:ok, _conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/")
5357
:ok = SupervisedConnection.connect()
5458

59+
{:ok, _conn} = SupervisedConnection2.start_link(host: "localhost", vhost: "/")
60+
:ok = SupervisedConnection2.connect()
61+
5562
:ok
5663
end
5764

@@ -168,4 +175,41 @@ defmodule RabbitMQStreamTest.Consumer do
168175

169176
SupervisedConnection.delete_stream(@stream)
170177
end
178+
179+
@stream "consumer-test-stream-04"
180+
@reference_name "reference-04"
181+
test "should not duplicate messages when the reference_name is used twice" do
182+
SupervisedConnection.create_stream(@stream)
183+
184+
{:ok, _subscriber} =
185+
Consumer.start_link(
186+
initial_offset: :next,
187+
stream_name: @stream,
188+
private: self(),
189+
offset_tracking: [count: [store_after: 1]]
190+
)
191+
192+
{:ok, first} =
193+
RabbitMQStream.Producer.start_link(
194+
connection: SupervisedConnection,
195+
stream_name: @stream,
196+
reference_name: "#{@reference_name}"
197+
)
198+
199+
{:ok, second} =
200+
RabbitMQStream.Producer.start_link(
201+
connection: SupervisedConnection2,
202+
stream_name: @stream,
203+
reference_name: "#{@reference_name}"
204+
)
205+
206+
Process.sleep(1000)
207+
208+
:ok = RabbitMQStream.Producer.publish(first, Jason.encode!(%{message: "first"}))
209+
Process.sleep(500)
210+
:ok = RabbitMQStream.Producer.publish(second, Jason.encode!(%{message: "second"}))
211+
212+
assert_receive {:message, %{"message" => "first"}}, 500
213+
refute_receive {:message, %{"message" => "second"}}, 500
214+
end
171215
end

0 commit comments

Comments
 (0)