Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build-cloudberry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ jobs:
},
{"test":"ic-cbdb-parallel",
"make_configs":["src/test/regress:installcheck-cbdb-parallel"]
},
{"test":"ic-orca-parallel",
"make_configs":["src/test/regress:installcheck-orca-parallel"]
}
]
}'
Expand Down
2 changes: 2 additions & 0 deletions src/backend/gpopt/config/CConfigParamMapping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ CConfigParamMapping::PackConfigParamInBitset(
// disable table scan if the corresponding GUC is turned off
traceflag_bitset->ExchangeSet(
GPOPT_DISABLE_XFORM_TF(CXform::ExfGet2TableScan));
traceflag_bitset->ExchangeSet(
GPOPT_DISABLE_XFORM_TF(CXform::ExfGet2ParallelTableScan));
}

if (!optimizer_enable_push_join_below_union_all)
Expand Down
52 changes: 52 additions & 0 deletions src/backend/gpopt/gpdbwrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <limits> // std::numeric_limits

#include "gpos/base.h"
#include "gpopt/base/COptCtxt.h"
#include "gpopt/optimizer/COptimizerConfig.h"
#include "gpos/error/CAutoExceptionStack.h"
#include "gpos/error/CException.h"

Expand All @@ -36,8 +38,10 @@ extern "C" {
#include "access/amapi.h"
#include "access/external.h"
#include "access/genam.h"
#include "access/parallel.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_inherits.h"
#include "cdb/cdbvars.h"
#include "foreign/fdwapi.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
Expand All @@ -52,6 +56,9 @@ extern "C" {
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"

extern bool enable_parallel;
extern int max_parallel_workers_per_gather;
}
#define GP_WRAP_START \
sigjmp_buf local_sigjmp_buf; \
Expand Down Expand Up @@ -2548,6 +2555,19 @@ gpdb::GetForeignServerId(Oid reloid)
return 0;
}

// Return the segfilecount field of the append-only metadata entry that
// GetAppendOnlyEntry() fills in for the given relation.
//
// The lookup runs between GP_WRAP_START / GP_WRAP_END like the other
// wrappers in this file (presumably so backend errors are converted by the
// wrapper macros -- their full definition is not shown here). The trailing
// "return -1" is only reached if control falls out of the wrapped block.
int16
gpdb::GetAppendOnlySegmentFilesCount(Relation rel)
{
	GP_WRAP_START;
	{
		FormData_pg_appendonly ao_entry;
		GetAppendOnlyEntry(rel, &ao_entry);

		const int16 segfile_count = ao_entry.segfilecount;
		return segfile_count;
	}
	GP_WRAP_END;
	return -1;
}

// Locks on partition leafs and indexes are held during optimizer (after
// parse-analyze stage). ORCA need this function to lock relation. Here
// we do not need to consider lock-upgrade issue, reasons are:
Expand Down Expand Up @@ -2706,4 +2726,36 @@ gpdb::TestexprIsHashable(Node *testexpr, List *param_ids)
return false;
}

// check if parallel mode is OK (comprehensive check)
bool
gpdb::IsParallelModeOK(void)
{
GP_WRAP_START;
{
if (!enable_parallel)
return false;

if (IS_SINGLENODE())
return false;

if (max_parallel_workers_per_gather <= 0)
return false;

// Check if parallel plans are enabled in current optimizer context
gpopt::COptCtxt *poctxt = gpopt::COptCtxt::PoctxtFromTLS();
if (nullptr != poctxt)
{
gpopt::COptimizerConfig *optimizer_config = poctxt->GetOptimizerConfig();
if (nullptr != optimizer_config)
{
if (!optimizer_config->CreateParallelPlan())
return false;
}
}
return true;
}
GP_WRAP_END;
return false; // default to disabled if no context
}

