Skip to content

Commit a94abd9

Browse files
committed
feat: add chunk bytes library in prep for singleton schema work
1 parent 01d72a1 commit a94abd9

File tree

2 files changed

+1091
-0
lines changed

2 files changed

+1091
-0
lines changed
Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
package common
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
sq "github.com/Masterminds/squirrel"
9+
)
10+
11+
// ChunkedBytesTransaction defines the interface for executing SQL queries within a transaction.
12+
type ChunkedBytesTransaction interface {
13+
// ExecuteWrite executes an INSERT query.
14+
ExecuteWrite(ctx context.Context, builder sq.InsertBuilder) error
15+
16+
// ExecuteDelete executes a DELETE query.
17+
ExecuteDelete(ctx context.Context, builder sq.DeleteBuilder) error
18+
19+
// ExecuteUpdate executes an UPDATE query.
20+
ExecuteUpdate(ctx context.Context, builder sq.UpdateBuilder) error
21+
}
22+
23+
// ChunkedBytesExecutor defines the interface for creating transactions for chunked byte operations.
24+
type ChunkedBytesExecutor interface {
25+
// BeginTransaction starts a new transaction for chunked byte operations.
26+
BeginTransaction(ctx context.Context) (ChunkedBytesTransaction, error)
27+
28+
// ExecuteRead executes a SELECT query and returns the results as a map of chunk index to chunk data.
29+
ExecuteRead(ctx context.Context, builder sq.SelectBuilder) (map[int][]byte, error)
30+
}
31+
32+
// WriteMode selects how the chunks already stored under a name are retired
// when that name is written again or deleted.
type WriteMode int

