Skip to content

Commit fe41c83

Browse files
[UT] [BugFix] Trigger to refresh related mvs when base tables has deleted rows (backport #48106) (#48132)
Signed-off-by: shuming.li <[email protected]> Co-authored-by: shuming.li <[email protected]>
1 parent a03d868 commit fe41c83

File tree

8 files changed

+214
-1
lines changed

8 files changed

+214
-1
lines changed

fe/fe-core/src/main/java/com/starrocks/listener/GlobalLoadJobListenerBus.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,16 @@ public void onStreamJobTransactionFinish(TransactionState transactionState) {
8888
}
8989
listeners.stream().forEach(listener -> listener.onStreamLoadTransactionFinish(transactionState));
9090
}
91+
92+
/**
93+
* Do all callbacks after `delete` transaction is finished.
94+
* @param db database of the target table
95+
* @param table table of the target table
96+
*/
97+
public void onDeleteJobTransactionFinish(Database db, Table table) {
98+
if (db == null || table == null) {
99+
return;
100+
}
101+
listeners.stream().forEach(listener -> listener.onDeleteJobTransactionFinish(db, table));
102+
}
91103
}

fe/fe-core/src/main/java/com/starrocks/listener/LoadJobListener.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,11 @@ public interface LoadJobListener {
5050
* @param table target table that has changed
5151
*/
5252
void onInsertOverwriteJobCommitFinish(Database db, Table table);
53+
54+
/**
55+
* Listener after `Delete` transaction is finished, which is only triggered without an error.
56+
* @param db database of the target table
57+
* @param table target table that has changed
58+
*/
59+
void onDeleteJobTransactionFinish(Database db, Table table);
5360
}

fe/fe-core/src/main/java/com/starrocks/listener/LoadJobMVListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,9 @@ private List<PartitionCommitInfo> getPartitionCommitInfos(TransactionState txnSt
156156
}
157157
return new ArrayList<>(tableCommitInfo.getIdToPartitionCommitInfo().values());
158158
}
159+
160+
@Override
161+
public void onDeleteJobTransactionFinish(Database db, Table table) {
162+
triggerToRefreshRelatedMVs(db, table);
163+
}
159164
}

fe/fe-core/src/main/java/com/starrocks/listener/LoadJobStatsListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,9 @@ private void onTransactionFinish(TransactionState transactionState, boolean sync
9090
LOG.warn("refresh mv after publish version failed:", DebugUtil.getStackTrace(t));
9191
}
9292
}
93+
94+
@Override
95+
public void onDeleteJobTransactionFinish(Database db, Table table) {
96+
// do nothing
97+
}
9398
}

fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import com.starrocks.persist.metablock.SRMetaBlockWriter;
9696
import com.starrocks.planner.PartitionColumnFilter;
9797
import com.starrocks.planner.RangePartitionPruner;
98+
import com.starrocks.qe.QueryState;
9899
import com.starrocks.qe.QueryStateException;
99100
import com.starrocks.server.GlobalStateMgr;
100101
import com.starrocks.service.FrontendOptions;
@@ -175,8 +176,8 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
175176
}
176177

177178
DeleteJob deleteJob = null;
179+
Table table = null;
178180
try {
179-
Table table = null;
180181
long transactionId = -1L;
181182
List<Partition> partitions = Lists.newArrayList();
182183
db.readLock();
@@ -211,6 +212,13 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
211212
}
212213

