Skip to content

Commit 9337513

Browse files
authored
feat(stream): support getting meta changes for vtable in stream reader (#34895)
1 parent c066ee6 commit 9337513

12 files changed

Lines changed: 654 additions & 183 deletions

File tree

include/common/streamMsg.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,12 +808,14 @@ typedef struct SSTriggerVirTableInfoRequest {
808808
SArray* cids; // SArray<col_id_t>, col ids of the virtual table
809809
SArray* uids;
810810
bool fetchAllTable; // if true, ignore uids and fetch all virtual tables' info
811+
int64_t ver; // -1 for first, rsp.ver in walMeta info if vtable changes
811812
} SSTriggerVirTableInfoRequest;
812813

813814
typedef struct SSTriggerVirTablePseudoColRequest {
814815
SSTriggerPullRequest base;
815816
int64_t uid;
816817
SArray* cids; // SArray<col_id_t>, -1 means tbname
818+
int64_t ver; // -1 for first, rsp.ver in walMeta info if vtable changes
817819
} SSTriggerVirTablePseudoColRequest;
818820
typedef struct OTableInfoRsp {
819821
int64_t suid;
@@ -829,6 +831,7 @@ typedef struct OTableInfo {
829831
typedef struct SSTriggerOrigTableInfoRequest {
830832
SSTriggerPullRequest base;
831833
SArray* cols; // SArray<OTableInfo>
834+
int64_t ver; // -1 for first, rsp.ver in walMeta info if original table changes
832835
} SSTriggerOrigTableInfoRequest;
833836

834837
typedef struct SSTriggerOrigTableInfoRsp {

include/libs/executor/storageapi.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ typedef struct SStoreMetaReader {
337337
int32_t (*getTableEntryByUid)(SMetaReader* pReader, tb_uid_t uid);
338338
int (*getTableEntryByVersionUid)(SMetaReader *pReader, int64_t version, tb_uid_t uid);
339339
int32_t (*getTableEntryByName)(SMetaReader* pReader, const char* name);
340+
int32_t (*getTableEntryByVersionName)(SMetaReader* pReader, int64_t version, const char* name);
340341
int32_t (*getEntryGetUidCache)(SMetaReader* pReader, tb_uid_t uid);
341342
} SStoreMetaReader;
342343

include/libs/new-stream/streamReader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ int32_t createStreamTaskForTs(SStreamOptions* options, SStreamReaderTaskInner**
149149
int32_t initStreamTableListInfo(StreamTableListInfo* pTableListInfo);
150150
int32_t qStreamGetTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, uint64_t gid, STableKeyInfo** pKeyInfo, int32_t* size);
151151
void qStreamDestroyTableInfo(StreamTableListInfo* pTableListInfo);
152+
void qStreamClearTableInfo(StreamTableListInfo* pTableListInfo);
152153
int32_t qStreamCopyTableInfo(SStreamTriggerReaderInfo* sStreamReaderInfo, StreamTableListInfo* dst);
153154
int32_t qStreamSetTableList(StreamTableListInfo* pTableListInfo, int64_t uid, uint64_t gid);
154155
int32_t qStreamGetTableListGroupNum(SStreamTriggerReaderInfo* sStreamReaderInfo);

source/common/src/msg/streamMsg.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3291,12 +3291,14 @@ int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTrigger
32913291
TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
32923292
TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->uids));
32933293
TAOS_CHECK_EXIT(tEncodeBool(&encoder, pRequest->fetchAllTable));
3294+
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
32943295
break;
32953296
}
32963297
case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
32973298
SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
32983299
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
32993300
TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
3301+
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
33003302
break;
33013303
}
33023304
case STRIGGER_PULL_OTABLE_INFO: {
@@ -3313,6 +3315,7 @@ int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTrigger
33133315
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
33143316
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
33153317
}
3318+
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
33163319
break;
33173320
}
33183321
default: {
@@ -3517,12 +3520,14 @@ int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPull
35173520
TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
35183521
TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->uids, sizeof(int64_t)));
35193522
TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pRequest->fetchAllTable));
3523+
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
35203524
break;
35213525
}
35223526
case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
35233527
SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
35243528
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
35253529
TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
3530+
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
35263531
break;
35273532
}
35283533
case STRIGGER_PULL_OTABLE_INFO: {
@@ -3545,6 +3550,8 @@ int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPull
35453550
TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
35463551
TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
35473552
}
3553+
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3554+
35483555
break;
35493556
}
35503557
default: {

source/dnode/vnode/src/inc/vnodeInt.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int l
189189
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
190190
SRSchema* metaGetTbTSchemaR(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
191191
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
192+
int metaGetTableEntryByVersionName(SMetaReader *pReader, int64_t version, const char *name);
192193
int metaAlterCache(SMeta* pMeta, int32_t nPage);
193194
int metaCreateRsma(SMeta* pMeta, int64_t version, SVCreateRsmaReq* pReq);
194195
int metaDropRsma(SMeta* pMeta, int64_t version, SVDropRsmaReq* pReq);

source/dnode/vnode/src/meta/metaQuery.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ static int32_t getUidVersion(SMetaReader *pReader, int64_t *version, tb_uid_t ui
137137
}
138138
}
139139
code = TSDB_CODE_NOT_FOUND;
140-
metaError("%s uid:%" PRId64 " version not found", __func__, uid);
140+
metaError("%s uid:%" PRId64 " version:%" PRId64 " not found", __func__, uid, *version);
141141
END:
142142
tdbFree(pKey);
143143
tdbFree(pVal);
@@ -179,7 +179,7 @@ int metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) {
179179
return metaGetTableEntryByVersion(pReader, info.version, uid);
180180
}
181181

