Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,21 @@ class CUtils
static CTableDescriptorHashSet *RemoveDuplicateMdids(
CMemoryPool *mp, CTableDescriptorHashSet *tabdescs);

// hash set of CTE ids: ULONG keys hashed with gpos::HashValue<ULONG>,
// compared with gpos::Equals<ULONG> and cleaned up with CleanupDelete<ULONG>
typedef CHashSet<ULONG, gpos::HashValue<ULONG>, gpos::Equals<ULONG>,
CleanupDelete<ULONG> >
UlongCteIdHashSet;

// walk the non-scalar part of the subtree rooted at pexpr and collect the
// CTE ids of the physical CTE Consumers (appended to cteConsumers, one entry
// per Consumer) and of the physical CTE Producers (inserted into
// cteProducerSet); the ids are allocated from mp
static void CollectConsumersAndProducers(CMemoryPool *mp,
CExpression *pexpr,
ULongPtrArray *cteConsumers,
UlongCteIdHashSet *cteProducerSet);

// true if the subtree rooted at pexpr contains a physical CTE Consumer whose
// CTE id has no physical CTE Producer inside that same subtree
static BOOL hasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexpr);
Comment thread
Alena0704 marked this conversation as resolved.
Outdated

// true if some physical CTE Consumer under pexpr sits in a different slice
// than a CTE Producer with the same CTE id whose child distribution is
// strict replicated, tainted replicated or universal; false for a NULL pexpr
static BOOL FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp,
CExpression *pexpr);
Comment thread
Alena0704 marked this conversation as resolved.
Outdated

static CExpression *ReplaceColrefWithProjectExpr(CMemoryPool *mp,
CExpression *pexpr,
CColRef *pcolref,
Expand Down
174 changes: 174 additions & 0 deletions src/backend/gporca/libgpopt/src/base/CUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,180 @@ CUtils::FHasCTEAnchor(CExpression *pexpr)
return false;
}

// Collect the CTE ids of all physical CTE Consumers and Producers in the
// subtree rooted at pexpr. Scalar children are not descended into.
//
//   mp             - memory pool the collected ids are allocated from
//   pexpr          - root of the subtree to walk
//   cteConsumers   - out: one CTE id per CTE Consumer found (duplicates kept)
//   cteProducerSet - out: set of CTE ids of the CTE Producers found
void
CUtils::CollectConsumersAndProducers(CMemoryPool *mp, CExpression *pexpr,
									 ULongPtrArray *cteConsumers,
									 UlongCteIdHashSet *cteProducerSet)
{
	COperator *pop = pexpr->Pop();

	if (COperator::EopPhysicalCTEConsumer == pop->Eopid())
	{
		cteConsumers->Append(GPOS_NEW(mp) ULONG(
			CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId()));
	}
	else if (COperator::EopPhysicalCTEProducer == pop->Eopid())
	{
		ULONG *pulCteId = GPOS_NEW(mp)
			ULONG(CPhysicalCTEProducer::PopConvert(pop)->UlCTEId());

		// CHashSet::Insert() returns false and does not take ownership of
		// the key when an equal id is already in the set; release the key
		// ourselves in that case so it is not leaked
		if (!cteProducerSet->Insert(pulCteId))
		{
			GPOS_DELETE(pulCteId);
		}
	}

	const ULONG arity = pexpr->Arity();
	for (ULONG ul = 0; ul < arity; ul++)
	{
		CExpression *pexprChild = (*pexpr)[ul];

		// CTE operators are only looked for under non-scalar children
		if (!pexprChild->Pop()->FScalar())
		{
			CollectConsumersAndProducers(mp, pexprChild, cteConsumers,
										 cteProducerSet);
		}
	}
}

// Report whether the subtree rooted at pexpr holds a CTE Consumer whose
// CTE id is not produced by any CTE Producer of the same subtree.
BOOL
CUtils::hasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
{
	ULongPtrArray *consumerIds = GPOS_NEW(mp) ULongPtrArray(mp);
	UlongCteIdHashSet *producerIds = GPOS_NEW(mp) UlongCteIdHashSet(mp);

	CollectConsumersAndProducers(mp, pexpr, consumerIds, producerIds);

	// look up every consumer id in the producer set; the scan ends at the
	// first consumer that has no matching producer
	BOOL fUnpaired = false;
	for (ULONG ul = 0; !fUnpaired && ul < consumerIds->Size(); ul++)
	{
		fUnpaired = !producerIds->Contains((*consumerIds)[ul]);
	}

	consumerIds->Release();
	producerIds->Release();

	return fUnpaired;
}

// Tell whether edt is one of the distribution types treated here as
// replicated-like: strict replicated, tainted replicated or universal.
static BOOL
FReplicatedLikeDistribution(CDistributionSpec::EDistributionType edt)
{
	switch (edt)
	{
		case CDistributionSpec::EdtStrictReplicated:
		case CDistributionSpec::EdtTaintedReplicated:
		case CDistributionSpec::EdtUniversal:
			return true;

		default:
			return false;
	}
}