213214
deleteJob.run(stmt, db, table, partitions);
215+
} catch (QueryStateException e) {
216+
// If delete success, it will throw QueryStateException(QueryState.MysqlStateType.OK, sb.toString()).
217+
if (e.getQueryState().getStateType() == QueryState.MysqlStateType.OK) {
218+
// trigger after a delete job finished
219+
GlobalStateMgr.getCurrentState().getOperationListenerBus().onDeleteJobTransactionFinish(db, table);
220+
}
221+
throw e;
214222
} finally {
215223
if (!FeConstants.runningUnitTest) {
216224
clearJob(deleteJob);

fe/fe-core/src/test/java/com/starrocks/utframe/StarRocksAssert.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,46 @@ public StarRocksAssert refreshMvPartition(String sql) throws Exception {
764764
return this;
765765
}
766766

767+
/**
768+
* Wait the input mv refresh task finished.
769+
* @param mvId: mv id
770+
* @return true if the mv refresh task finished, otherwise false.
771+
*/
772+
public boolean waitRefreshFinished(long mvId) {
773+
TaskManager tm = GlobalStateMgr.getCurrentState().getTaskManager();
774+
Task task = tm.getTask(TaskBuilder.getMvTaskName(mvId));
775+
Assert.assertTrue(task != null);
776+
TaskRunManager taskRunManager = tm.getTaskRunManager();
777+
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
778+
TaskRun taskRun = taskRunScheduler.getRunnableTaskRun(task.getId());
779+
int maxTimes = 1200;
780+
int count = 0;
781+
while (taskRun != null && count < maxTimes) {
782+
ThreadUtil.sleepAtLeastIgnoreInterrupts(500L);
783+
taskRun = taskRunScheduler.getRunnableTaskRun(task.getId());
784+
count += 1;
785+
}
786+
return taskRun == null;
787+
}
788+
789+
/**
790+
* Refresh materialized view asynchronously.
791+
* @param ctx connnect context
792+
* @param mvName mv's name
793+
*/
794+
public StarRocksAssert refreshMV(ConnectContext ctx, String mvName) throws Exception {
795+
String sql = "REFRESH MATERIALIZED VIEW " + mvName;
796+
StatementBase stmt = UtFrameUtils.parseStmtWithNewParser(sql, ctx);
797+
RefreshMaterializedViewStatement refreshMaterializedViewStatement = (RefreshMaterializedViewStatement) stmt;
798+
TableName tableName = refreshMaterializedViewStatement.getMvName();
799+
Database db = GlobalStateMgr.getCurrentState().getDb(tableName.getDb());
800+
Table table = db.getTable(tableName.getTbl());
801+
Assert.assertNotNull(table);
802+
Assert.assertTrue(table instanceof MaterializedView);
803+
ctx.executeSql(sql);
804+
return this;
805+
}
806+
767807
public StarRocksAssert refreshMV(String sql) throws Exception {
768808
StatementBase stmt = UtFrameUtils.parseStmtWithNewParser(sql, ctx);
769809
RefreshMaterializedViewStatement refreshMaterializedViewStatement = (RefreshMaterializedViewStatement) stmt;
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
-- name: test_mv_refresh_with_delete
2+
create database db_${uuid0};
3+
-- result:
4+
-- !result
5+
use db_${uuid0};
6+
-- result:
7+
-- !result
8+
CREATE TABLE `t1` (
9+
`k1` date,
10+
`k2` int,
11+
`k3` int
12+
)
13+
DUPLICATE KEY(`k1`)
14+
PARTITION BY RANGE (k1) (
15+
START ("2020-10-01") END ("2022-03-04") EVERY (INTERVAL 15 day)
16+
)
17+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
18+
-- result:
19+
-- !result
20+
CREATE TABLE `t2` (
21+
`k1` date,
22+
`k2` int,
23+
`k3` int
24+
)
25+
DUPLICATE KEY(`k1`)
26+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
27+
-- result:
28+
-- !result
29+
INSERT INTO t1 VALUES ("2020-11-10",1,1);
30+
-- result:
31+
-- !result
32+
INSERT INTO t2 VALUES ("2020-11-10",2,2);
33+
-- result:
34+
-- !result
35+
CREATE MATERIALIZED VIEW test_mv1
36+
PARTITION BY k3
37+
DISTRIBUTED BY HASH(k1) BUCKETS 10
38+
REFRESH ASYNC
39+
PROPERTIES ("partition_refresh_number"="-1")
40+
AS SELECT t2.k1 as k1, t2.k2 as k2, t1.k1 as k3, t1.k2 as k4
41+
FROM t1 join t2 on t1.k1=t2.k1;
42+
-- result:
43+
-- !result
44+
function: wait_async_materialized_view_finish("db_${uuid0}", "test_mv1")
45+
-- result:
46+
None
47+
-- !result
48+
select * from test_mv1 order by k1, k2;
49+
-- result:
50+
2020-11-10 2 2020-11-10 1
51+
-- !result
52+
INSERT INTO t2 VALUES ("2020-11-10",3, 3);
53+
-- result:
54+
-- !result
55+
INSERT INTO t1 VALUES ("2020-11-10",4,4);
56+
-- result:
57+
-- !result
58+
function: wait_async_materialized_view_finish("db_${uuid0}", "test_mv1")
59+
-- result:
60+
None
61+
-- !result
62+
select * from test_mv1 order by k1, k2;
63+
-- result:
64+
2020-11-10 2 2020-11-10 1
65+
2020-11-10 2 2020-11-10 4
66+
2020-11-10 3 2020-11-10 1
67+
2020-11-10 3 2020-11-10 4
68+
-- !result
69+
DELETE FROM t2 WHERE k2=2;
70+
-- result:
71+
-- !result
72+
function: wait_async_materialized_view_finish("db_${uuid0}", "test_mv1")
73+
-- result:
74+
None
75+
-- !result
76+
select * from test_mv1 order by k1, k2;
77+
-- result:
78+
2020-11-10 3 2020-11-10 1
79+
2020-11-10 3 2020-11-10 4
80+
-- !result
81+
drop table t1;
82+
-- result:
83+
-- !result
84+
drop table t2;
85+
-- result:
86+
-- !result
87+
drop database db_${uuid0} force;
88+
-- result:
89+
-- !result
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
-- name: test_mv_refresh_with_delete
2+
3+
create database db_${uuid0};
4+
use db_${uuid0};
5+
CREATE TABLE `t1` (
6+
`k1` date,
7+
`k2` int,
8+
`k3` int
9+
)
10+
DUPLICATE KEY(`k1`)
11+
PARTITION BY RANGE (k1) (
12+
START ("2020-10-01") END ("2022-03-04") EVERY (INTERVAL 15 day)
13+
)
14+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
15+
CREATE TABLE `t2` (
16+
`k1` date,
17+
`k2` int,
18+
`k3` int
19+
)
20+
DUPLICATE KEY(`k1`)
21+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
22+
23+
INSERT INTO t1 VALUES ("2020-11-10",1,1);
24+
INSERT INTO t2 VALUES ("2020-11-10",2,2);
25+
26+
CREATE MATERIALIZED VIEW test_mv1
27+
PARTITION BY k3
28+
DISTRIBUTED BY HASH(k1) BUCKETS 10
29+
REFRESH ASYNC
30+
PROPERTIES ("partition_refresh_number"="-1")
31+
AS SELECT t2.k1 as k1, t2.k2 as k2, t1.k1 as k3, t1.k2 as k4
32+
FROM t1 join t2 on t1.k1=t2.k1;
33+
function: wait_async_materialized_view_finish("db_${uuid0}", "test_mv1")
34+
select * from test_mv1 order by k1, k2;
35+
36+
INSERT INTO t2 VALUES ("2020-11-10",3, 3);
37+
INSERT INTO t1 VALUES ("2020-11-10",4,4);
38+
function: wait_async_materialized_view_finish("db_${uuid0}", "test_mv1")
39+
select * from test_mv1 order by k1, k2;
40+
41+
DELETE FROM t2 WHERE k2=2;
42+
function: wait_async_materialized_view_finish("db_${uuid0}", "test_mv1")
43+
select * from test_mv1 order by k1, k2;
44+
45+
drop table t1;
46+
drop table t2;
47+
drop database db_${uuid0} force;

0 commit comments

Comments
 (0)