Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/cart/crt_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ crt_context_init(struct crt_context *ctx)
D_INIT_LIST_HEAD(&ctx->cc_quotas.rpc_waitq);
D_INIT_LIST_HEAD(&ctx->cc_link);

rc = crt_rpc_rx_pool_init(ctx);
if (rc != 0) {
D_ERROR("crt_rpc_rx_pool_init() failed, " DF_RC "\n", DP_RC(rc));
D_GOTO(out_mutex_destroy, rc);
}

if (crt_gdata.cg_progress_legacy) {
ctx->cc_prog_func = crt_progress_legacy;
ctx->cc_prog_cond_func = crt_progress_cond_legacy;
Expand Down Expand Up @@ -222,6 +228,7 @@ crt_context_init(struct crt_context *ctx)
out_binheap_destroy:
d_binheap_destroy_inplace(&ctx->cc_bh_timeout);
out_mutex_destroy:
crt_rpc_rx_pool_fini(ctx);
D_MUTEX_DESTROY(&ctx->cc_quotas.mutex);
D_MUTEX_DESTROY(&ctx->cc_mutex);
out:
Expand Down Expand Up @@ -867,6 +874,8 @@ crt_context_destroy(crt_context_t crt_ctx, int force)

D_RWLOCK_UNLOCK(&crt_gdata.cg_rwlock);

crt_rpc_rx_pool_fini(ctx);

D_MUTEX_DESTROY(&ctx->cc_mutex);
D_DEBUG(DB_TRACE, "destroyed context (idx %d, force %d)\n", ctx->cc_idx, force);
D_FREE(ctx);
Expand Down
4 changes: 2 additions & 2 deletions src/cart/crt_hg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1252,13 +1252,13 @@ crt_rpc_handler_common(hg_handle_t hg_hdl)
rpc_tmp.crp_pub.cr_opc = opc;

/* allocate rpc struct for a given opcode; in/out size will vary per opc */
rc = crt_rpc_priv_alloc(opc, &rpc_priv, false /* forward */);
rc = crt_rpc_priv_alloc_rx(crt_ctx, opc, &rpc_priv);
if (unlikely(rc != 0)) {
/* set client rc to denial of service if server is out of mem */
if (rc == -DER_NOMEM)
rc = -DER_DOS; /* don't log as we are oom already */
else
D_ERROR("crt_rpc_priv_alloc() failed, rc: %d.\n", rc);
D_ERROR("crt_rpc_priv_alloc_rx() failed, rc: %d.\n", rc);

crt_hg_reply_error_send(&rpc_tmp, rc);
crt_hg_unpack_cleanup(proc);
Expand Down
8 changes: 8 additions & 0 deletions src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,14 @@ struct crt_context {

/** timeout per-context */
uint32_t cc_timeout_sec;
/** free-list of preallocated receive-side RPC objects */
d_list_t cc_rx_rpc_pool;
/** lock for cc_rx_rpc_pool and cc_rx_rpc_pool_num */
pthread_mutex_t cc_rx_rpc_pool_lock;
/** true after cc_rx_rpc_pool_lock has been initialized */
bool cc_rx_rpc_pool_inited;
/** current number of entries in cc_rx_rpc_pool */
uint32_t cc_rx_rpc_pool_num;

/** Stores self uri for the current context */
char cc_self_uri[CRT_ADDR_STR_MAX_LEN];
Expand Down
157 changes: 157 additions & 0 deletions src/cart/crt_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,78 @@
#include "../object/obj_rpc.h"

#define CRT_CTL_MAX_LOG_MSG_SIZE 256
#define CRT_RX_RPC_POOL_RPC_SIZE 4096
#define CRT_RX_RPC_POOL_MAX_NUM 64

D_CASSERT(sizeof(struct crt_rpc_priv) <= CRT_RX_RPC_POOL_RPC_SIZE);

static void
crt_rpc_rx_pool_put(struct crt_context *ctx, struct crt_rpc_priv *rpc_priv)
{
D_ASSERT(ctx != NULL);
D_ASSERT(rpc_priv != NULL);

memset(rpc_priv, 0, CRT_RX_RPC_POOL_RPC_SIZE);
D_INIT_LIST_HEAD(&rpc_priv->crp_rx_pool_link);

D_MUTEX_LOCK(&ctx->cc_rx_rpc_pool_lock);
d_list_add_tail(&rpc_priv->crp_rx_pool_link, &ctx->cc_rx_rpc_pool);
ctx->cc_rx_rpc_pool_num++;
D_MUTEX_UNLOCK(&ctx->cc_rx_rpc_pool_lock);
}

int
crt_rpc_rx_pool_init(struct crt_context *ctx)
{
int rc;
int i;

D_ASSERT(ctx != NULL);

D_INIT_LIST_HEAD(&ctx->cc_rx_rpc_pool);
ctx->cc_rx_rpc_pool_inited = false;
ctx->cc_rx_rpc_pool_num = 0;

rc = D_MUTEX_INIT(&ctx->cc_rx_rpc_pool_lock, NULL);
if (rc != 0)
return rc;
ctx->cc_rx_rpc_pool_inited = true;

for (i = 0; i < CRT_RX_RPC_POOL_MAX_NUM; i++) {
struct crt_rpc_priv *rpc_priv;

D_ALLOC(rpc_priv, CRT_RX_RPC_POOL_RPC_SIZE);
if (rpc_priv == NULL) {
break;
}

D_INIT_LIST_HEAD(&rpc_priv->crp_rx_pool_link);
d_list_add_tail(&rpc_priv->crp_rx_pool_link, &ctx->cc_rx_rpc_pool);
ctx->cc_rx_rpc_pool_num++;
}

return 0;
}

void
crt_rpc_rx_pool_fini(struct crt_context *ctx)
{
struct crt_rpc_priv *rpc_priv;

D_ASSERT(ctx != NULL);
if (!ctx->cc_rx_rpc_pool_inited)
return;

D_MUTEX_LOCK(&ctx->cc_rx_rpc_pool_lock);
while ((rpc_priv = d_list_pop_entry(&ctx->cc_rx_rpc_pool, struct crt_rpc_priv,
crp_rx_pool_link)) != NULL)
D_FREE(rpc_priv);
ctx->cc_rx_rpc_pool_num = 0;
D_MUTEX_UNLOCK(&ctx->cc_rx_rpc_pool_lock);

D_MUTEX_DESTROY(&ctx->cc_rx_rpc_pool_lock);
ctx->cc_rx_rpc_pool_inited = false;
}

void
crt_hdlr_ctl_fi_toggle(crt_rpc_t *rpc_req)
Expand Down Expand Up @@ -564,12 +636,91 @@ crt_rpc_priv_alloc(crt_opcode_t opc, struct crt_rpc_priv **priv_allocated,
return rc;
}

int
crt_rpc_priv_alloc_rx(struct crt_context *ctx, crt_opcode_t opc,
struct crt_rpc_priv **priv_allocated)
{
struct crt_rpc_priv *rpc_priv = NULL;
struct crt_opc_info *opc_info;
int rc = 0;

D_ASSERT(ctx != NULL);
D_ASSERT(priv_allocated != NULL);

opc_info = crt_opc_lookup(crt_gdata.cg_opc_map, opc, CRT_UNLOCK);
if (opc_info == NULL) {
D_ERROR("opc: %#x, lookup failed.\n", opc);
D_GOTO(out, rc = -DER_UNREG);
}

if (opc_info->coi_crf != NULL && (opc_info->coi_crf->crf_size_in > CRT_MAX_INPUT_SIZE ||
opc_info->coi_crf->crf_size_out > CRT_MAX_OUTPUT_SIZE)) {
D_ERROR("opc: %#x, input_size " DF_U64 " or output_size " DF_U64 " too large.\n",
opc, opc_info->coi_crf->crf_size_in, opc_info->coi_crf->crf_size_out);
D_GOTO(out, rc = -DER_INVAL);
}

if (opc_info->coi_rpc_size <= CRT_RX_RPC_POOL_RPC_SIZE) {
D_MUTEX_LOCK(&ctx->cc_rx_rpc_pool_lock);
rpc_priv =
d_list_pop_entry(&ctx->cc_rx_rpc_pool, struct crt_rpc_priv, crp_rx_pool_link);
if (rpc_priv != NULL)
ctx->cc_rx_rpc_pool_num--;
D_MUTEX_UNLOCK(&ctx->cc_rx_rpc_pool_lock);

if (rpc_priv != NULL) {
memset(rpc_priv, 0, CRT_RX_RPC_POOL_RPC_SIZE);
rpc_priv->crp_from_rx_pool = 1;
}
}

if (rpc_priv == NULL) {
D_ALLOC(rpc_priv, opc_info->coi_rpc_size);
if (rpc_priv == NULL)
D_GOTO(out, rc = -DER_NOMEM);
}

rpc_priv->crp_opc_info = opc_info;
rpc_priv->crp_forward = false;
rpc_priv->crp_pub.cr_opc = opc;

rc = D_SPIN_INIT(&rpc_priv->crp_lock, PTHREAD_PROCESS_PRIVATE);
if (rc != 0) {
if (rpc_priv->crp_from_rx_pool)
crt_rpc_rx_pool_put(ctx, rpc_priv);
else
D_FREE(rpc_priv);
D_GOTO(out, rc);
}

rc = D_MUTEX_INIT(&rpc_priv->crp_mutex, NULL /* attr */);
if (rc != 0) {
D_SPIN_DESTROY(&rpc_priv->crp_lock);
if (rpc_priv->crp_from_rx_pool)
crt_rpc_rx_pool_put(ctx, rpc_priv);
else
D_FREE(rpc_priv);
D_GOTO(out, rc);
}

RPC_TRACE(DB_TRACE, rpc_priv, "(opc: %#x rpc_pub: %p) allocated.\n",
rpc_priv->crp_opc_info->coi_opc, &rpc_priv->crp_pub);

*priv_allocated = rpc_priv;
out:
return rc;
}

void
crt_rpc_priv_free(struct crt_rpc_priv *rpc_priv)
{
struct crt_context *ctx;

if (rpc_priv == NULL)
return;

ctx = rpc_priv->crp_pub.cr_ctx;

if (rpc_priv->crp_coll && rpc_priv->crp_corpc_info)
crt_corpc_info_fini(rpc_priv);

Expand All @@ -582,6 +733,12 @@ crt_rpc_priv_free(struct crt_rpc_priv *rpc_priv)
RPC_TRACE(DB_TRACE, rpc_priv, "destroying\n");

D_FREE(rpc_priv->crp_orig_uri);

if (rpc_priv->crp_from_rx_pool && ctx != NULL) {
crt_rpc_rx_pool_put(ctx, rpc_priv);
return;
}

D_FREE(rpc_priv);
}

Expand Down
13 changes: 12 additions & 1 deletion src/cart/crt_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ struct crt_corpc_info {

struct crt_rpc_priv {
crt_rpc_t crp_pub; /* public part */
/* link to crt_context::cc_rx_rpc_pool free-list */
d_list_t crp_rx_pool_link;
/* link to crt_ep_inflight::epi_req_q/::epi_req_waitq */
d_list_t crp_epi_link;
/* link for temp list used during timeout processing */
Expand Down Expand Up @@ -218,7 +220,9 @@ struct crt_rpc_priv {
/* release input buffer early */
crp_release_input_early : 1,
/* rpc expired */
crp_expired : 1;
crp_expired : 1,
/* allocated from per-context receive RPC pool */
crp_from_rx_pool : 1;

struct crt_opc_info *crp_opc_info;
/* corpc info, only valid when (crp_coll == 1) */
Expand Down Expand Up @@ -664,6 +668,13 @@ crt_rpc_cb_customized(struct crt_context *crt_ctx,
/* crt_rpc.c */
int crt_rpc_priv_alloc(crt_opcode_t opc, struct crt_rpc_priv **priv_allocated,
bool forward);
int
crt_rpc_priv_alloc_rx(struct crt_context *ctx, crt_opcode_t opc,
struct crt_rpc_priv **priv_allocated);
int
crt_rpc_rx_pool_init(struct crt_context *ctx);
void
crt_rpc_rx_pool_fini(struct crt_context *ctx);
void crt_rpc_priv_free(struct crt_rpc_priv *rpc_priv);
void
crt_rpc_priv_init(struct crt_rpc_priv *rpc_priv, crt_context_t crt_ctx, bool srv_flag);
Expand Down
Loading