182-
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
182+
int metaGetTableEntryByVersionName(SMetaReader *pReader, int64_t version, const char *name) {
183183
SMeta *pMeta = pReader->pMeta;
184184
tb_uid_t uid;
185185

@@ -189,7 +189,11 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
189189
}
190190

191191
uid = *(tb_uid_t *)pReader->pBuf;
192-
return metaReaderGetTableEntryByUid(pReader, uid);
192+
return metaReaderGetTableEntryByVersionUid(pReader, version, uid);
193+
}
194+
195+
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
196+
return metaGetTableEntryByVersionName(pReader, -1, name);
193197
}
194198

195199
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {

source/dnode/vnode/src/vnd/vnodeApi.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
160160

161161
pMetaReader->getEntryGetUidCache = metaReaderGetTableEntryByUidCache;
162162
pMetaReader->getTableEntryByName = metaGetTableEntryByName;
163+
pMetaReader->getTableEntryByVersionName = metaGetTableEntryByVersionName;
163164

164165
pMetaReader->readerReleaseLock = metaReaderReleaseLock;
165166
}

source/dnode/vnode/src/vnd/vnodeStream.c

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,9 @@ static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, S
687687

688688
STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
689689
STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
690-
STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
690+
if (sStreamReaderInfo->isVtableStream) {
691+
STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
692+
}
691693

692694
STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
693695
end:
@@ -769,10 +771,6 @@ void getAlterColId(void* pVnode, int64_t uid, const char* colName, col_id_t* col
769771
return;
770772
}
771773

772-
static bool checkAlterCondition() {
773-
return true;
774-
}
775-
776774
// Handle TSDB_ALTER_TABLE_ALTER_COLUMN_REF and TSDB_ALTER_TABLE_REMOVE_COLUMN_REF
777775
static int32_t scanAlterTableColumnRef(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp,
778776
SVAlterTbReq* pReq, uint64_t uid, int64_t ver) {
@@ -916,6 +914,7 @@ static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SS
916914

917915
uint64_t uid = 0;
918916
if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF || req.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
917+
STREAM_CHECK_CONDITION_GOTO(!sStreamReaderInfo->isVtableStream, TDB_CODE_SUCCESS);
919918
code = checkAlter(sStreamReaderInfo, req.tbName, req.action, &uid);
920919
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
921920
ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableUidByName", req.tbName);
@@ -1271,6 +1270,11 @@ static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SSt
12711270
ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 "/%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d",
12721271
TD_VID(pVnode), ver, walGetAppliedVer(pWalReader->pWal), TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);
12731272
if (wCont->msgType == TDMT_VND_SUBMIT) {
1273+
// return when getting data if there are meta data in vtable scan
1274+
if (sStreamReaderInfo->isVtableStream && rsp->tableBlock != NULL && ((SSDataBlock*)rsp->tableBlock)->info.rows > 0) {
1275+
rsp->ver--;
1276+
break;
1277+
}
12741278
data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
12751279
len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
12761280
STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, data, len, ver));
@@ -2124,6 +2128,11 @@ static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReader
21242128
ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d, msg len:%d", __func__,
21252129
ver, TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl, len);
21262130
if (wCont->msgType == TDMT_VND_SUBMIT) {
2131+
// return when getting data if there are meta data in vtable scan
2132+
if (sStreamReaderInfo->isVtableStream && resultRsp->tableBlock != NULL && ((SSDataBlock*)resultRsp->tableBlock)->info.rows > 0) {
2133+
resultRsp->ver--;
2134+
break;
2135+
}
21272136
data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
21282137
len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
21292138
STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, data, len, NULL, resultRsp, ver));
@@ -2812,15 +2821,16 @@ static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTr
28122821
size_t size = 0;
28132822
void* pTask = sStreamReaderInfo->pTask;
28142823