struct SCTEInfo
{
ULONG cteId;
ULONG sliceId;

SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id)
{
}
};

// dynamic array of SCTEInfo pointers, declared with CleanupDelete<SCTEInfo>
// as the cleanup for its elements
typedef CDynamicPtrArray<SCTEInfo, CleanupDelete<SCTEInfo> > CTEInfoArray;

// Recursive walk over a physical expression tree that notes, for the CTE
// operators it meets, which slice they sit in:
//   - a CTE Producer is appended to prodInfos only when the distribution
//     derived for its single child is replicated-like;
//   - every CTE Consumer is appended to consInfos.
// Motion operators delimit slices: each non-scalar child of a Motion gets a
// new slice id obtained by incrementing *pNextSlice, while children of any
// other operator stay in curSlice. Scalar children are not visited.
// Same motId-stack idea as in apply_shareinput_xslice.
static void
CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice,
				 ULONG *pNextSlice, CTEInfoArray *prodInfos,
				 CTEInfoArray *consInfos)
{
	COperator *pop = pexpr->Pop();

	switch (pop->Eopid())
	{
		case COperator::EopPhysicalCTEProducer:
		{
			// The Producer's distribution is the one of its only child, so
			// that is where it is read from. Non-replicated Producers are
			// not recorded: they cannot trigger the cross-slice issue this
			// walk is looking for.
			GPOS_ASSERT(1 == pexpr->Arity());
			CExpression *pexprProducerChild = (*pexpr)[0];
			CDrvdPropPlan *pdpplanChild =
				CDrvdPropPlan::Pdpplan(pexprProducerChild->PdpDerive());

			if (FReplicatedLikeDistribution(pdpplanChild->Pds()->Edt()))
			{
				prodInfos->Append(GPOS_NEW(mp) SCTEInfo(
					CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(),
					curSlice));
			}
			break;
		}

		case COperator::EopPhysicalCTEConsumer:
		{
			// Just remember (cteId, curSlice); the caller evaluates the
			// pairs once the whole tree has been walked.
			consInfos->Append(GPOS_NEW(mp) SCTEInfo(
				CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice));
			break;
		}

		default:
			break;
	}

	const BOOL fMotion = CUtils::FPhysicalMotion(pop);
	const ULONG arity = pexpr->Arity();

	for (ULONG ul = 0; ul < arity; ul++)
	{
		CExpression *pexprChild = (*pexpr)[ul];

		if (!pexprChild->Pop()->FScalar())
		{
			// below a Motion every non-scalar child opens a new slice
			ULONG childSlice = curSlice;
			if (fMotion)
			{
				childSlice = ++(*pNextSlice);
			}

			CollectCTESlices(mp, pexprChild, childSlice, pNextSlice,
							 prodInfos, consInfos);
		}
	}
}

// Report whether the physical tree rooted at pexpr contains a CTE Consumer
// that sits in a different slice than a replicated-like CTE Producer with
// the same CTE id. A null pexpr yields false.
BOOL
CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
{
	if (nullptr == pexpr)
	{
		return false;
	}

	CTEInfoArray *producers = GPOS_NEW(mp) CTEInfoArray(mp);
	CTEInfoArray *consumers = GPOS_NEW(mp) CTEInfoArray(mp);

	// the root is walked as slice 0; the walk increments this counter for
	// every additional slice it opens
	ULONG ulSliceCounter = 0;
	CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &ulSliceCounter, producers,
					 consumers);

	// compare every consumer against every recorded producer and stop at
	// the first pair sharing a CTE id but not a slice
	BOOL fCrossSlice = false;
	for (ULONG ic = 0; !fCrossSlice && ic < consumers->Size(); ic++)
	{
		SCTEInfo *cons = (*consumers)[ic];

		for (ULONG ip = 0; !fCrossSlice && ip < producers->Size(); ip++)
		{
			SCTEInfo *prod = (*producers)[ip];
			fCrossSlice = (prod->cteId == cons->cteId &&
						   prod->sliceId != cons->sliceId);
		}
	}

	producers->Release();
	consumers->Release();

	return fCrossSlice;
}