// EOF
222 changes: 217 additions & 5 deletions src/backend/gpopt/translate/CTranslatorDXLToPlStmt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" {
#include "partitioning/partdesc.h"
#include "storage/lmgr.h"
#include "utils/guc.h"
#include "optimizer/cost.h"
#include "utils/lsyscache.h"
#include "utils/partcache.h"
#include "utils/rel.h"
Expand Down Expand Up @@ -83,6 +84,7 @@ extern "C" {
#include "naucrates/dxl/operators/CDXLPhysicalSplit.h"
#include "naucrates/dxl/operators/CDXLPhysicalTVF.h"
#include "naucrates/dxl/operators/CDXLPhysicalTableScan.h"
#include "naucrates/dxl/operators/CDXLPhysicalParallelTableScan.h"
#include "naucrates/dxl/operators/CDXLPhysicalValuesScan.h"
#include "naucrates/dxl/operators/CDXLPhysicalWindow.h"
#include "naucrates/dxl/operators/CDXLScalarBitmapBoolOp.h"
Expand Down Expand Up @@ -348,6 +350,12 @@ CTranslatorDXLToPlStmt::TranslateDXLOperatorToPlan(
ctxt_translation_prev_siblings);
break;
}
case EdxlopPhysicalParallelTableScan:
{
plan = TranslateDXLParallelTblScan(dxlnode, output_context,
ctxt_translation_prev_siblings);
break;
}
case EdxlopPhysicalIndexScan:
{
plan = TranslateDXLIndexScan(dxlnode, output_context,
Expand Down Expand Up @@ -712,14 +720,118 @@ CTranslatorDXLToPlStmt::TranslateDXLTblScan(
}


//---------------------------------------------------------------------------
//	@function:
//		CTranslatorDXLToPlStmt::TranslateDXLParallelTblScan
//
//	@doc:
//		Translates a DXL parallel table scan node into a SeqScan plan node
//		flagged as parallel aware / parallel safe. The worker count stored
//		on the DXL operator is copied to the plan, and when it is greater
//		than one the estimated row count is scaled down to a per-worker
//		figure. The previous-siblings context argument is unused.
//
//---------------------------------------------------------------------------
Plan *
CTranslatorDXLToPlStmt::TranslateDXLParallelTblScan(
	const CDXLNode *tbl_scan_dxlnode, CDXLTranslateContext *output_context,
	CDXLTranslationContextArray * /*ctxt_translation_prev_siblings*/)
{
	CDXLPhysicalParallelTableScan *parallel_scan_op =
		CDXLPhysicalParallelTableScan::Cast(tbl_scan_dxlnode->GetOperator());
	ULONG num_workers = parallel_scan_op->UlParallelWorkers();

	// receives the column mappings of the base relation
	CDXLTranslateContextBaseTable base_table_context(m_mp);

	const CDXLTableDescr *dxl_table_descr =
		parallel_scan_op->GetDXLTableDescr();
	const IMDRelation *md_rel =
		m_md_accessor->RetrieveRel(dxl_table_descr->MDId());

	// The parser may not have locked this table (e.g. scans generated for
	// partitioned tables), so take the lock here before touching it.
	OID rel_oid = CMDIdGPDB::CastMdid(md_rel->MDId())->Oid();
	GPOS_ASSERT(dxl_table_descr->LockMode() != -1);
	gpdb::GPDBLockRelationOid(rel_oid, dxl_table_descr->LockMode());

	// turn the table descriptor into a range table entry
	Index rte_index = ProcessDXLTblDescr(dxl_table_descr, &base_table_context);

	// children of a table scan: projection list and filter, nothing else
	GPOS_ASSERT(2 == tbl_scan_dxlnode->Arity());
	CDXLNode *proj_list_node = (*tbl_scan_dxlnode)[EdxltsIndexProjList];
	CDXLNode *filter_node = (*tbl_scan_dxlnode)[EdxltsIndexFilter];

	List *target_entries = NIL;
	List *filter_quals = NIL;  // quals produced from the filter child
	TranslateProjListAndFilter(proj_list_node, filter_node,
							   &base_table_context,	 // base table context
							   nullptr,	 // no child contexts for a scan
							   &target_entries, &filter_quals,
							   output_context);

	// a parallel table scan is always a sequential scan, never a foreign scan
	SeqScan *scan_node = MakeNode(SeqScan);
	scan_node->scanrelid = rte_index;
	Plan *plan = &(scan_node->plan);

	// mark the node for parallel execution
	plan->parallel_aware = true;
	plan->parallel_safe = true;
	plan->parallel = (int) num_workers;

	plan->targetlist = target_entries;

	// Security quals of the relation's RTE go first; the query quals are
	// appended behind them so the security quals are evaluated first.
	List *combined_quals = NIL;
	AddSecurityQuals(rel_oid, &combined_quals, &rte_index);
	combined_quals = gpdb::ListConcat(combined_quals, filter_quals);
	plan->qual = combined_quals;

	// non-block tables get an additional target list check
	if (md_rel->IsNonBlockTable())
	{
		CheckSafeTargetListForAOTables(plan->targetlist);
	}

	plan->plan_node_id = m_dxl_to_plstmt_context->GetNextPlanId();

	// copy the operator costs, then convert the row estimate to per-worker
	TranslatePlanCosts(tbl_scan_dxlnode, plan);
	if (num_workers > 1)
	{
		plan->plan_rows = ceil(plan->plan_rows / num_workers);
	}

	SetParamIds(plan);

	return (Plan *) scan_node;
}


//---------------------------------------------------------------------------
// @function:
// CTranslatorDXLToPlStmt::SetIndexVarAttnoWalker
//
// @doc:
// Walker to set index var attno's,
// attnos of index vars are set to their relative positions in index keys,
// skip any outer references while walking the expression tree
//
//---------------------------------------------------------------------------
BOOL
Expand Down Expand Up @@ -2415,15 +2527,34 @@ CTranslatorDXLToPlStmt::TranslateDXLMotion(
sendslice->directDispatch.contentIds = NIL;
sendslice->directDispatch.haveProcessedAnyCalculations = false;

// set parallel workers if needed
ULONG child_index = motion_dxlop->GetRelationChildIdx();
CDXLNode *child_dxlnode = (*motion_dxlnode)[child_index];
ULONG child_parallel_workers = ExtractParallelWorkersFromDXL(child_dxlnode);
if (child_parallel_workers > 1)
{
// Determine parallel workers based on enable_parallel and gang type
bool supports_parallel = (sendslice->gangType == GANGTYPE_PRIMARY_READER ||
sendslice->gangType == GANGTYPE_PRIMARY_WRITER);

if (supports_parallel)
{
sendslice->parallel_workers = child_parallel_workers;
}
else
{
// Disable parallel for: non-PRIMARY gang types
// (SINGLETON_READER, ENTRYDB_READER, UNALLOCATED)
sendslice->parallel_workers = 0;
}
}

motion->motionID = sendslice->sliceIndex;

// translate motion child
// child node is in the same position in broadcast and gather motion nodes
// but different in redistribute motion nodes

ULONG child_index = motion_dxlop->GetRelationChildIdx();

CDXLNode *child_dxlnode = (*motion_dxlnode)[child_index];
// Note: child_index and child_dxlnode already defined above

CDXLTranslateContext child_context(m_mp, false,
output_context->GetColIdToParamIdMap());
Expand Down Expand Up @@ -2576,6 +2707,16 @@ CTranslatorDXLToPlStmt::TranslateDXLMotion(
return nullptr;
}

// Adjust row count for parallel execution in the sending slice
// The Motion node receives rows from all parallel workers, so we need to
// account for the fact that each worker processes a fraction of the rows.
// TranslatePlanCosts() already divided by numsegments, but if we have
// parallel workers, each segment is further subdivided among workers.
if (sendslice->parallel_workers > 1)
{
plan->plan_rows = ceil(plan->plan_rows / sendslice->parallel_workers);
}

SetParamIds(plan);

return (Plan *) motion;
Expand Down Expand Up @@ -7282,4 +7423,75 @@ CTranslatorDXLToPlStmt::IsIndexForOrderBy(
}
return false;
}

//---------------------------------------------------------------------------
//	@function:
//		CTranslatorDXLToPlStmt::ExtractParallelWorkersFromDXL
//
//	@doc:
//		Walk a DXL subtree and report the worker count of the first
//		parallel table scan encountered. The search relies on the
//		assumption that every parallel scan in a query uses the same
//		parallel degree, so the first hit is representative.
//
//		Returns 1 (meaning "not parallel") for a null node, for any
//		non-parallel scan operator, for a Motion operator, and when no
//		child subtree yields a value greater than 1.
//
//---------------------------------------------------------------------------
ULONG
CTranslatorDXLToPlStmt::ExtractParallelWorkersFromDXL(const CDXLNode *dxlnode)
{
	if (nullptr == dxlnode)
	{
		return 1;
	}

	CDXLOperator *dxlop = dxlnode->GetOperator();
	switch (dxlop->GetDXLOperator())
	{
		case EdxlopPhysicalParallelTableScan:
			// the parallel scan operator carries the worker count itself
			return CDXLPhysicalParallelTableScan::Cast(dxlop)
				->UlParallelWorkers();

		// Non-parallel scans (table, index, bitmap, foreign, values) end
		// the search on this branch: nothing parallel underneath.
		case EdxlopPhysicalTableScan:
		case EdxlopPhysicalDynamicTableScan:
		case EdxlopPhysicalIndexScan:
		case EdxlopPhysicalIndexOnlyScan:
		case EdxlopPhysicalBitmapTableScan:
		case EdxlopPhysicalDynamicBitmapTableScan:
		case EdxlopPhysicalForeignScan:
		case EdxlopPhysicalDynamicForeignScan:
		case EdxlopPhysicalDynamicIndexScan:
		case EdxlopPhysicalDynamicIndexOnlyScan:
		case EdxlopPhysicalValuesScan:
			return 1;

		// A Motion is a slice boundary. Workers below it belong to the
		// sending slice, so do not descend; report 1 (no parallel workers)
		// for the slice currently being examined.
		case EdxlopPhysicalMotionGather:
		case EdxlopPhysicalMotionBroadcast:
		case EdxlopPhysicalMotionRedistribute:
		case EdxlopPhysicalMotionRandom:
		case EdxlopPhysicalMotionRoutedDistribute:
			return 1;

		default:
			break;
	}

	// any other operator: search the children, stop at the first hit
	const ULONG arity = dxlnode->Arity();
	for (ULONG child_idx = 0; child_idx < arity; child_idx++)
	{
		const ULONG workers =
			ExtractParallelWorkersFromDXL((*dxlnode)[child_idx]);
		if (workers > 1)
		{
			return workers;
		}
	}

	return 1;
}

// EOF
Loading
Loading