Skip to content

Commit 11a8420

Browse files
committed
Avoid skipping query stats in case of slow query text/schema collection
Previously the high-frequency stats collection (which occurs every minute, except at the 10 minute mark) shared a mutex with the full snapshot collection, in order to record query stats. This effectively meant that query stats would be missed if the full snapshot took longer than 1 minute, which was observed in practice in the case of very slow query text file loading (up to 2 minutes by default, higher with changed settings), or when collecting schema data from many databases (up to 8 minutes total). Instead, give the high frequency collection its own persisted state struct and mutex, and move data from that struct during the full snapshot. Additionally, to improve correctness of statistics, start each full snapshot by calling the exact same logic that otherwise runs once a minute. Perform the query text collection afterwards, without recording statistics at that time. This effectively means that we are doing an additional pg_stat_statements access (without query text), but since that only looks at shared memory and holds the pgss lock very briefly, it should not be a problem, and avoids incorrect statistics when query text collection is slow (since pg_stat_statements looks at the statistics after accessing the query text file). In passing, improve the correctness of the next query stats collection after a reset (it had the wrong reference point before), and add a missing mutex to the state file write (which now also writes out the separate high frequency state). An existing shortcoming is left for a future improvement (but with an added note), which is that state file updates only get written every 10 minutes, missing high-frequency updates in between in case of crashes.
1 parent bd89a1e commit 11a8420

File tree

14 files changed

+312
-244
lines changed

14 files changed

+312
-244
lines changed

input/full.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,41 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
3535
return
3636
}
3737

38+
// Perform one high frequency stats collection at the exact time of the full snapshot.
39+
//
40+
// The scheduler skips the otherwise scheduled execution when the full snapshot time happens,
41+
// so we can run it inline here and pass its data along as part of this full snapshot.
42+
server.HighFreqStateMutex.Lock()
43+
newHighFreqState, err := CollectAndDiff1minStats(ctx, c, connection, ps.CollectedAt, server.HighFreqPrevState)
44+
if err != nil {
45+
logger.PrintError("Could not collect high frequency statistics for server: %s", err)
46+
err = nil
47+
} else {
48+
// Move over previously collected high frequency stats to be submitted with this full snapshot
49+
//
50+
// We don't want to delay this longer in case query text or schema metadata collection takes time.
51+
// Doing this later could mean that additional high frequency query stats are being collected and
52+
// included in the submission, whose query texts were not yet picked up with this snapshot.
53+
ts.StatementStats = newHighFreqState.UnidentifiedStatementStats
54+
ts.PlanStats = newHighFreqState.UnidentifiedPlanStats
55+
ts.ServerIoStats = newHighFreqState.QueuedServerIoStats
56+
newHighFreqState.UnidentifiedStatementStats = make(state.HistoricStatementStatsMap)
57+
newHighFreqState.UnidentifiedPlanStats = make(state.HistoricPlanStatsMap)
58+
newHighFreqState.QueuedServerIoStats = make(state.HistoricPostgresServerIoStatsMap)
59+
server.HighFreqPrevState = newHighFreqState
60+
}
61+
server.HighFreqStateMutex.Unlock()
3862
bufferCacheReady := make(chan state.BufferCache)
3963
go func() {
4064
postgres.GetBufferCache(ctx, c, server, opts, bufferCacheReady)
4165
}()
4266

