Skip to content

Commit 1b4d6dd

Browse files
authored
Merge pull request #13 from VictorGaiva/feat/user-commands-buffer
feat: Split user and internal commands buffers
2 parents 0ef9697 + fb26880 commit 1b4d6dd

5 files changed

Lines changed: 70 additions & 27 deletions

File tree

lib/connection/connection.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,8 @@ defmodule RabbitMQStream.Connection do
529529
},
530530
frames_buffer: RabbitMQStream.Message.Buffer.t(),
531531
request_buffer: :queue.queue({term(), pid()}),
532-
commands_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}),
532+
internal_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}),
533+
user_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}),
533534
# this should not be here. Should find a better way to return the close reason from the 'handler' module
534535
close_reason: String.t() | atom() | nil,
535536
transport: RabbitMQStream.Connection.Transport.t()
@@ -552,7 +553,8 @@ defmodule RabbitMQStream.Connection do
552553
commands: %{},
553554
request_buffer: :queue.new(),
554555
frames_buffer: RabbitMQStream.Message.Buffer.init(),
555-
commands_buffer: :queue.new(),
556+
internal_buffer: :queue.new(),
557+
user_buffer: :queue.new(),
556558
close_reason: nil
557559
]
558560
end

lib/connection/handler.ex

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defmodule RabbitMQStream.Connection.Handler do
1111
Logger.debug("Connection closed")
1212

1313
%{conn | state: :closing, close_reason: request.data.reason}
14-
|> Helpers.push(:response, :close, correlation_id: request.correlation_id, code: :ok)
14+
|> Helpers.push_internal(:response, :close, correlation_id: request.correlation_id, code: :ok)
1515
end
1616

1717
def handle_message(%Connection{} = conn, %Request{command: :tune} = request) do
@@ -22,8 +22,8 @@ defmodule RabbitMQStream.Connection.Handler do
2222
options = Keyword.merge(conn.options, frame_max: request.data.frame_max, heartbeat: request.data.heartbeat)
2323

2424
%{conn | options: options, state: :opening}
25-
|> Helpers.push(:response, :tune, correlation_id: 0)
26-
|> Helpers.push(:request, :open)
25+
|> Helpers.push_internal(:response, :tune, correlation_id: 0)
26+
|> Helpers.push_internal(:request, :open)
2727
end
2828

2929
def handle_message(%Connection{} = conn, %Request{command: :heartbeat}) do
@@ -32,7 +32,7 @@ defmodule RabbitMQStream.Connection.Handler do
3232

3333
def handle_message(%Connection{} = conn, %Request{command: :metadata_update} = request) do
3434
conn
35-
|> Helpers.push(:request, :query_metadata, streams: [request.data.stream_name])
35+
|> Helpers.push_internal(:request, :query_metadata, streams: [request.data.stream_name])
3636
end
3737

3838
def handle_message(%Connection{} = conn, %Request{command: :deliver} = response) do
@@ -129,14 +129,14 @@ defmodule RabbitMQStream.Connection.Handler do
129129
peer_properties = Map.put(response.data.peer_properties, "base-version", version)
130130

131131
%{conn | peer_properties: peer_properties}
132-
|> Helpers.push(:request, :sasl_handshake)
132+
|> Helpers.push_internal(:request, :sasl_handshake)
133133
end
134134

135135
def handle_message(%Connection{} = conn, %Response{command: :sasl_handshake} = response) do
136136
Logger.debug("SASL handshake successful. Initiating authentication.")
137137

138138
%{conn | mechanisms: response.data.mechanisms}
139-
|> Helpers.push(:request, :sasl_authenticate)
139+
|> Helpers.push_internal(:request, :sasl_authenticate)
140140
end
141141

142142
def handle_message(%Connection{} = conn, %Response{command: :sasl_authenticate, data: %{sasl_opaque_data: ""}}) do
@@ -150,7 +150,7 @@ defmodule RabbitMQStream.Connection.Handler do
150150
Logger.debug("Opening connection to vhost: \"#{conn.options[:vhost]}\"")
151151

152152
conn
153-
|> Helpers.push(:request, :open)
153+
|> Helpers.push_internal(:request, :open)
154154
|> Map.put(:state, :opening)
155155
end
156156

@@ -162,7 +162,7 @@ defmodule RabbitMQStream.Connection.Handler do
162162

163163
%{conn | options: options}
164164
|> Map.put(:state, :opening)
165-
|> Helpers.push(:request, :open)
165+
|> Helpers.push_internal(:request, :open)
166166
end
167167

168168
# If the server has a version lower than 3.13, this is the 'terminating' response.
@@ -192,7 +192,7 @@ defmodule RabbitMQStream.Connection.Handler do
192192
)
193193

194194
%{conn | connection_properties: response.data.connection_properties}
195-
|> Helpers.push(:request, :exchange_command_versions)
195+
|> Helpers.push_internal(:request, :exchange_command_versions)
196196
end
197197