//---------------------------------------------------------------------------
// @class:
// CUtils::FHasSubqueryOrApply
Expand Down
18 changes: 18 additions & 0 deletions src/backend/gporca/libgpopt/src/translate/CTranslatorExprToDXL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,20 @@ CTranslatorExprToDXL::PdxlnTranslate(CExpression *pexpr,

GPOS_ASSERT(nullptr == m_pdpplan);

// Walk the physical tree and detect a CTE Consumer placed on a
// different slice than its Producer when the Producer's output is
// replicated-like (StrictReplicated/TaintedReplicated/Universal).
// Fall back to the Postgres optimizer if it is detected because
// it breaks Producer-Consumer locality and can hang the
// query at execution.
if (CUtils::FHasCrossSliceReplicatedCTEConsumer(m_mp, pexpr))
{
GPOS_RAISE(
gpdxl::ExmaDXL, gpdxl::ExmiExpr2DXLUnsupportedFeature,
Comment thread
Alena0704 marked this conversation as resolved.
Outdated
GPOS_WSZ_LIT(
"CTE Consumer placed on a different slice than its replicated Producer"));
}

m_pdpplan = CDrvdPropPlan::Pdpplan(pexpr->PdpDerive());
m_pdpplan->AddRef();

Expand Down Expand Up @@ -4250,6 +4264,10 @@ CTranslatorExprToDXL::BuildScalarSubplans(
{
const ULONG size = pdrgpcrInner->Size();

// Fall back to the Postgres optimizer if the SubPlan's inner expression
// contains a CTE Consumer whose Producer lives outside this subtree. Such a
// Consumer would become a cross-slice Shared Scan reader without a local
// Producer, which can hang the query or fail at execution time.
Comment thread
Alena0704 marked this conversation as resolved.
Outdated
CDXLNodeArray *pdrgpdxlnInner = GPOS_NEW(m_mp) CDXLNodeArray(m_mp);
for (ULONG ul = 0; ul < size; ul++)
{
Expand Down
86 changes: 86 additions & 0 deletions src/test/regress/expected/qp_orca_fallback.out
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,92 @@ SELECT * FROM jsonb_array_elements('["b", "a"]'::jsonb) WITH ORDINALITY;
"a" | 2
(2 rows)

-- The walker that detects a CTE Consumer on a different slice than its
-- replicated Producer. Without it ORCA would emit a plan with cross-slice
-- replicated CTE Consumers that hangs at execution.
-- start_ignore
DROP TABLE IF EXISTS tbl1, tbl2;
NOTICE: table "tbl1" does not exist, skipping
NOTICE: table "tbl2" does not exist, skipping
-- end_ignore
CREATE TABLE tbl2 (id numeric, refrcode varchar(255), referenceid numeric)
DISTRIBUTED REPLICATED;
CREATE TABLE tbl1 (id bigserial, iscalctrg varchar(15) NOT NULL,
iscalcdetail varchar(15))
DISTRIBUTED REPLICATED;
-- start_ignore
INSERT INTO tbl2 SELECT i, 'A'||(i%5), 101991
FROM generate_series(1, 50000) i;
INSERT INTO tbl1 (iscalctrg, iscalcdetail)
SELECT 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i;
ANALYZE tbl1;
ANALYZE tbl2;
-- end_ignore
-- Case 1: walker triggers fallback. With scalar subqueries on the CTE
-- ORCA produces a plan whose CTE Producer is replicated and Consumers
-- live on a different slice -- the walker raises ExmiExpr2DXLUnsupported
-- and trace_fallback DETAIL says "CTE Consumer placed on a different
-- slice than its replicated Producer".
EXPLAIN (COSTS OFF)
WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
SELECT p.iscalctrg,
(SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r,
(SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1
FROM tbl1 p
LIMIT 1;
QUERY PLAN
----------------------------------------------------------------------------------------
Gather Motion 1:1 (slice1; segments: 1)
-> Limit
-> Seq Scan on tbl1 p
SubPlan 1
-> Limit
-> Result
Filter: ((tbl2.refrcode)::text = (p.iscalctrg)::text)
-> Materialize
-> Seq Scan on tbl2
Filter: (referenceid = '101991'::numeric)
SubPlan 2
-> Limit
-> Result
Filter: ((tbl2_1.refrcode)::text = (p.iscalcdetail)::text)
-> Materialize
-> Seq Scan on tbl2 tbl2_1
Filter: (referenceid = '101991'::numeric)
Optimizer: Postgres query optimizer
(18 rows)

-- Case 2: walker correctly stays silent. The same CTE referenced from a
-- JOIN: ORCA pins the Producer body to a single segment with a One-Time
-- Filter (gp_execution_segment() = N), so the Producer's child
-- distribution is EdtSingleton, not replicated -- the walker skips it.
EXPLAIN (COSTS OFF)
WITH t1 AS (SELECT * FROM tbl1),
t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
SELECT p.* FROM t1 p
JOIN t2 r ON p.iscalctrg = r.refrcode
JOIN t2 r1 ON p.iscalcdetail = r1.refrcode
LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------------------
Gather Motion 1:1 (slice1; segments: 1)
-> Limit
-> Hash Join
Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text)
-> Hash Join
Hash Cond: ((tbl2.refrcode)::text = (tbl1.iscalctrg)::text)
-> Seq Scan on tbl2
Filter: (referenceid = '101991'::numeric)
-> Hash
-> Seq Scan on tbl1
-> Hash
-> Subquery Scan on r1
-> Seq Scan on tbl2 tbl2_1
Filter: (referenceid = '101991'::numeric)
Optimizer: Postgres query optimizer
(15 rows)

DROP TABLE tbl1, tbl2;
-- start_ignore
-- FIXME: gpcheckcat fails due to mismatching distribution policy if this table isn't dropped
-- Keep this table around once this is fixed
Expand Down
Loading
Loading