const (
	// WriteModeInsertWithTombstones leaves previous chunks in the table and
	// marks them with a tombstone value before the new chunks are inserted.
	// It is the zero value of WriteMode. Requires CreatedAtColumn and
	// DeletedAtColumn to be set in the config.
	WriteModeInsertWithTombstones WriteMode = iota

	// WriteModeDeleteAndInsert removes every existing chunk for the name
	// before the new chunks are inserted, replacing the data completely.
	WriteModeDeleteAndInsert
)
44+
45+
// SQLByteChunkerConfig contains the configuration for creating a SQLByteChunker.
46+
type SQLByteChunkerConfig[T any] struct {
47+
// TableName is the name of the table storing the chunked data.
48+
TableName string
49+
50+
// NameColumn is the column name that stores the identifier for the byte data.
51+
NameColumn string
52+
53+
// ChunkIndexColumn is the column name that stores the chunk index (0-based).
54+
ChunkIndexColumn string
55+
56+
// ChunkDataColumn is the column name that stores the chunk bytes.
57+
ChunkDataColumn string
58+
59+
// MaxChunkSize is the maximum size in bytes for each chunk.
60+
MaxChunkSize int
61+
62+
// PlaceholderFormat is the placeholder format for SQL queries (e.g., sq.Question, sq.Dollar).
63+
PlaceholderFormat sq.PlaceholderFormat
64+
65+
// Executor is the executor for running SQL queries.
66+
Executor ChunkedBytesExecutor
67+
68+
// WriteMode defines how chunked data should be written (insert-with-tombstones or delete-and-insert).
69+
WriteMode WriteMode
70+
71+
// CreatedAtColumn is the column name that stores when a row was created (alive timestamp/transaction ID).
72+
// Required when WriteMode is WriteModeInsertWithTombstones.
73+
CreatedAtColumn string
74+
75+
// DeletedAtColumn is the column name that stores when a row was deleted (tombstone timestamp/transaction ID).
76+
// Required when WriteMode is WriteModeInsertWithTombstones.
77+
DeletedAtColumn string
78+
79+
// AliveValue is the value used to indicate a row has not been deleted yet (typically max int).
80+
// Required when WriteMode is WriteModeInsertWithTombstones.
81+
AliveValue T
82+
}
83+
84+
// SQLByteChunker provides methods for reading and writing byte data
85+
// that is chunked across multiple rows in a SQL table.
86+
type SQLByteChunker[T any] struct {
87+
tableName string
88+
nameColumn string
89+
chunkIndexColumn string
90+
chunkDataColumn string
91+
maxChunkSize int
92+
placeholderFormat sq.PlaceholderFormat
93+
executor ChunkedBytesExecutor
94+
writeMode WriteMode
95+
createdAtColumn string
96+
deletedAtColumn string
97+
aliveValue T
98+
}
99+
100+
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
101+
// Panics if the configuration is invalid.
102+
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
103+
if config.MaxChunkSize <= 0 {
104+
panic("maxChunkSize must be greater than 0")
105+
}
106+
if config.TableName == "" {
107+
panic("tableName cannot be empty")
108+
}
109+
if config.NameColumn == "" {
110+
panic("nameColumn cannot be empty")
111+
}
112+
if config.ChunkIndexColumn == "" {
113+
panic("chunkIndexColumn cannot be empty")
114+
}
115+
if config.ChunkDataColumn == "" {
116+
panic("chunkDataColumn cannot be empty")
117+
}
118+
if config.PlaceholderFormat == nil {
119+
panic("placeholderFormat cannot be nil")
120+
}
121+
if config.Executor == nil {
122+
panic("executor cannot be nil")
123+
}
124+
if config.WriteMode == WriteModeInsertWithTombstones {
125+
if config.CreatedAtColumn == "" {
126+
panic("createdAtColumn is required when using WriteModeInsertWithTombstones")
127+
}
128+
if config.DeletedAtColumn == "" {
129+
panic("deletedAtColumn is required when using WriteModeInsertWithTombstones")
130+
}
131+
}
132+
133+
return &SQLByteChunker[T]{
134+
tableName: config.TableName,
135+
nameColumn: config.NameColumn,
136+
chunkIndexColumn: config.ChunkIndexColumn,
137+
chunkDataColumn: config.ChunkDataColumn,
138+
maxChunkSize: config.MaxChunkSize,
139+
placeholderFormat: config.PlaceholderFormat,
140+
executor: config.Executor,
141+
writeMode: config.WriteMode,
142+
createdAtColumn: config.CreatedAtColumn,
143+
deletedAtColumn: config.DeletedAtColumn,
144+
aliveValue: config.AliveValue,
145+
}
146+
}
147+
148+
// WriteChunkedBytes writes chunked byte data to the database within a transaction.
149+
//
150+
// Parameters:
151+
// - ctx: Context for the operation
152+
// - name: The unique identifier for this byte data
153+
// - data: The bytes to be chunked and stored
154+
// - createdAtValue: The value for the created_at column (typically a transaction ID or timestamp).
155+
// Required when using WriteModeInsertWithTombstones. For WriteModeDeleteAndInsert, this parameter is ignored.
156+
func (c *SQLByteChunker[T]) WriteChunkedBytes(
157+
ctx context.Context,
158+
name string,
159+
data []byte,
160+
createdAtValue T,
161+
) error {
162+
if name == "" {
163+
return errors.New("name cannot be empty")
164+
}
165+
166+
// Begin transaction
167+
txn, err := c.executor.BeginTransaction(ctx)
168+
if err != nil {
169+
return fmt.Errorf("failed to begin transaction: %w", err)
170+
}
171+
172+
// Handle existing chunks based on write mode
173+
switch c.writeMode {
174+
case WriteModeDeleteAndInsert:
175+
// Delete all existing chunks
176+
deleteBuilder := sq.StatementBuilder.
177+
PlaceholderFormat(c.placeholderFormat).
178+
Delete(c.tableName).
179+
Where(sq.Eq{c.nameColumn: name})
180+
181+
if err := txn.ExecuteDelete(ctx, deleteBuilder); err != nil {
182+
return fmt.Errorf("failed to delete existing chunks: %w", err)
183+
}
184+
case WriteModeInsertWithTombstones:
185+
// Mark existing alive chunks with tombstone
186+
updateBuilder := sq.StatementBuilder.
187+
PlaceholderFormat(c.placeholderFormat).
188+
Update(c.tableName).
189+
Set(c.deletedAtColumn, createdAtValue).
190+
Where(sq.Eq{c.nameColumn: name}).
191+
Where(sq.Eq{c.deletedAtColumn: c.aliveValue})
192+
193+
if err := txn.ExecuteUpdate(ctx, updateBuilder); err != nil {
194+
return fmt.Errorf("failed to tombstone existing chunks: %w", err)
195+
}
196+
}
197+
198+
// Build the insert query
199+
insertBuilder := sq.StatementBuilder.
200+
PlaceholderFormat(c.placeholderFormat).
201+
Insert(c.tableName)
202+
203+
// Chunk the data
204+
chunks := c.chunkData(data)
205+
if len(chunks) == 0 {
206+
// Handle empty data case - insert a single empty chunk
207+
chunks = [][]byte{{}}
208+
}
209+
210+
// Set up the columns - base columns plus created_at (if using tombstone mode)
211+
columns := []string{c.nameColumn, c.chunkIndexColumn, c.chunkDataColumn}
212+
if c.writeMode == WriteModeInsertWithTombstones {
213+
columns = append(columns, c.createdAtColumn)
214+
}
215+
insertBuilder = insertBuilder.Columns(columns...)
216+
217+
// Add each chunk as a row
218+
for index, chunk := range chunks {
219+
values := []any{name, index, chunk}
220+
221+
// Add created_at value if using tombstone mode (deleted_at is written automatically)
222+
if c.writeMode == WriteModeInsertWithTombstones {
223+
values = append(values, createdAtValue)
224+
}
225+
226+
insertBuilder = insertBuilder.Values(values...)
227+
}
228+
229+
// Execute the insert
230+
if err := txn.ExecuteWrite(ctx, insertBuilder); err != nil {
231+
return fmt.Errorf("failed to insert chunks: %w", err)
232+
}
233+
234+
return nil
235+
}
236+
237+
// DeleteChunkedBytes deletes or tombstones all chunks for a given name within a transaction.
238+
//
239+
// Parameters:
240+
// - ctx: Context for the operation
241+
// - name: The unique identifier for the byte data to delete
242+
// - deletedAtValue: The value to write to the deleted_at column (typically a transaction ID or timestamp).
243+
// Required when using WriteModeInsertWithTombstones. For WriteModeDeleteAndInsert, this parameter is ignored.
244+
func (c *SQLByteChunker[T]) DeleteChunkedBytes(
245+
ctx context.Context,
246+
name string,
247+
deletedAtValue T,
248+
) error {
249+
if name == "" {
250+
return errors.New("name cannot be empty")
251+
}
252+
253+
// Begin transaction
254+
txn, err := c.executor.BeginTransaction(ctx)
255+
if err != nil {
256+
return fmt.Errorf("failed to begin transaction: %w", err)
257+
}
258+
259+
switch c.writeMode {
260+
case WriteModeDeleteAndInsert:
261+
// Actually delete the chunks
262+
deleteBuilder := sq.StatementBuilder.
263+
PlaceholderFormat(c.placeholderFormat).
264+
Delete(c.tableName).
265+
Where(sq.Eq{c.nameColumn: name})
266+
267+
if err := txn.ExecuteDelete(ctx, deleteBuilder); err != nil {
268+
return fmt.Errorf("failed to delete chunks: %w", err)
269+
}
270+
case WriteModeInsertWithTombstones:
271+
// Mark alive chunks with tombstone by setting deleted_at column
272+
updateBuilder := sq.StatementBuilder.
273+
PlaceholderFormat(c.placeholderFormat).
274+
Update(c.tableName).
275+
Set(c.deletedAtColumn, deletedAtValue).
276+
Where(sq.Eq{c.nameColumn: name}).
277+
Where(sq.Eq{c.deletedAtColumn: c.aliveValue})
278+
279+
if err := txn.ExecuteUpdate(ctx, updateBuilder); err != nil {
280+
return fmt.Errorf("failed to tombstone chunks: %w", err)
281+
}
282+
}
283+
284+
return nil
285+
}
286+
287+
// ReadChunkedBytes reads and reassembles chunked byte data from the database.
288+
//
289+
// Parameters:
290+
// - ctx: Context for the operation
291+
// - name: The unique identifier for the byte data to read
292+
//
293+
// Returns the reassembled byte data or an error if chunks are missing or invalid.
294+
func (c *SQLByteChunker[T]) ReadChunkedBytes(
295+
ctx context.Context,
296+
name string,
297+
) ([]byte, error) {
298+
if name == "" {
299+
return nil, errors.New("name cannot be empty")
300+
}
301+
302+
selectBuilder := sq.StatementBuilder.
303+
PlaceholderFormat(c.placeholderFormat).
304+
Select(c.chunkIndexColumn, c.chunkDataColumn).
305+
From(c.tableName).
306+
Where(sq.Eq{c.nameColumn: name}).
307+
OrderBy(c.chunkIndexColumn + " ASC")
308+
309+
// Execute the query
310+
chunks, err := c.executor.ExecuteRead(ctx, selectBuilder)
311+
if err != nil {
312+
return nil, fmt.Errorf("failed to read chunks: %w", err)
313+
}
314+
315+
// Reassemble the chunks
316+
data, err := c.reassembleChunks(chunks)
317+
if err != nil {
318+
return nil, fmt.Errorf("failed to reassemble chunks: %w", err)
319+
}
320+
321+
return data, nil
322+
}
323+
324+
// reassembleChunks takes the chunks read from the database and reassembles them
325+
// into the original byte array. It validates that all chunks are present and in order.
326+
func (c *SQLByteChunker[T]) reassembleChunks(chunks map[int][]byte) ([]byte, error) {
327+
if len(chunks) == 0 {
328+
return nil, errors.New("no chunks found")
329+
}
330+
331+
// Validate that we have all chunks from 0 to N-1
332+
maxIndex := -1
333+
for index := range chunks {
334+
if index > maxIndex {
335+
maxIndex = index
336+
}
337+
}
338+
339+
// Check for missing chunks
340+
for i := 0; i <= maxIndex; i++ {
341+
if _, exists := chunks[i]; !exists {
342+
return nil, fmt.Errorf("missing chunk at index %d", i)
343+
}
344+
}
345+
346+
// Calculate total size
347+
totalSize := 0
348+
for _, chunk := range chunks {
349+
totalSize += len(chunk)
350+
}
351+
352+
// Reassemble
353+
result := make([]byte, 0, totalSize)
354+
for i := 0; i <= maxIndex; i++ {
355+
result = append(result, chunks[i]...)
356+
}
357+
358+
return result, nil
359+
}
360+
361+
// chunkData splits the data into chunks of maxChunkSize.
362+
func (c *SQLByteChunker[T]) chunkData(data []byte) [][]byte {
363+
if len(data) == 0 {
364+
return nil
365+
}
366+
367+
numChunks := (len(data) + c.maxChunkSize - 1) / c.maxChunkSize
368+
chunks := make([][]byte, 0, numChunks)
369+
370+
for i := 0; i < len(data); i += c.maxChunkSize {
371+
end := i + c.maxChunkSize
372+
if end > len(data) {
373+
end = len(data)
374+
}
375+
chunks = append(chunks, data[i:end])
376+
}
377+
378+
return chunks
379+
}

0 commit comments

Comments
 (0)