Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rel/overlay/etc/default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,14 @@ partitioned||* = true
; Checkpoint interval
;checkpoint_interval = 30000

; Connect_to override for replicator outbound requests
; Format: pattern:port:target:targetport[,pattern:port:target:targetport,...]
; Examples:
; *.example.com:443:proxy.internal:8443
; api.example.com:443:127.0.0.1:443
; *.example.com:443:[2001:db8::1]:443
;connect_to =

; Some socket options that might boost performance in some scenarios:
; {nodelay, boolean()}
; {sndbuf, integer()}
Expand Down
4 changes: 4 additions & 0 deletions src/couch_replicator/priv/stats_descriptions.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@
{type, counter},
{desc, <<"number of times a worker is gracefully shut down">>}
]}.
{[couch_replicator, connect_to_applied], [
{type, counter},
{desc, <<"number of times DNS overrides were applied to replication requests">>}
]}.
9 changes: 7 additions & 2 deletions src/couch_replicator/src/couch_replicator_auth_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
]).

-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").

-type headers() :: [{string(), string()}].
-type code() :: non_neg_integer().
Expand Down Expand Up @@ -311,20 +312,24 @@ refresh(#state{session_url = Url, user = User, pass = Pass} = State) ->
{ok, string(), headers(), binary()} | {error, term()}.
http_request(#state{httpdb_pool = Pool} = State, Url, Headers, Method, Body) ->
Timeout = State#state.httpdb_timeout,

Opts = [
{response_format, binary},
{inactivity_timeout, Timeout}
| State#state.httpdb_ibrowse_options
],

{Url1, Opts1} = couch_replicator_connect:apply_connect_to(Url, Opts),

{ok, Wrk} = couch_replicator_httpc_pool:get_worker(Pool),
try
Result = ibrowse:send_req_direct(
Wrk,
Url,
Url1,
Headers,
Method,
Body,
Opts,
Opts1,
Timeout
),
case Result of
Expand Down
268 changes: 268 additions & 0 deletions src/couch_replicator/src/couch_replicator_connect.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% This module implements the connect_to configuration option, which allows
%% routing replication requests through proxies or rewriting connection ports.
%% Similar to curl's --connect-to option.
%%
%% Flow:
%% 1. init/0 - Parse and cache connect_to config at startup and on config change
%% 2. apply_connect_to/2 - For each replication request:
%% a. Parse URL to extract host and port
%% b. resolve_connection/2 - Match host:port against override patterns
%% c. On match:
%% - Reconstruct URL with target port
%% - Add ibrowse connect_to option (target host)
%% - Add SNI option for HTTPS (original host)
%% d. Return modified URL and options
%%
%% Configuration format: host:port:target_host:target_port
%% Example: *.example.com:443:proxy.internal:8443
%%
%% Pattern matching:
%% - Exact hostnames: foo.example.com
%% - Leading wildcards: *.example.com (matches sub.example.com, not example.com)
%% - Case-insensitive
%% - Port must match exactly

-module(couch_replicator_connect).

-include_lib("ibrowse/include/ibrowse.hrl").

-export([
init/0,
apply_connect_to/2
]).

-ifdef(TEST).
-export([
parse_config/1,
match_host_pattern/2,
get_overrides/0,
resolve_connection/2,
parse_ip_address/1
]).
-endif.

-type connect_to_override() :: {
PatternHost :: binary(),
PatternPort :: integer(),
TargetHost :: binary() | inet:ip_address(),
TargetPort :: integer()
}.

-define(CONNECT_TO_KEY, {?MODULE, connect_to}).

%% (Re)build the connect_to override cache.
%% Reads the "connect_to" option from the "replicator" config section, parses
%% it and publishes the resulting list in persistent_term under
%% ?CONNECT_TO_KEY. An unset option is cached as an empty list.
-spec init() -> ok.
init() ->
    Setting = config:get("replicator", "connect_to", undefined),
    persistent_term:put(?CONNECT_TO_KEY, overrides_from_setting(Setting)),
    ok.

% Translate the raw config value into a list of override tuples.
overrides_from_setting(undefined) ->
    [];
overrides_from_setting(ConfigStr) ->
    parse_config(ConfigStr).

%% Look up the connection override that applies to Host:Port, if any.
%% Host arrives as a string, while the cached overrides hold binaries
%% (hostnames) or tuples (IP addresses), so the host is converted to a binary
%% for the lookup and a hostname target is converted back to a string.
%% Returns {Target, TargetPort, Host} on a match, where Target is a string or
%% an IP tuple and Host is the untouched input, or not_found.
-spec resolve_connection(string(), integer()) ->
    {string() | inet:ip_address(), integer(), string()} | not_found.
resolve_connection(Host, Port) ->
    HostBin = list_to_binary(Host),
    Lookup = find_override(HostBin, Port, get_overrides()),
    case Lookup of
        not_found ->
            not_found;
        {ok, {IpTarget, IpPort}} when is_tuple(IpTarget) ->
            % IP targets are already in tuple form
            {IpTarget, IpPort, Host};
        {ok, {NameTarget, NamePort}} when is_binary(NameTarget) ->
            % hostname targets are handed back as strings
            {binary_to_list(NameTarget), NamePort, Host}
    end.

%% Return the list of connect_to overrides currently in effect.
%% Uses the list cached in persistent_term when present; if nothing has been
%% cached yet, the config option is read and parsed on the spot (the result
%% is not stored).
-spec get_overrides() -> [connect_to_override()].
get_overrides() ->
    Cached = persistent_term:get(?CONNECT_TO_KEY, not_initialized),
    cached_or_configured(Cached).

% Only touch the config when the cache has not been populated.
cached_or_configured(not_initialized) ->
    case config:get("replicator", "connect_to", undefined) of
        undefined -> [];
        ConfigStr -> parse_config(ConfigStr)
    end;
cached_or_configured(Overrides) ->
    Overrides.

%% Parse the raw connect_to config string into override tuples.
%% The string is split on commas and every piece is handed to parse_entry/1;
%% pieces that parse_entry/1 rejects are left out of the result.
-spec parse_config(string()) -> [connect_to_override()].
parse_config(ConfigStr) ->
    Pieces = binary:split(list_to_binary(ConfigStr), <<",">>, [global, trim]),
    [Override || Piece <- Pieces, {true, Override} <- [parse_entry(Piece)]].

% Parse a single connect_to entry (a binary) into an override tuple.
% Format: HOST:PORT:TARGET:TARGET_PORT (matches curl --connect-to)
% Examples:
% *.example.com:443:192.168.1.1:8443
% *.example.com:443:[2001:db8::1]:8443
% IPv6 addresses in targets must be enclosed in brackets
% Returns {true, {PatternHost, PatternPort, TargetHost, TargetPort}} for a
% well-formed entry and false for an empty or malformed one, which is the
% callback shape lists:filtermap/2 expects.
parse_entry(<<>>) ->
false;
parse_entry(Entry0) ->
% Surrounding whitespace is removed before matching
Entry = string:trim(Entry0),
% Regex: HOST:PORT:TARGET:TARGET_PORT where TARGET can be [IPv6]
% HOST may not contain ':' or '[' anywhere, so IPv6 literals are rejected as
% patterns; every capture group requires at least one character
Pattern = "^([^:\\[]+):([0-9]+):([^:]+|\\[[^\\]]+\\]):([0-9]+)$",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit of a gnarly expression but I couldn't come up with anything smaller, after staring at it for a while it does seems right

case re:run(Entry, Pattern, [{capture, all_but_first, binary}]) of
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all_but_first

Hadn't seen that used before, that's neat:

All but the first matching subpattern, that is, all explicitly captured subpatterns, but not the complete matching part of the subject string. This is useful if the regular expression as a whole matches a large part of the subject, but the part you are interested in is in an explicitly captured subpattern.

{match, [PatternHost, PatternPortBin, TargetHost0, TargetPortBin]} ->
% Regex guarantees non-empty hosts and numeric ports
% (the ports are digits only; their numeric range is not checked here)
PatternPort = binary_to_integer(PatternPortBin),
TargetPort = binary_to_integer(TargetPortBin),
% Convert IP addresses to tuples; keep hostnames as binaries
TargetHost =
case parse_ip_address(TargetHost0) of
{ok, IpTuple} -> IpTuple;
{error, einval} -> TargetHost0
end,
{true, {PatternHost, PatternPort, TargetHost, TargetPort}};
nomatch ->
% Malformed entries are logged and dropped individually
couch_log:warning(
"Invalid connect_to entry: ~ts (expected HOST:PORT:TARGET:TARGET_PORT)", [
Entry
]
),
false
end.

%% Walk the override list and return the target of the first entry whose port
%% equals Port and whose host pattern matches Host.
%% The port is compared first, so match_host_pattern/2 only runs for entries
%% with the right port. Returns {ok, {Target, TargetPort}} or not_found.
-spec find_override(binary(), integer(), [connect_to_override()]) ->
    {ok, {binary() | inet:ip_address(), integer()}} | not_found.
find_override(_Host, _Port, []) ->
    not_found;
find_override(Host, Port, [{Pattern, EntryPort, Target, TargetPort} | Remaining]) ->
    case EntryPort =:= Port andalso match_host_pattern(Host, Pattern) of
        true ->
            {ok, {Target, TargetPort}};
        false ->
            find_override(Host, Port, Remaining)
    end;
find_override(Host, Port, [_Other | Remaining]) ->
    % anything that is not a 4-tuple entry is skipped
    find_override(Host, Port, Remaining).

% Host Pattern Matching
%
% Supports leading wildcard patterns only:
% - *.example.com matches any.subdomain.example.com
% - *.example.com does NOT match example.com (requires at least one subdomain)
% - *.example.com does NOT match .example.com (the wildcard label may not be
%   empty)
%
% Not supported:
% - middle wildcards: sub.*.example.com
% - trailing wildcards: example.*
% - multiple wildcards: *.*.example.com
%
% Both arguments must be binaries. Comparison is case-insensitive.
% Returns true when Host matches Pattern, false otherwise.
-spec match_host_pattern(binary(), binary()) -> boolean().
match_host_pattern(Host, Pattern) when is_binary(Host), is_binary(Pattern) ->
    % DNS names are case-insensitive
    HostLower = string:lowercase(Host),
    PatternLower = string:lowercase(Pattern),
    match_host_pattern_impl(HostLower, PatternLower).

match_host_pattern_impl(Host, <<"*.", _/binary>> = Pattern) ->
    <<"*", Suffix/binary>> = Pattern,
    % "*." must stand for a non-empty label: the host has to be strictly
    % longer than ".rest", otherwise ".example.com" would be accepted
    byte_size(Host) > byte_size(Suffix) andalso host_ends_with(Host, Suffix);
match_host_pattern_impl(Host, <<"*", Suffix/binary>>) ->
    % wildcard not followed by a dot: plain suffix comparison
    host_ends_with(Host, Suffix);
match_host_pattern_impl(Host, Pattern) ->
    Host =:= Pattern.

% True when the last byte_size(Suffix) bytes of Host equal Suffix.
host_ends_with(Host, Suffix) ->
    SuffixSize = byte_size(Suffix),
    Pos = byte_size(Host) - SuffixSize,
    % guard against hosts shorter than the suffix before slicing
    Pos >= 0 andalso binary:part(Host, Pos, SuffixSize) =:= Suffix.

%% Parse an IP address given as a string or binary.
%% A single pair of enclosing square brackets (the IPv6 literal notation,
%% e.g. "[2001:db8::1]") is removed before parsing. Unbalanced or repeated
%% brackets are left in place and therefore fail to parse, so malformed
%% values such as "1.2.3.4]" are not mistaken for addresses.
%% Returns {ok, IpTuple} if valid IP, {error, einval} otherwise.
-spec parse_ip_address(string() | binary()) -> {ok, inet:ip_address()} | {error, einval}.
parse_ip_address(Host) when is_list(Host) ->
    case inet:parse_strict_address(strip_enclosing_brackets(Host)) of
        {ok, IpTuple} -> {ok, IpTuple};
        {error, _} -> {error, einval}
    end;
parse_ip_address(Host) when is_binary(Host) ->
    parse_ip_address(binary_to_list(Host)).

% Remove one leading "[" together with its matching trailing "]"; any other
% input is returned unchanged.
strip_enclosing_brackets([$[ | Rest] = Host) ->
    case lists:reverse(Rest) of
        [$] | ReversedInner] -> lists:reverse(ReversedInner);
        _ -> Host
    end;
strip_enclosing_brackets(Host) ->
    Host.

%% Apply any matching connect_to override to a request.
%% The URL is parsed with ibrowse_lib:parse_url/1. When its host and port
%% match an override, the URL is rebuilt with the target port and the ibrowse
%% options gain the connection override settings. When the URL cannot be
%% parsed or nothing matches, both arguments are returned unchanged.
-spec apply_connect_to(string(), list()) -> {string(), list()}.
apply_connect_to(Url, IbrowseOptions) ->
    case ibrowse_lib:parse_url(Url) of
        #url{} = ParsedUrl ->
            override_request(ParsedUrl, Url, IbrowseOptions);
        {error, _} ->
            {Url, IbrowseOptions}
    end.

% Resolve the parsed URL against the overrides and rewrite URL and options
% on a match.
override_request(#url{host = Host, port = Port, protocol = Protocol} = ParsedUrl, Url, Options) ->
    case resolve_connection(Host, Port) of
        not_found ->
            {Url, Options};
        {TargetHost, TargetPort, OriginalHost} ->
            % the URL keeps its host but takes the target port
            NewUrl = reconstruct_url(ParsedUrl, TargetPort),
            NewOptions = apply_override_options(Options, Protocol, TargetHost, OriginalHost),
            {NewUrl, NewOptions}
    end.

%% Reconstruct URL with new port.
%% Builds "<protocol>://<host>:<NewPort><path>" from the parsed URL record.
%% Note: ibrowse:send_req_direct requires a string URL, not a parsed #url{} record.
%% The #url.path field from ibrowse_lib:parse_url includes the full path with
%% query string and fragment, so we don't need to handle those separately.
%% Credentials are not included because normalize_basic_auth() strips them from
%% URLs before they reach this code - they're passed via ibrowse options instead.
%% A host that is an IPv6 literal is wrapped in square brackets, since the
%% colons of a bare address would otherwise be ambiguous with the port
%% separator and yield an invalid URL.
-spec reconstruct_url(#url{}, integer()) -> string().
reconstruct_url(#url{protocol = Protocol, host = Host, path = Path}, NewPort) ->
    Scheme = atom_to_list(Protocol),
    PortStr = ":" ++ integer_to_list(NewPort),
    Scheme ++ "://" ++ url_host(Host) ++ PortStr ++ Path.

% Return the host in the form required inside a URL: IPv6 literals are
% bracketed, hostnames and IPv4 addresses are returned unchanged.
url_host(Host) ->
    case inet:parse_ipv6strict_address(Host) of
        {ok, _} -> "[" ++ Host ++ "]";
        {error, _} -> Host
    end.

%% Extend the ibrowse options for a request that matched an override.
%% Logs the override at debug level, bumps the connect_to_applied counter,
%% prepends {connect_to, TargetHost} (ibrowse accepts string or tuple) and
%% sets the TLS server_name_indication option: the original hostname for
%% https requests to a non-IP host, 'disable' in every other case.
-spec apply_override_options(list(), atom(), string() | inet:ip_address(), string()) -> list().
apply_override_options(Opts, Protocol, TargetHost, OriginalHost) ->
    couch_log:debug(
        "connect_to override (~p): ~s -> ~p",
        [Protocol, OriginalHost, TargetHost]
    ),
    couch_stats:increment_counter([couch_replicator, connect_to_applied]),
    SNIHost = sni_for(Protocol, parse_ip_address(OriginalHost), OriginalHost),
    add_sni_option([{connect_to, TargetHost} | Opts], SNIHost).

% SNI carries the original host only for https with a hostname (not an IP).
sni_for(https, {error, _}, Hostname) ->
    Hostname;
sni_for(_Protocol, _ParsedIp, _Host) ->
    disable.

%% Set server_name_indication inside the ssl_options entry of an ibrowse
%% option list. Existing ssl_options are kept, any previous
%% server_name_indication entries are replaced, and an ssl_options entry is
%% created if there was none.
-spec add_sni_option(list(), string() | disable) -> list().
add_sni_option(IbrowseOpts, Host) ->
    Existing = proplists:get_value(ssl_options, IbrowseOpts, []),
    WithoutSni = proplists:delete(server_name_indication, Existing),
    Updated = {ssl_options, [{server_name_indication, Host} | WithoutSni]},
    lists:keystore(ssl_options, 1, IbrowseOpts, Updated).
11 changes: 9 additions & 2 deletions src/couch_replicator/src/couch_replicator_httpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
UserHeaders = get_value(headers, Params, []),
Headers1 = merge_headers(BaseHeaders, UserHeaders),
{Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1),
Url = full_url(HttpDb, Params),
Url0 = full_url(HttpDb, Params),
Body = get_value(body, Params, []),
case get_value(path, Params) == "_changes" of
true ->
Expand All @@ -131,7 +131,7 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
{User, Pass} when is_list(User), is_list(Pass) ->
[{basic_auth, {User, Pass}}]
end,
IbrowseOptions =
IbrowseOptions0 =
BasicAuthOpts ++
[
{response_format, binary},
Expand All @@ -142,6 +142,13 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
HttpDb#httpdb.ibrowse_options
)
],

% Apply connect_to override and SNI configuration
{Url, IbrowseOptions} = couch_replicator_connect:apply_connect_to(
Url0,
IbrowseOptions0
),

backoff_before_request(Worker, HttpDb, Params),
Response = ibrowse:send_req_direct(
Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout
Expand Down
4 changes: 4 additions & 0 deletions src/couch_replicator/src/couch_replicator_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ init(_) ->
],
?MODULE = ets:new(?MODULE, EtsOpts),
ok = couch_replicator_share:init(),
ok = couch_replicator_connect:init(),
ok = config:listen_for_changes(?MODULE, nil),
Interval = get_interval_msec(),
MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
Expand Down Expand Up @@ -385,6 +386,9 @@ handle_config_change("replicator", "interval", V, _, S) ->
handle_config_change("replicator", "max_history", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
{ok, S};
handle_config_change("replicator", "connect_to", _, _, S) ->
ok = couch_replicator_connect:init(),
{ok, S};
handle_config_change("replicator.shares", Key, deleted, _, S) ->
ok = gen_server:cast(?MODULE, {reset_shares, list_to_binary(Key)}),
{ok, S};
Expand Down
Loading