Skip to content

Commit c1a21a7

Browse files
authored
feat(subq/some): some/any/exists for stream subq (#34860)
1 parent 09b23b2 commit c1a21a7

7 files changed

Lines changed: 998 additions & 852 deletions

File tree

source/libs/executor/src/executil.c

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4456,21 +4456,84 @@ int32_t setRowHasNullFromResBlock(STaskSubJobCtx* ctx, bool* hasNull, SSDataBloc
44564456
return code;
44574457
}
44584458

4459-
void handleRemoteRowRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp) {
4459+
void handleRemoteRowRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp, bool* fetchDone) {
44604460
SSDataBlock* pResBlock = NULL;
4461+
SExecTaskInfo* pTaskInfo = ctx->pTaskInfo;
44614462

44624463
qDebug("%s scl fetch row rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
44634464

4465+
SRemoteRowNode* pRemote = (SRemoteRowNode*)pParam->pRes;
4466+
4467+
if (IS_STREAM_MODE(pTaskInfo)) {
4468+
SNode** ppRes = taosArrayGet(ctx->subResNodes, pParam->subQIdx);
4469+
if (NULL == *ppRes && 0 == pRsp->numOfRows) {
4470+
pRemote->valSet = true;
4471+
pRemote->hasValue = false;
4472+
pRemote->hasNull = false;
4473+
pRemote->val.isNull = true;
4474+
pRemote->val.translate = true;
4475+
pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET);
4476+
taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
4477+
} else if (pRsp->numOfRows > 0 || pRsp->numOfBlocks > 0) {
4478+
if (2 != pRsp->numOfCols) {
4479+
qError("%s invalid scl fetch row rsp received, subQIdx:%d, cols:%" PRId64, ctx->idStr, pParam->subQIdx, pRsp->numOfCols);
4480+
ctx->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
4481+
*fetchDone = true;
4482+
return;
4483+
}
4484+
4485+
ctx->code = createExprSubQResBlock(&pResBlock, &pRemote->val.node.resType);
4486+
if (TSDB_CODE_SUCCESS == ctx->code) {
4487+
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BOOL, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes, 0);
4488+
ctx->code = blockDataAppendColInfo(pResBlock, &idata);
4489+
}
4490+
if (TSDB_CODE_SUCCESS == ctx->code) {
4491+
ctx->code = blockDataEnsureCapacity(pResBlock, 1);
4492+
}
4493+
if (TSDB_CODE_SUCCESS == ctx->code) {
4494+
ctx->code = extractSingleRspBlock(pRsp, pResBlock);
4495+
}
4496+
if (TSDB_CODE_SUCCESS == ctx->code) {
4497+
ctx->code = setValueFromResBlock(ctx, &pRemote->val, pResBlock);
4498+
}
4499+
if (TSDB_CODE_SUCCESS == ctx->code) {
4500+
ctx->code = setRowHasNullFromResBlock(ctx, &pRemote->hasNull, pResBlock);
4501+
}
4502+
if (TSDB_CODE_SUCCESS == ctx->code) {
4503+
pRemote->valSet = true;
4504+
pRemote->hasValue = true;
4505+
pRemote->val.node.type = QUERY_NODE_REMOTE_ROW;
4506+
taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
4507+
}
4508+
4509+
blockDataDestroy(pResBlock);
4510+
} else if (NULL != *ppRes && 0 == pRsp->numOfRows) {
4511+
pRemote->val.node.type = QUERY_NODE_VALUE;
4512+
pRsp->completed = true;
4513+
}
4514+
4515+
*fetchDone = (TSDB_CODE_SUCCESS != ctx->code || pRsp->completed) ? true : false;
4516+
4517+
if (!(*fetchDone)) {
4518+
ctx->code = sendFetchRemoteNodeReq(ctx, pParam->subQIdx, pParam->pRes, false);
4519+
if (TSDB_CODE_SUCCESS != ctx->code) {
4520+
*fetchDone = true;
4521+
}
4522+
}
4523+
4524+
return;
4525+
}
4526+
4527+
*fetchDone = true;
4528+
44644529
if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
4465-
qError("%s invalid scl fetch row rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d",
4530+
qError("%s invalid scl fetch row rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d",
44664531
ctx->idStr, pParam->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
44674532
ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
44684533

44694534
return;
44704535
}
44714536

4472-
SRemoteRowNode* pRemote = (SRemoteRowNode*)pParam->pRes;
4473-
44744537
if (0 == pRsp->numOfRows) {
44754538
pRemote->valSet = true;
44764539
pRemote->hasValue = false;
@@ -4489,7 +4552,7 @@ void handleRemoteRowRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetriev
44894552

44904553
return;
44914554
}
4492-
4555+
44934556
ctx->code = createExprSubQResBlock(&pResBlock, &pRemote->val.node.resType);
44944557
if (TSDB_CODE_SUCCESS == ctx->code) {
44954558
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BOOL, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes, 0);
@@ -4514,8 +4577,8 @@ void handleRemoteRowRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetriev
45144577
pRemote->valSet = true;
45154578
pRemote->hasValue = true;
45164579
}
4517-
4518-
blockDataDestroy(pResBlock);
4580+
4581+
blockDataDestroy(pResBlock);
45194582
}
45204583