2815-
ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
2816-
tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));
2824+
ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d, appver:%"PRId64, TD_VID(pVnode), __func__,
2825+
tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc), pVnode->state.applied);
28172826

28182827
taosWLockLatch(&sStreamReaderInfo->lock);
28192828
TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
28202829
TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
28212830
STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
28222831
STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);
28232832

2833+
qStreamClearTableInfo(&sStreamReaderInfo->vSetTableList);
28242834
STREAM_CHECK_RET_GOTO(initStreamTableListInfo(&sStreamReaderInfo->vSetTableList));
28252835
STREAM_CHECK_RET_GOTO(qBuildVTableList(sStreamReaderInfo));
28262836
end:
@@ -3589,7 +3599,7 @@ static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg,
35893599
return code;
35903600
}
35913601

3592-
static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_t uid, uint64_t gid, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo) {
3602+
static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_t uid, uint64_t gid, int64_t ver, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo) {
35933603
int32_t code = 0;
35943604
int32_t lino = 0;
35953605
void* pTask = sStreamReaderInfo->pTask;
@@ -3601,7 +3611,7 @@ static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_
36013611

36023612
ST_TASK_DLOG("vgId:%d %s put vtable uid:%"PRId64, TD_VID(pVnode), __func__, uid);
36033613

3604-
code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(metaReader, sStreamReaderInfo->tableList.version, uid);
3614+
code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(metaReader, ver, uid);
36053615
if (code != 0) {
36063616
ST_TASK_ELOG("vgId:%d %s get table entry by uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, uid, tstrerror(code));
36073617
goto end;
@@ -3635,7 +3645,7 @@ static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_
36353645
return code;
36363646
}
36373647

3638-
static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
3648+
static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* cids, int64_t ver, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
36393649
int32_t code = 0;
36403650
int32_t lino = 0;
36413651
void* pTask = sStreamReaderInfo->pTask;
@@ -3653,7 +3663,7 @@ static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SAr
36533663
if (pKeyInfo == NULL || pKeyInfo->markedDeleted) {
36543664
continue;
36553665
}
3656-
code = setVtableInfo(pVnode, vTableInfo->infos, cids, pKeyInfo->uid, pKeyInfo->groupId, metaReader, sStreamReaderInfo);
3666+
code = setVtableInfo(pVnode, vTableInfo->infos, cids, pKeyInfo->uid, pKeyInfo->groupId, ver, metaReader, sStreamReaderInfo);
36573667
if (code != 0) {
36583668
ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, pKeyInfo->uid, tstrerror(code));
36593669
code = 0;
@@ -3666,7 +3676,7 @@ static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SAr
36663676
return code;
36673677
}
36683678

3669-
static int32_t getSpicificVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* uids, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
3679+
static int32_t getSpicificVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* uids, SArray* cids, int64_t ver, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
36703680
int32_t code = 0;
36713681
int32_t lino = 0;
36723682
void* pTask = sStreamReaderInfo->pTask;
@@ -3685,7 +3695,7 @@ static int32_t getSpicificVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo
36853695
ST_TASK_WLOG("vgId:%d %s uid:%"PRId64" not found in stream group", TD_VID(pVnode), __func__, *uid);
36863696
continue;
36873697
}
3688-
code = setVtableInfo(pVnode, vTableInfo->infos, cids, *uid, groupId, metaReader, sStreamReaderInfo);
3698+
code = setVtableInfo(pVnode, vTableInfo->infos, cids, *uid, groupId, ver, metaReader, sStreamReaderInfo);
36893699
if (code != 0) {
36903700
ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, *uid, tstrerror(code));
36913701
code = 0;
@@ -3706,7 +3716,7 @@ static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SS
37063716
SMetaReader metaReader = {0};
37073717