198198
def handle_message(%Connection{} = conn, %Response{command: :query_metadata} = response) do
@@ -310,7 +310,10 @@ defmodule RabbitMQStream.Connection.Handler do
310310
conn
311311
else
312312
conn
313-
|> Helpers.push(:response, :consumer_update, correlation_id: request.correlation_id, code: :internal_error)
313+
|> Helpers.push_internal(:response, :consumer_update,
314+
correlation_id: request.correlation_id,
315+
code: :internal_error
316+
)
314317
end
315318
end
316319
end

lib/connection/helpers.ex

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@ defmodule RabbitMQStream.Connection.Helpers do
1313
{entry, %{conn | request_tracker: request_tracker}}
1414
end
1515

16-
def push(conn, action, command, opts \\ []) do
17-
commands_buffer = :queue.in({action, command, opts}, conn.commands_buffer)
16+
def push_user(conn, action, command, opts \\ []) do
17+
user_buffer = :queue.in({action, command, opts}, conn.user_buffer)
1818

19-
%{conn | commands_buffer: commands_buffer}
19+
%{conn | user_buffer: user_buffer}
20+
end
21+
22+
def push_internal(conn, action, command, opts \\ []) do
23+
internal_buffer = :queue.in({action, command, opts}, conn.internal_buffer)
24+
25+
%{conn | internal_buffer: internal_buffer}
2026
end
2127

2228
defguard is_offset(offset)

lib/connection/lifecycle.ex

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do
5252

5353
conn =
5454
%{conn | connect_requests: [from | conn.connect_requests]}
55-
|> send_request(:peer_properties)
55+
|> Helpers.push_internal(:request, :peer_properties)
56+
|> flush_buffer(:internal)
5657

5758
{:noreply, conn}
5859
else
@@ -229,7 +230,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do
229230
# command to the socket. This also would allow us to better test the 'handler' logic.
230231
commands
231232
|> Enum.reduce(conn, &Handler.handle_message(&2, &1))
232-
|> flush_commands()
233+
|> flush_buffer(:internal)
234+
|> flush_buffer(:user)
233235
|> handle_closing()
234236
end
235237

@@ -252,7 +254,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do
252254
def handle_info({:heartbeat}, conn) do
253255
Process.send_after(self(), {:heartbeat}, conn.options[:heartbeat] * 1000)
254256

255-
conn = send_request(conn, :heartbeat, correlation_sum: 0)
257+
conn =
258+
conn
259+
|> Helpers.push_internal(:request, :heartbeat, correlation_sum: 0)
260+
|> flush_buffer(:internal)
256261

257262
{:noreply, conn}
258263
end
@@ -292,7 +297,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do
292297

293298
conn =
294299
conn
295-
|> send_request(:peer_properties)
300+
|> Helpers.push_internal(:request, :peer_properties)
301+
|> flush_buffer(:internal)
296302

297303
{:noreply, conn}
298304
else
@@ -328,30 +334,48 @@ defmodule RabbitMQStream.Connection.Lifecycle do
328334

329335
defp handle_closing(conn), do: {:noreply, conn}
330336

331-
defp send_request(%Connection{} = conn, command, opts \\ []) do
337+
defp send_request(%Connection{} = conn, command, opts) do
332338
conn
333-
|> Helpers.push(:request, command, opts)
334-
|> flush_commands()
339+
|> Helpers.push_user(:request, command, opts)
340+
|> flush_buffer(:user)
335341
end
336342

337343
defp send_response(%Connection{} = conn, command, opts) do
338344
conn
339-
|> Helpers.push(:response, command, opts)
340-
|> flush_commands()
345+
|> Helpers.push_user(:response, command, opts)
346+
|> flush_buffer(:user)
347+
end
348+
349+
defp flush_buffer(%Connection{} = conn, :internal) do
350+
conn =
351+
:queue.fold(
352+
fn
353+
command, conn ->
354+
send_command(conn, command)
355+
end,
356+
conn,
357+
conn.internal_buffer
358+
)
359+
360+
%{conn | internal_buffer: :queue.new()}
341361
end
342362

343-
defp flush_commands(%Connection{} = conn) do
363+
defp flush_buffer(%Connection{state: :open} = conn, :user) do
344364
conn =
345365
:queue.fold(
346366
fn
347367
command, conn ->
348368
send_command(conn, command)
349369
end,
350370
conn,
351-
conn.commands_buffer
371+
conn.user_buffer
352372
)
353373

354-
%{conn | commands_buffer: :queue.new()}
374+
%{conn | user_buffer: :queue.new()}
375+
end
376+
377+
defp flush_buffer(%Connection{} = conn, :user) do
378+
conn
355379
end
356380

357381
defp send_command(%Connection{} = conn, {:request, command, opts}) do

test/connection_test.exs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,12 @@ defmodule RabbitMQStreamTest.Connection do
168168
assert {:ok, _data} = SupervisedConnection.stream_stats(@stream)
169169
assert {:error, :stream_does_not_exist} = SupervisedConnection.stream_stats("#{@stream}-NON-EXISTENT")
170170
end
171+
172+
# I'm not really sure how to test this.
173+
# @stream "consumer-test-stream-11"
174+
# test "should buffer user commands before the connection is open" do
175+
# {:ok, _conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/")
176+
# :ok = SupervisedConnection.connect()
177+
178+
# end
171179
end

0 commit comments

Comments
 (0)