45214584

@@ -4529,13 +4592,14 @@ int32_t setZeroRowsResValue(STaskSubJobCtx* ctx, SValueNode* pRes, int32_t rows)
45294592

45304593
void handleRemoteZeroRowsRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp, bool* fetchDone) {
45314594
SRemoteZeroRowsNode* pRemote = (SRemoteZeroRowsNode*)pParam->pRes;
4595+
SExecTaskInfo* pTaskInfo = ctx->pTaskInfo;
45324596

45334597
qDebug("%s scl fetch zeroRows rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
45344598

45354599
int32_t resRows = (pRsp->numOfRows > 0) ? 1 : 0;
45364600
if (resRows > 0 || pRsp->completed) {
45374601
ctx->code = setZeroRowsResValue(ctx, &pRemote->val, resRows);
4538-
if (TSDB_CODE_SUCCESS == ctx->code) {
4602+
if (TSDB_CODE_SUCCESS == ctx->code) {
45394603
taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
45404604
}
45414605

@@ -4544,6 +4608,16 @@ void handleRemoteZeroRowsRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRe
45444608
*fetchDone = false;
45454609
}
45464610

4611+
if (IS_STREAM_MODE(pTaskInfo) && 0 == pRsp->numOfRows) {
4612+
ctx->code = setZeroRowsResValue(ctx, &pRemote->val, 0);
4613+
if (TSDB_CODE_SUCCESS == ctx->code) {
4614+
taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
4615+
}
4616+
4617+
pRsp->completed = true;
4618+
*fetchDone = true;
4619+
}
4620+
45474621
if (!(*fetchDone)) {
45484622
int32_t code = sendFetchRemoteNodeReq(ctx, pParam->subQIdx, pParam->pRes, false);
45494623
if (TSDB_CODE_SUCCESS != code) {
@@ -4618,9 +4692,15 @@ int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
46184692
}
46194693
break;
46204694
}
4621-
case QUERY_NODE_REMOTE_ROW:
4622-
handleRemoteRowRes(pParam, ctx, pRsp);
4695+
case QUERY_NODE_REMOTE_ROW: {
4696+
bool fetchDone = false;
4697+
handleRemoteRowRes(pParam, ctx, pRsp, &fetchDone);
4698+
qDebug("%s subQIdx %d handle remote row finished, fetchDone:%d", idStr, pParam->subQIdx, fetchDone);
4699+
if (!fetchDone) {
4700+
goto _exit;
4701+
}
46234702
break;
4703+
}
46244704
case QUERY_NODE_REMOTE_ZERO_ROWS: {
46254705
bool fetchDone = false;
46264706
handleRemoteZeroRowsRes(pParam, ctx, pRsp, &fetchDone);
@@ -4817,6 +4897,12 @@ int32_t remoteNodeCopy(SNode* pSrc, SNode* pDst) {
48174897
pRemote->hasNull = ((SRemoteRowNode*)pSrc)->hasNull;
48184898
break;
48194899
}
4900+
case QUERY_NODE_REMOTE_ZERO_ROWS: {
4901+
SRemoteZeroRowsNode* pRemote = (SRemoteZeroRowsNode*)pDst;
4902+
TAOS_CHECK_EXIT(valueNodeCopy((SValueNode*)pSrc, &pRemote->val));
4903+
pRemote->val.node.type = QUERY_NODE_VALUE;
4904+
break;
4905+
}
48204906
default:
48214907
break;
48224908
}

test/cases/18-StreamProcessing/02-Stream/stream_subq_in-mtables.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,27 @@ def prepareData(self):
8080
self.tb_count = 50
8181
self.ts_step = 1000
8282
self.ts_total = 5000000
83+
8384
for t in range(0, self.tb_count):
8485
sqls.append(f"create table db.d{t} using db.meters tags({t})")
85-
for i in range(0, self.ts_total, self.ts_step):
86-
sqls.append(f"insert into db.d{t} values({ts+i},{i})")
8786

8887
tdSql.executes(sqls)
8988
tdLog.info(f"create tables successfully.")
9089

90+
# Batch insert: 500 rows per INSERT statement
91+
batch_size = 500
92+
row_count = self.ts_total // self.ts_step # 5000
93+
for t in range(0, self.tb_count):
94+
for batch_start in range(0, row_count, batch_size):
95+
batch_end = min(batch_start + batch_size, row_count)
96+
values = " ".join(
97+
f"({ts + i * self.ts_step},{i * self.ts_step})"
98+
for i in range(batch_start, batch_end)
99+
)
100+
tdSql.execute(f"insert into db.d{t} values {values}")
101+
102+
tdLog.info(f"insert data successfully.")
103+
91104
def createStream(self):
92105
tdLog.info(f"create stb stream.")
93106
#sql = (f"create stream db.stb_stream count_window(2, 1) from db.meters partition by tbname,groupid stream_options(fill_history('2026-01-01 00:00:00')|low_latency_calc) into db.stream_meters output_subtable (concat('sm#', tbname)) tags (groupid int as groupid) as select _twstart as ts, first(current) as ff1, last(current) as lf1 from %%tbname where ts>= _twstart and ts<= _twend and current > (select first(current)-1 from db.meters);")

test/cases/18-StreamProcessing/02-Stream/stream_subquery-mtables.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,27 @@ def prepareData(self):
7272
self.tb_count = 50
7373
self.ts_step = 1000
7474
self.ts_total = 5000000
75+
7576
for t in range(0, self.tb_count):
7677
sqls.append(f"create table db.d{t} using db.meters tags({t})")
77-
for i in range(0, self.ts_total, self.ts_step):
78-
sqls.append(f"insert into db.d{t} values({ts+i},{i})")
7978

8079
tdSql.executes(sqls)
8180
tdLog.info(f"create tables successfully.")
8281

82+
# Batch insert: 500 rows per INSERT statement
83+
batch_size = 500
84+
row_count = self.ts_total // self.ts_step # 5000
85+
for t in range(0, self.tb_count):
86+
for batch_start in range(0, row_count, batch_size):
87+
batch_end = min(batch_start + batch_size, row_count)
88+
values = " ".join(
89+
f"({ts + i * self.ts_step},{i * self.ts_step})"
90+
for i in range(batch_start, batch_end)
91+
)
92+
tdSql.execute(f"insert into db.d{t} values {values}")
93+
94+
tdLog.info(f"insert data successfully.")
95+
8396
def createStream(self):
8497
tdLog.info(f"create stb stream.")
8598
sql = (

0 commit comments

Comments
 (0)