37083718
void* pTask = sStreamReaderInfo->pTask;
3709-
ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
3719+
ST_TASK_DLOG("vgId:%d %s start, version:%"PRId64, TD_VID(pVnode), __func__, req->virTableInfoReq.ver);
37103720

37113721
SArray* cids = req->virTableInfoReq.cids;
37123722
STREAM_CHECK_NULL_GOTO(cids, terrno);
@@ -3717,9 +3727,9 @@ static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SS
37173727
sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
37183728

37193729
if (req->virTableInfoReq.fetchAllTable || req->virTableInfoReq.uids == NULL || taosArrayGetSize(req->virTableInfoReq.uids) == 0) {
3720-
STREAM_CHECK_RET_GOTO(getAllVinfo(pVnode, &vTableInfo, cids, &metaReader, sStreamReaderInfo));
3730+
STREAM_CHECK_RET_GOTO(getAllVinfo(pVnode, &vTableInfo, cids, req->virTableInfoReq.ver, &metaReader, sStreamReaderInfo));
37213731
} else {
3722-
STREAM_CHECK_RET_GOTO(getSpicificVinfo(pVnode, &vTableInfo, req->virTableInfoReq.uids, cids, &metaReader, sStreamReaderInfo));
3732+
STREAM_CHECK_RET_GOTO(getSpicificVinfo(pVnode, &vTableInfo, req->virTableInfoReq.uids, cids, req->virTableInfoReq.ver, &metaReader, sStreamReaderInfo));
37233733
}
37243734
ST_TASK_DLOG("vgId:%d %s end, size:%"PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(vTableInfo.infos));
37253735
STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
@@ -3743,7 +3753,7 @@ static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SS
37433753
SMetaReader metaReader = {0};
37443754
void* pTask = sStreamReaderInfo->pTask;
37453755

3746-
ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
3756+
ST_TASK_DLOG("vgId:%d %s start, ver:%" PRId64, TD_VID(pVnode), __func__, req->origTableInfoReq.ver);
37473757

37483758
SArray* cols = req->origTableInfoReq.cols;
37493759
STREAM_CHECK_NULL_GOTO(cols, terrno);
@@ -3758,7 +3768,7 @@ static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SS
37583768
OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
37593769
STREAM_CHECK_NULL_GOTO(oInfo, terrno);
37603770
STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
3761-
code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName);
3771+
code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionName(&metaReader, req->origTableInfoReq.ver, oInfo->refTableName);
37623772
if (code != 0) {
37633773
code = 0;
37643774
ST_TASK_ELOG("vgId:%d %s get table entry by name:%s failed, msg:%s", TD_VID(pVnode), __func__, oInfo->refTableName, tstrerror(code));
@@ -3772,7 +3782,7 @@ static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SS
37723782
int64_t suid = metaReader.me.ctbEntry.suid;
37733783
vTableInfo->suid = suid;
37743784
tDecoderClear(&metaReader.coder);
3775-
STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, suid));
3785+
STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(&metaReader, req->origTableInfoReq.ver, suid));
37763786
sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
37773787
} else if (metaReader.me.type == TD_NORMAL_TABLE) {
37783788
vTableInfo->suid = 0;
@@ -3817,13 +3827,13 @@ static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg,
38173827
SMetaReader metaReader = {0};
38183828
SMetaReader metaReaderStable = {0};
38193829
int64_t streamId = req->base.streamId;
3820-
stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
3830+
stsDebug("vgId:%d %s start, ver:%"PRId64, TD_VID(pVnode), __func__, req->virTablePseudoColReq.ver);
38213831

38223832
SArray* cols = req->virTablePseudoColReq.cids;
38233833
STREAM_CHECK_NULL_GOTO(cols, terrno);
38243834

38253835
sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
3826-
STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
3836+
STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(&metaReader, req->virTablePseudoColReq.ver, req->virTablePseudoColReq.uid));
38273837

38283838
STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
38293839

@@ -3842,7 +3852,7 @@ static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg,
38423852
sStreamReaderInfo->storageApi.metaReaderFn.readerReleaseLock(&metaReader);
38433853
sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
38443854

3845-
STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
3855+
STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(&metaReaderStable, req->virTablePseudoColReq.ver, suid));
38463856
SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
38473857
for (size_t i = 0; i < taosArrayGetSize(cols); i++){
38483858
col_id_t* id = taosArrayGet(cols, i);

0 commit comments

Comments
 (0)