Skip to content

Commit 6cbc74a

Browse files
[BugFix] Fix query detail lost audit items (backport #63237) (backport #63496) (#63536)
Signed-off-by: before-Sunrise <[email protected]> Co-authored-by: before-Sunrise <[email protected]>
1 parent 7f2a1de commit 6cbc74a

File tree

4 files changed

+304
-35
lines changed

4 files changed

+304
-35
lines changed

fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,6 @@ public class StmtExecutor {
281281
private List<ByteBuffer> proxyResultBuffer = null;
282282
private ShowResultSet proxyResultSet = null;
283283
private PQueryStatistics statisticsForAuditLog;
284-
private boolean statisticsConsumed = false;
285284
private List<StmtExecutor> subStmtExecutors;
286285
private Optional<Boolean> isForwardToLeaderOpt = Optional.empty();
287286
private HttpResultSender httpResultSender;
@@ -2076,46 +2075,28 @@ public void setQueryStatistics(PQueryStatistics statistics) {
20762075
}
20772076

20782077
public PQueryStatistics getQueryStatisticsForAuditLog() {
2079-
// for one StmtExecutor, only consume PQueryStatistics once
2080-
// so call getQueryStatisticsForAuditLog will return a emtpy PQueryStatistics if this is not the first call
2081-
if (statisticsConsumed) {
2082-
// create a empty PQueryStatistics
2083-
PQueryStatistics stats = normalizeQueryStatistics(null);
2084-
statisticsForAuditLog = stats;
2085-
return stats;
2078+
if (statisticsForAuditLog == null && coord != null) {
2079+
statisticsForAuditLog = coord.getAuditStatistics();
20862080
}
2087-
2088-
PQueryStatistics stats = statisticsForAuditLog;
2089-
if (stats == null && coord != null) {
2090-
// for insert stmt
2091-
stats = coord.getAuditStatistics();
2081+
if (statisticsForAuditLog == null) {
2082+
statisticsForAuditLog = new PQueryStatistics();
20922083
}
2093-
2094-
stats = normalizeQueryStatistics(stats);
2095-
2096-
statisticsForAuditLog = stats;
2097-
statisticsConsumed = true;
2098-
return stats;
2099-
}
2100-
2101-
private PQueryStatistics normalizeQueryStatistics(PQueryStatistics stats) {
2102-
PQueryStatistics normalized = (stats != null) ? stats : new PQueryStatistics();
2103-
if (normalized.scanBytes == null) {
2104-
normalized.scanBytes = 0L;
2084+
if (statisticsForAuditLog.scanBytes == null) {
2085+
statisticsForAuditLog.scanBytes = 0L;
21052086
}
2106-
if (normalized.scanRows == null) {
2107-
normalized.scanRows = 0L;
2087+
if (statisticsForAuditLog.scanRows == null) {
2088+
statisticsForAuditLog.scanRows = 0L;
21082089
}
2109-
if (normalized.cpuCostNs == null) {
2110-
normalized.cpuCostNs = 0L;
2090+
if (statisticsForAuditLog.cpuCostNs == null) {
2091+
statisticsForAuditLog.cpuCostNs = 0L;
21112092
}
2112-
if (normalized.memCostBytes == null) {
2113-
normalized.memCostBytes = 0L;
2093+
if (statisticsForAuditLog.memCostBytes == null) {
2094+
statisticsForAuditLog.memCostBytes = 0L;
21142095
}
2115-
if (normalized.spillBytes == null) {
2116-
normalized.spillBytes = 0L;
2096+
if (statisticsForAuditLog.spillBytes == null) {
2097+
statisticsForAuditLog.spillBytes = 0L;
21172098
}
2118-
return normalized;
2099+
return statisticsForAuditLog;
21192100
}
21202101

21212102
public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception {
@@ -2633,7 +2614,6 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
26332614
GlobalStateMgr.getCurrentState().getOperationListenerBus()
26342615
.onDMLStmtJobTransactionFinish(txnState, database, targetTable, dmlType);
26352616
}
2636-
recordExecStatsIntoContext();
26372617
}
26382618

26392619
String errMsg = "";

test/lib/sr_sql_lib.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2774,3 +2774,239 @@ def regex_match(check_str: str, pattern: str):
27742774
return True
27752775

27762776
return False
2777+
2778+
def get_timestamp_ms(self):
2779+
"""
2780+
Get current timestamp in milliseconds
2781+
"""
2782+
import time
2783+
timestamp_ms = int(time.time() * 1000)
2784+
# print(f"Generated timestamp: {timestamp_ms}")
2785+
return timestamp_ms
2786+
2787+
def get_last_query_id(self):
2788+
"""
2789+
Get the query_id of the last query
2790+
"""
2791+
sql = "select last_query_id()"
2792+
result = self.execute_sql(sql, True)
2793+
2794+
if not result["status"]:
2795+
# print(f"Failed to get last query id: {result}")
2796+
return None
2797+
2798+
if "result" not in result or len(result["result"]) == 0:
2799+
# print("No query id found")
2800+
return None
2801+
2802+
query_id = result["result"][0][0]
2803+
# print(f"Last query id: {query_id}")
2804+
return query_id
2805+
2806+
def get_query_detail_by_api(self, timestamp_ms, query_id):
2807+
"""
2808+
Get query detail information through API
2809+
"""
2810+
import json
2811+
import subprocess
2812+
2813+
# Build API URL and curl command
2814+
api_url = f"http://{self.mysql_host}:{self.http_port}/api/query_detail?event_time={timestamp_ms}"
2815+
# print(f"API URL: {api_url}")
2816+
# print(f"Looking for query_id: {query_id}")
2817+
2818+
try:
2819+
# Use curl command to send HTTP request
2820+
cmd = f"curl -s --location-trusted -u {self.mysql_user}:{self.mysql_password} '{api_url}'"
2821+
# print(f"Curl command: {cmd}")
2822+
2823+
result = subprocess.run(
2824+
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", timeout=30, shell=True
2825+
)
2826+
2827+
if result.returncode != 0:
2828+
# print(
2829+
# f"Curl command failed with return code {result.returncode}")
2830+
# print(f"Stderr: {result.stderr}")
2831+
return None
2832+
2833+
if not result.stdout.strip():
2834+
# print("Empty response from API")
2835+
return None
2836+
2837+
# print(f"Raw API response: {result.stdout[:500]}...") # Only show first 500 characters
2838+
2839+
query_details = json.loads(result.stdout)
2840+
# print(f"Retrieved {len(query_details)} query details")
2841+
# print(
2842+
# f"Available query_ids: {[detail.get('queryId') for detail in query_details]}")
2843+
2844+
# Find matching query_id, prioritize records with FINISHED state
2845+
finished_detail = None
2846+
other_detail = None
2847+
2848+
for detail in query_details:
2849+
if detail.get("queryId") == query_id:
2850+
state = detail.get("state", "")
2851+
# print(
2852+
# f"Found query detail for query_id: {query_id}, state: {state}")
2853+
if state == "FINISHED":
2854+
finished_detail = detail
2855+
else:
2856+
other_detail = detail
2857+
2858+
# Return FINISHED state record first, otherwise return other state record
2859+
if finished_detail:
2860+
# print(f"Using FINISHED query detail for query_id: {query_id}")
2861+
return finished_detail
2862+
elif other_detail:
2863+
# print(
2864+
# f"Using non-FINISHED query detail for query_id: {query_id}, state: {other_detail.get('state')}")
2865+
return other_detail
2866+
else:
2867+
# print(f"No query detail found for query_id: {query_id}")
2868+
# print(
2869+
# f"Available query_ids: {[detail.get('queryId') for detail in query_details]}")
2870+
return None
2871+
2872+
except json.JSONDecodeError as e:
2873+
# print(f"Failed to parse JSON response: {e}")
2874+
# print(
2875+
# f"Response: {result.stdout if 'result' in locals() else 'No response'}")
2876+
return None
2877+
except Exception as e:
2878+
# print(f"Failed to get query detail: {e}")
2879+
# print(f"API URL: {api_url}")
2880+
# print(f"Query ID: {query_id}")
2881+
return None
2882+
2883+
def assert_query_detail_field(self, query_detail, field_name, expected_value=None):
2884+
"""
2885+
Validate field values in query detail
2886+
"""
2887+
if query_detail is None:
2888+
# print("Query detail is None")
2889+
return False
2890+
2891+
if field_name not in query_detail:
2892+
# print(f"Field {field_name} not found in query detail")
2893+
return False
2894+
2895+
actual_value = query_detail[field_name]
2896+
# print(f"Field {field_name}: {actual_value}")
2897+
2898+
if expected_value is not None:
2899+
if actual_value != expected_value:
2900+
# print(
2901+
# f"Field {field_name} mismatch: expected {expected_value}, got {actual_value}")
2902+
return False
2903+
2904+
return True
2905+
2906+
def query_detail_check(self, sql=None, expected_scan_rows=None, expected_return_rows=None):
2907+
"""
2908+
Comprehensive function to test query detail API using assertions to validate results
2909+
2910+
Args:
2911+
sql: SQL statement to execute
2912+
expected_scan_rows: Expected scanRows value
2913+
expected_return_rows: Expected returnRows value
2914+
2915+
Returns:
2916+
dict: Dictionary containing test results
2917+
"""
2918+
import time
2919+
2920+
# 1. Get timestamp
2921+
timestamp_ms = self.get_timestamp_ms()
2922+
# print(f"Test started with timestamp: {timestamp_ms}")
2923+
2924+
# 2. Execute SQL statement
2925+
# print(f"Executing SQL: {sql}")
2926+
result = self.execute_sql(sql, True)
2927+
2928+
if not result["status"]:
2929+
# print(f"Failed to execute SQL: {result}")
2930+
tools.assert_true(False, f"SQL execution failed: {result}")
2931+
2932+
# 3. Get query_id
2933+
query_id = self.get_last_query_id()
2934+
if query_id is None:
2935+
# print("Failed to get query_id")
2936+
tools.assert_true(False, "Failed to get query_id")
2937+
2938+
# 4. Wait a bit to ensure query detail is collected
2939+
time.sleep(1)
2940+
2941+
# 5. Get query detail, retry up to 3 times
2942+
query_detail = None
2943+
for retry_count in range(3):
2944+
query_detail = self.get_query_detail_by_api(timestamp_ms, query_id)
2945+
if query_detail is not None:
2946+
break
2947+
# print(f"Failed to get query detail, retry {retry_count + 1}/3")
2948+
if retry_count < 2: # Not the last retry
2949+
time.sleep(1)
2950+
2951+
if query_detail is None:
2952+
# print("Failed to get query detail after 3 retries")
2953+
tools.assert_true(
2954+
False, "Failed to get query detail after 3 retries")
2955+
2956+
# 6. Validate each field and assert
2957+
# print("=== Query Detail API Test Results ===")
2958+
# print(f"Timestamp: {timestamp_ms}")
2959+
# print(f"Query ID: {query_id}")
2960+
# print(f"Query Detail: {query_detail}")
2961+
2962+
# Validate scanRows
2963+
if expected_scan_rows is not None:
2964+
actual_scan_rows = query_detail.get("scanRows")
2965+
tools.assert_equal(
2966+
expected_scan_rows, actual_scan_rows,
2967+
f"scanRows mismatch: expected {expected_scan_rows}, got {actual_scan_rows}"
2968+
)
2969+
# print(f"✓ scanRows validation passed: {actual_scan_rows}")
2970+
else:
2971+
# print(f"scanRows: {query_detail.get('scanRows')}")
2972+
pass
2973+
2974+
# Validate returnRows
2975+
if expected_return_rows is not None:
2976+
actual_return_rows = query_detail.get("returnRows")
2977+
tools.assert_equal(
2978+
expected_return_rows, actual_return_rows,
2979+
f"returnRows mismatch: expected {expected_return_rows}, got {actual_return_rows}"
2980+
)
2981+
# print(f"✓ returnRows validation passed: {actual_return_rows}")
2982+
else:
2983+
# print(f"returnRows: {query_detail.get('returnRows')}")
2984+
pass
2985+
2986+
# Validate cpuCostNs
2987+
actual_cpu_cost_ns = query_detail.get("cpuCostNs")
2988+
tools.assert_true(
2989+
actual_cpu_cost_ns > 0,
2990+
f"cpuCostNs is negative: {actual_cpu_cost_ns}"
2991+
)
2992+
# print(f"✓ cpuCostNs validation passed: {actual_cpu_cost_ns}")
2993+
2994+
# Validate scanBytes
2995+
actual_scan_bytes = query_detail.get("scanBytes")
2996+
tools.assert_true(
2997+
actual_scan_bytes > 0,
2998+
f"scanBytes is negative: {actual_scan_bytes}"
2999+
)
3000+
# print(f"✓ scanBytes validation passed: {actual_scan_bytes}")
3001+
3002+
# Validate memCostBytes
3003+
actual_mem_cost_bytes = query_detail.get("memCostBytes")
3004+
tools.assert_true(
3005+
actual_mem_cost_bytes > 0,
3006+
f"memCostBytes is negative: {actual_mem_cost_bytes}"
3007+
)
3008+
# print(f"✓ memCostBytes validation passed: {actual_mem_cost_bytes}")
3009+
3010+
return {
3011+
"success": True
3012+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-- name: test_query_detail_api
2+
create database db_${uuid0};
3+
-- result:
4+
-- !result
5+
use db_${uuid0};
6+
-- result:
7+
-- !result
8+
admin set frontend config ("enable_collect_query_detail_info" = "true");
9+
-- result:
10+
-- !result
11+
create table test_table (
12+
id int,
13+
name varchar(100),
14+
value double
15+
) distributed by hash(id) properties("replication_num" = "1");
16+
-- result:
17+
-- !result
18+
insert into test_table values (1, 'test1', 10.5), (2, 'test2', 20.5), (3, 'test3', 30.5);
19+
-- result:
20+
-- !result
21+
function: query_detail_check("select * from test_table", 3, 3)
22+
-- result:
23+
{'success': True}
24+
-- !result
25+
function: query_detail_check("insert into test_table select * from test_table", 3, 0)
26+
-- result:
27+
{'success': True}
28+
-- !result
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- name: test_query_detail_api
2+
3+
create database db_${uuid0};
4+
use db_${uuid0};
5+
6+
7+
admin set frontend config ("enable_collect_query_detail_info" = "true");
8+
9+
10+
create table test_table (
11+
id int,
12+
name varchar(100),
13+
value double
14+
) distributed by hash(id) properties("replication_num" = "1");
15+
16+
17+
insert into test_table values (1, 'test1', 10.5), (2, 'test2', 20.5), (3, 'test3', 30.5);
18+
19+
function: query_detail_check("select * from test_table", 3, 3)
20+
21+
function: query_detail_check("insert into test_table select * from test_table", 3, 0)
22+
23+
-- admin set frontend config ("enable_collect_query_detail_info" = "false");
24+
25+

0 commit comments

Comments
 (0)