43-
ps.LastStatementStatsAt = time.Now()
4467
err = postgres.SetQueryTextStatementTimeout(ctx, connection, logger, server)
4568
if err != nil {
4669
logger.PrintError("Error setting query text timeout: %s", err)
4770
return
4871
}
49-
ts.Statements, ts.StatementTexts, ps.StatementStats, err = postgres.GetStatements(ctx, c, connection, true)
72+
ts.Statements, ts.StatementTexts, _, err = postgres.GetStatements(ctx, c, connection, true)
5073
if err != nil {
5174
// Despite query performance data being an essential part of pganalyze, there are
5275
// situations where it may not be available (or it timed out), so treat it as a
@@ -67,8 +90,8 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
6790
}
6891
err = nil
6992
} else {
70-
// Only collect plan stats when we successfully collected query stats
71-
ts.Plans, ps.PlanStats, err = postgres.GetPlans(ctx, c, connection, true)
93+
// Only collect plan texts when we successfully collected query texts
94+
ts.Plans, _, err = postgres.GetPlans(ctx, c, connection, true)
7295
if err != nil {
7396
// Accept this as a non-fatal issue as this is not a critical stats (at least for now)
7497
logger.PrintError("Skipping query plan statistics, due to error: %s", err)
@@ -84,19 +107,29 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
84107
ps.StatementResetCounter = server.PrevState.StatementResetCounter + 1
85108
config := server.Grant.Load().Config
86109
if config.Features.StatementResetFrequency != 0 && ps.StatementResetCounter >= int(config.Features.StatementResetFrequency) {
110+
// Block concurrent collection of query stats, as that may see the actual Postgres-side
111+
// reset before we updated the struct that the collector diffs against.
112+
server.HighFreqStateMutex.Lock()
87113
ps.StatementResetCounter = 0
88114
err = postgres.ResetStatements(ctx, c, connection)
89115
if err != nil {
90116
logger.PrintError("Error calling pg_stat_statements_reset() as requested: %s", err)
91117
err = nil
92118
} else {
93119
logger.PrintInfo("Successfully called pg_stat_statements_reset() for all queries, next reset in %d hours", config.Features.StatementResetFrequency/scheduler.FullSnapshotsPerHour)
94-
_, _, ts.ResetStatementStats, err = postgres.GetStatements(ctx, c, connection, false)
120+
121+
// Make sure the next high frequency run has an empty reference point
122+
_, _, resetStatementStats, err := postgres.GetStatements(ctx, c, connection, false)
95123
if err != nil {
96124
logger.PrintError("Error collecting pg_stat_statements after reset: %s", err)
97125
err = nil
126+
newHighFreqState.StatementStats = state.PostgresStatementStatsMap{}
127+
} else {
128+
newHighFreqState.StatementStats = resetStatementStats
98129
}
130+
newHighFreqState.LastStatementStatsAt = time.Now()
99131
}
132+
server.HighFreqStateMutex.Unlock()
100133
}
101134

102135
if opts.CollectPostgresSettings {

input/full_1min.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package input
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"time"
7+
8+
"github.com/pganalyze/collector/input/postgres"
9+
"github.com/pganalyze/collector/state"
10+
"github.com/pkg/errors"
11+
)
12+
13+
// CollectAndDiff1minStats - Collects once-a-minute data of certain stats
//
// Collects pg_stat_statements, plan stats and (on Postgres 16+) pg_stat_io
// counters, diffs them against the previous run, and queues the diffed data
// on the returned high frequency state so a later full snapshot can submit it.
//
// The returned state always carries the freshly collected raw stats (to serve
// as the reference point for the next run), even when no diff was produced.
// Callers are expected to hold the high frequency state mutex — this function
// does no locking itself (NOTE(review): inferred from the locked call site in
// input/full.go; confirm against other callers).
func CollectAndDiff1minStats(ctx context.Context, c *postgres.Collection, connection *sql.DB, collectedAt time.Time, prevState state.PersistedHighFreqState) (state.PersistedHighFreqState, error) {
	var err error

	// Start from the previous state so queued-but-unsubmitted diffs carry over
	newState := prevState
	newState.LastStatementStatsAt = time.Now()

	// showtext=false: statistics only, no query text (text is collected
	// separately during the full snapshot)
	_, _, newState.StatementStats, err = postgres.GetStatements(ctx, c, connection, false)
	if err != nil {
		return newState, errors.Wrap(err, "error collecting pg_stat_statements")
	}
	_, newState.PlanStats, err = postgres.GetPlans(ctx, c, connection, false)
	if err != nil {
		return newState, errors.Wrap(err, "error collecting query plan stats")
	}

	newState.ServerIoStats, err = postgres.GetPgStatIo(ctx, c, connection)
	if err != nil {
		return newState, errors.Wrap(err, "error collecting Postgres server statistics")
	}

	// Don't calculate any diffs on the first run (but still update the state)
	if len(prevState.StatementStats) == 0 || prevState.LastStatementStatsAt.IsZero() {
		return newState, nil
	}

	// All diffs produced below share this key, recording when the data was
	// collected and how many seconds elapsed since the previous collection
	timeKey := state.HistoricStatsTimeKey{
		CollectedAt:           collectedAt,
		CollectedIntervalSecs: uint32(newState.LastStatementStatsAt.Sub(prevState.LastStatementStatsAt) / time.Second),
	}

	// Append this interval's statement diff to the queue of not-yet-submitted
	// stats (lazily initializing the map on first use)
	newState.UnidentifiedStatementStats = prevState.UnidentifiedStatementStats
	if newState.UnidentifiedStatementStats == nil {
		newState.UnidentifiedStatementStats = make(state.HistoricStatementStatsMap)
	}
	newState.UnidentifiedStatementStats[timeKey] = diffStatements(newState.StatementStats, prevState.StatementStats)

	newState.UnidentifiedPlanStats = prevState.UnidentifiedPlanStats
	if newState.UnidentifiedPlanStats == nil {
		newState.UnidentifiedPlanStats = make(state.HistoricPlanStatsMap)
	}
	newState.UnidentifiedPlanStats[timeKey] = diffPlanStats(newState.PlanStats, prevState.PlanStats)

	// pg_stat_io only exists on Postgres 16 and newer
	if c.PostgresVersion.Numeric >= state.PostgresVersion16 {
		newState.QueuedServerIoStats = prevState.QueuedServerIoStats
		if newState.QueuedServerIoStats == nil {
			newState.QueuedServerIoStats = make(state.HistoricPostgresServerIoStatsMap)
		}
		newState.QueuedServerIoStats[timeKey] = diffServerIoStats(newState.ServerIoStats, prevState.ServerIoStats)
	}

	return newState, nil
}
66+
67+
func diffStatements(new state.PostgresStatementStatsMap, prev state.PostgresStatementStatsMap) (diff state.DiffedPostgresStatementStatsMap) {
68+
followUpRun := len(prev) > 0
69+
diff = make(state.DiffedPostgresStatementStatsMap)
70+
71+
for key, statement := range new {
72+
var diffedStatement state.DiffedPostgresStatementStats
73+
74+
prevStatement, exists := prev[key]
75+
if exists {
76+
diffedStatement = statement.DiffSince(prevStatement)
77+
} else if followUpRun { // New statement since the last run
78+
diffedStatement = statement.DiffSince(state.PostgresStatementStats{})
79+
}
80+
81+
if diffedStatement.Calls > 0 {
82+
diff[key] = diffedStatement
83+
}
84+
}
85+
86+
return
87+
}
88+
89+
func diffPlanStats(new state.PostgresPlanStatsMap, prev state.PostgresPlanStatsMap) (diff state.DiffedPostgresPlanStatsMap) {
90+
followUpRun := len(prev) > 0
91+
diff = make(state.DiffedPostgresPlanStatsMap)
92+
93+
for key, planStats := range new {
94+
var diffedPlanStats state.DiffedPostgresStatementStats
95+
96+
prevPlanStats, exists := prev[key]
97+
if exists {
98+
diffedPlanStats = planStats.DiffSince(prevPlanStats)
99+
} else if followUpRun { // New plan since the last run
100+
diffedPlanStats = planStats.DiffSince(state.PostgresStatementStats{})
101+
}
102+
103+
if diffedPlanStats.Calls > 0 {
104+
diff[key] = diffedPlanStats
105+
}
106+
}
107+
108+
return
109+
}
110+
111+
func diffServerIoStats(new state.PostgresServerIoStatsMap, prev state.PostgresServerIoStatsMap) (diff state.DiffedPostgresServerIoStatsMap) {
112+
followUpRun := len(prev) > 0
113+
114+
diff = make(state.DiffedPostgresServerIoStatsMap)
115+
for k, stats := range new {
116+
var s state.DiffedPostgresServerIoStats
117+
prevStats, exists := prev[k]
118+
if exists {
119+
s = stats.DiffSince(prevStats)
120+
} else if followUpRun { // New since the last run
121+
s = stats.DiffSince(state.PostgresServerIoStats{})
122+
}
123+
// Skip over empty diffs (which can occur either because there was no activity, or for fixed entries that never saw activity)
124+
if s.Reads != 0 || s.Writes != 0 || s.Writebacks != 0 || s.Extends != 0 ||
125+
s.Hits != 0 || s.Evictions != 0 || s.Reuses != 0 || s.Fsyncs != 0 {
126+
diff[k] = s
127+
}
128+
}
129+
130+
return
131+
}

input/postgres/server_stats.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,6 @@ func GetServerStats(ctx context.Context, c *Collection, db *sql.DB, ps state.Per
111111
if err != nil {
112112
return ps, ts, err
113113
}
114-
ps.ServerIoStats, err = GetPgStatIo(ctx, c, db)
115-
if err != nil {
116-
return ps, ts, err
117-
}
118114

119115
// Only collect transaction ID or xmin horizon related stats with non-replicas
120116
if isReplica, err := getIsReplica(ctx, db); err == nil && !isReplica {

input/postgres/statements.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -269,20 +269,16 @@ func GetStatements(ctx context.Context, c *Collection, db *sql.DB, showtext bool
269269
return nil, nil, nil, err
270270
}
271271
query := string(bytes)
272+
ignoreIoTiming := ignoreIOTiming(c.PostgresVersion, query)
272273
key := queryKeys[idx]
273274
select {
274275
// Since normalizing can take time, explicitly check for cancellations
275276
case <-ctx.Done():
276277
return nil, nil, nil, ctx.Err()
277278
default:
278-
fingerprintAndNormalize(c, key, query, statements, statementTextsByFp)
279+
fingerprintAndNormalize(c, key, query, statements, statementTextsByFp, ignoreIoTiming)
279280
}
280-
stats := queryStats[idx]
281-
if ignoreIOTiming(c.PostgresVersion, query) {
282-
stats.BlkReadTime = 0
283-
stats.BlkWriteTime = 0
284-
}
285-
statementStats[key] = stats
281+
statementStats[key] = queryStats[idx]
286282
}
287283
}
288284

@@ -315,20 +311,22 @@ func ignoreIOTiming(postgresVersion state.PostgresVersion, receivedQuery string)
315311
var collectorQueryFingerprint = util.FingerprintText(util.QueryTextCollector)
316312
var insufficientPrivsQueryFingerprint = util.FingerprintText(util.QueryTextInsufficientPrivs)
317313

318-
func fingerprintAndNormalize(c *Collection, key state.PostgresStatementKey, text string, statements state.PostgresStatementMap, statementTextsByFp state.PostgresStatementTextMap) {
314+
func fingerprintAndNormalize(c *Collection, key state.PostgresStatementKey, text string, statements state.PostgresStatementMap, statementTextsByFp state.PostgresStatementTextMap, ignoreIoTiming bool) {
319315
if insufficientPrivilege(text) {
320316
statements[key] = state.PostgresStatement{
321317
InsufficientPrivilege: true,
322318
Fingerprint: insufficientPrivsQueryFingerprint,
319+
IgnoreIoTiming: ignoreIoTiming,
323320
}
324321
} else if collectorStatement(text) {
325322
statements[key] = state.PostgresStatement{
326-
Collector: true,
327-
Fingerprint: collectorQueryFingerprint,
323+
Collector: true,
324+
Fingerprint: collectorQueryFingerprint,
325+
IgnoreIoTiming: ignoreIoTiming,
328326
}
329327
} else {
330328
fp := util.FingerprintQuery(text, c.Config.FilterQueryText, -1)
331-
statements[key] = state.PostgresStatement{Fingerprint: fp}
329+
statements[key] = state.PostgresStatement{Fingerprint: fp, IgnoreIoTiming: ignoreIoTiming}
332330
_, ok := statementTextsByFp[fp]
333331
if !ok {
334332
statementTextsByFp[fp] = util.NormalizeQuery(text, c.Config.FilterQueryText, -1)

output/transform/postgres.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func transformPostgresServerStats(s snapshot.FullSnapshot, newState state.Persis
170170
PgStatStatementsReset: snapshot.NullTimeToNullTimestamp(diffState.PgStatStatementsStats.Reset),
171171
}
172172

173-
for timeKey, diffedStats := range transientState.HistoricServerIoStats {
173+
for timeKey, diffedStats := range transientState.ServerIoStats {
174174
// Ignore any data older than an hour, as a safety measure in case of many
175175
// failed full snapshot runs (which don't reset state)
176176
if time.Since(timeKey.CollectedAt).Hours() >= 1 {

output/transform/postgres_plans.go

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func transformQueryPlanStatistic(stats state.DiffedPostgresStatementStats, idx i
9494
}
9595
}
9696

97-
func upsertQueryPlanReference(s *snapshot.FullSnapshot, queryIdx int32, planId int64) int32 {
97+
func upsertQueryPlanReferenceAndInformation(s *snapshot.FullSnapshot, queryIdx int32, planId int64, value planValue) int32 {
9898
newRef := snapshot.QueryPlanReference{
9999
QueryIdx: queryIdx,
100100
OriginalPlanId: planId,
@@ -109,38 +109,30 @@ func upsertQueryPlanReference(s *snapshot.FullSnapshot, queryIdx int32, planId i
109109
idx := int32(len(s.QueryPlanReferences))
110110
s.QueryPlanReferences = append(s.QueryPlanReferences, &newRef)
111111

112+
var planType snapshot.QueryPlanInformation_PlanType
113+
switch value.plan.PlanType {
114+
case "no plan":
115+
planType = snapshot.QueryPlanInformation_NO_PLAN
116+
case "estimate":
117+
planType = snapshot.QueryPlanInformation_ESTIMATE
118+
case "actual":
119+
planType = snapshot.QueryPlanInformation_ACTUAL
120+
}
121+
info := snapshot.QueryPlanInformation{
122+
QueryPlanIdx: idx,
123+
ExplainPlan: value.plan.ExplainPlan,
124+
PlanCapturedTime: timestamppb.New(value.plan.PlanCapturedTime),
125+
PlanType: planType,
126+
}
127+
s.QueryPlanInformations = append(s.QueryPlanInformations, &info)
128+
112129
return idx
113130
}
114131

115132
func transformPostgresPlans(s snapshot.FullSnapshot, newState state.PersistedState, diffState state.DiffState, transientState state.TransientState, queryIDKeyToIdx QueryIDKeyToIdx) snapshot.FullSnapshot {
116-
groupedPlans := groupPlans(transientState.Plans, diffState.PlanStats, queryIDKeyToIdx)
117-
for key, value := range groupedPlans {
118-
idx := upsertQueryPlanReference(&s, key.queryIdx, key.planID)
119-
120-
var planType snapshot.QueryPlanInformation_PlanType
121-
switch value.plan.PlanType {
122-
case "no plan":
123-
planType = snapshot.QueryPlanInformation_NO_PLAN
124-
case "estimate":
125-
planType = snapshot.QueryPlanInformation_ESTIMATE
126-
case "actual":
127-
planType = snapshot.QueryPlanInformation_ACTUAL
128-
}
129-
info := snapshot.QueryPlanInformation{
130-
QueryPlanIdx: idx,
131-
ExplainPlan: value.plan.ExplainPlan,
132-
PlanCapturedTime: timestamppb.New(value.plan.PlanCapturedTime),
133-
PlanType: planType,
134-
}
135-
s.QueryPlanInformations = append(s.QueryPlanInformations, &info)
133+
var planStats []*snapshot.HistoricQueryPlanStatistics
136134

137-
// Plan stats (from a full snapshot run)
138-
stats := transformQueryPlanStatistic(value.planStats, idx)
139-
s.QueryPlanStatistics = append(s.QueryPlanStatistics, &stats)
140-
}
141-
142-
// Historic plan stats (similar to historic statement stats, collected every 1 min)
143-
for timeKey, diffedStats := range transientState.HistoricPlanStats {
135+
for timeKey, diffedStats := range transientState.PlanStats {
144136
// Ignore any data older than an hour, as a safety measure in case of many
145137
// failed full snapshot runs (which don't reset state)
146138
if time.Since(timeKey.CollectedAt).Hours() >= 1 {
@@ -153,12 +145,19 @@ func transformPostgresPlans(s snapshot.FullSnapshot, newState state.PersistedSta
153145

154146
groupedPlans := groupPlans(transientState.Plans, diffedStats, queryIDKeyToIdx)
155147
for key, value := range groupedPlans {
156-
idx := upsertQueryPlanReference(&s, key.queryIdx, key.planID)
148+
idx := upsertQueryPlanReferenceAndInformation(&s, key.queryIdx, key.planID, value)
157149
stats := transformQueryPlanStatistic(value.planStats, idx)
158150
h.Statistics = append(h.Statistics, &stats)
159151
}
160-
s.HistoricQueryPlanStatistics = append(s.HistoricQueryPlanStatistics, &h)
152+
planStats = append(planStats, &h)
161153
}
162154

155+
if len(planStats) == 0 {
156+
return s
157+
}
158+
159+
s.QueryPlanStatistics = planStats[len(planStats)-1].Statistics
160+
s.HistoricQueryPlanStatistics = planStats[:len(planStats)-1]
161+
163162
return s
164163
}

0 commit comments

Comments
 (0)