Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ recompress_chunk_impl(Chunk *chunk, Oid *uncompressed_chunk_id, bool recompress)
* 3. Fallback to decompress/compress: When neither strategy is applicable
*/

if (ts_chunk_is_partial(chunk))
if (ts_chunk_is_partial(chunk) && !recompress)
{
if (!ts_guc_enable_segmentwise_recompression)
{
Expand Down
14 changes: 1 addition & 13 deletions tsl/src/compression/recompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -759,9 +759,6 @@ perform_recompression(RecompressContext *recompress_ctx, Relation compressed_chu

/*
* Perform per segment in-memory recompression of a compressed chunk.
*
* Note: This function will early return if the chunk is not suitable for
* recompression (e.g., partial, unordered, frozen).
*/
bool
recompress_chunk_in_memory_impl(Chunk *uncompressed_chunk)
Expand All @@ -771,16 +768,7 @@ recompress_chunk_in_memory_impl(Chunk *uncompressed_chunk)

Ensure(ts_guc_enable_in_memory_recompression, "in-memory recompression functionality disabled");

/*
* Only proceed if chunk is in compressed state without partial or unordered status
* Status meanings:
* 1: compressed
* 2: compressed_unordered TODO: add support
* 4: frozen
* 8: compressed_partial
*/
if (!ts_chunk_is_compressed(uncompressed_chunk) || ts_chunk_is_partial(uncompressed_chunk) ||
ts_chunk_is_unordered(uncompressed_chunk) || ts_chunk_is_frozen(uncompressed_chunk))
if (!ts_chunk_is_compressed(uncompressed_chunk) || ts_chunk_is_frozen(uncompressed_chunk))
return false;

Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);
Expand Down
114 changes: 114 additions & 0 deletions tsl/test/expected/recompression_integrity_tests.out
Original file line number Diff line number Diff line change
Expand Up @@ -694,3 +694,117 @@ SELECT * FROM recomp_guc_test ORDER BY time, device;

RESET timescaledb.enable_in_memory_recompression;
DROP TABLE recomp_guc_test CASCADE;
-- Test Case 8: Recompression for partial chunks. Should only recompress the columnstore part
SET timescaledb.enable_direct_compress_insert = true;
SET timescaledb.enable_direct_compress_insert_client_sorted = true;
DROP TABLE IF EXISTS recomp_partial CASCADE;
NOTICE: table "recomp_partial" does not exist, skipping
CREATE TABLE recomp_partial (time TIMESTAMPTZ NOT NULL, device TEXT, value float) WITH (tsdb.hypertable, tsdb.orderby='time');
NOTICE: using column "time" as partitioning column
INSERT INTO recomp_partial SELECT '2025-01-01'::timestamptz + (i || ' minute')::interval, 'd1', i::float FROM generate_series(0,100) i;
INSERT INTO recomp_partial SELECT '2025-01-01'::timestamptz + (i || ' minute')::interval, 'd1', i::float FROM generate_series(101,750) i;
-- less than 10 tuples will not undergo direct compression
INSERT INTO recomp_partial SELECT '2025-01-01'::timestamptz + (i || ' minute')::interval, 'd1', i::float FROM generate_series(751,755) i;
WARNING: disabling direct compress because of too small batch size
-- status should be compressed, partial.
SELECT chunk, _timescaledb_functions.chunk_status_text(chunk) FROM show_chunks('recomp_partial') chunk;
chunk | chunk_status_text
------------------------------------------+----------------------
_timescaledb_internal._hyper_15_28_chunk | {COMPRESSED,PARTIAL}

\set TEST_TABLE_NAME 'recomp_partial'
\ir :RECOMPRESSION_INTEGRITY_CHECK_RELPATH
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\set TEST_BASE_NAME recompression_intergrity_check
SELECT format('%s/results/%s_results_compressed.out', :'TEST_OUTPUT_DIR', :'TEST_BASE_NAME') as "TEST_RESULTS_COMPRESS",
format('%s/results/%s_results_recompressed.out', :'TEST_OUTPUT_DIR', :'TEST_BASE_NAME') as "TEST_RESULTS_RECOMPRESS"
\gset
SELECT format('\! diff -u --label "Compressed result" --label "Recompressed result" %s %s', :'TEST_RESULTS_COMPRESS', :'TEST_RESULTS_RECOMPRESS') as "DIFF_CMD"
\gset
-- Store initial compressed chunk info before recompression
SELECT uncompressed.schema_name || '.' || uncompressed.table_name AS "OLD_CHUNK_NAME",
compressed.schema_name || '.' || compressed.table_name AS "OLD_COMPRESSED_CHUNK_NAME",
compressed.id AS "OLD_CHUNK_ID"
FROM _timescaledb_catalog.chunk uncompressed
JOIN _timescaledb_catalog.chunk compressed
ON uncompressed.compressed_chunk_id = compressed.id
WHERE uncompressed.hypertable_id = (
SELECT id
FROM _timescaledb_catalog.hypertable
WHERE table_name = :'TEST_TABLE_NAME'
)
LIMIT 1 \gset
\set COMPRESSED_CHUNK_NAME :OLD_COMPRESSED_CHUNK_NAME
:BATCH_METADATA_QUERY
_ts_meta_count | _ts_meta_min_1 | _ts_meta_max_1
----------------+------------------------------+------------------------------
101 | Wed Jan 01 00:00:00 2025 PST | Wed Jan 01 01:40:00 2025 PST
650 | Wed Jan 01 01:41:00 2025 PST | Wed Jan 01 12:30:00 2025 PST

\set QUERY1 'SELECT COUNT(*) FROM ' :OLD_CHUNK_NAME ';'
\set QUERY2 'SELECT * FROM ' :OLD_CHUNK_NAME ';'
\o :TEST_RESULTS_COMPRESS
:QUERY1
:QUERY2
\o
-- Recompress the chunk in-memory
SELECT compress_chunk(:'OLD_CHUNK_NAME', recompress := true);
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_15_28_chunk

-- Get info for the new compressed chunk
SELECT compressed.schema_name || '.' || compressed.table_name AS "NEW_COMPRESSED_CHUNK_NAME",
compressed.id AS "NEW_CHUNK_ID"
FROM _timescaledb_catalog.chunk uncompressed
JOIN _timescaledb_catalog.chunk compressed
ON uncompressed.compressed_chunk_id = compressed.id
WHERE uncompressed.schema_name || '.' || uncompressed.table_name = :'OLD_CHUNK_NAME'
LIMIT 1 \gset
\set COMPRESSED_CHUNK_NAME :NEW_COMPRESSED_CHUNK_NAME
:BATCH_METADATA_QUERY
_ts_meta_count | _ts_meta_min_1 | _ts_meta_max_1
----------------+------------------------------+------------------------------
751 | Wed Jan 01 00:00:00 2025 PST | Wed Jan 01 12:30:00 2025 PST

-- Get data after in-memory recompression
\o :TEST_RESULTS_RECOMPRESS
:QUERY1
:QUERY2
\o
-- Check if a new chunk was created (this will show in the output)
SELECT
CASE WHEN :'NEW_CHUNK_ID' IS NULL OR :'OLD_CHUNK_ID' = :'NEW_CHUNK_ID' THEN
'ERROR: Recompression did not create a new chunk'
ELSE
'SUCCESS: New chunk created, old_chunk_id=' || :'OLD_CHUNK_ID' || ', new_chunk_id=' || :'NEW_CHUNK_ID'
END AS recompression_status;
recompression_status
--------------------------------------------------------------
SUCCESS: New chunk created, old_chunk_id=29, new_chunk_id=30

-- Compare result using diff to validate integrity of recompressed data
:DIFF_CMD
-- extra checks
SELECT chunk FROM show_chunks('recomp_partial') AS chunk LIMIT 1 \gset
SELECT _timescaledb_functions.chunk_status_text(:'chunk'::regclass); -- should be compressed, partial
chunk_status_text
----------------------
{COMPRESSED,PARTIAL}

SELECT COUNT(*) FROM ONLY :chunk; -- should be 5
count
-------
5

SELECT * FROM _timescaledb_catalog.compression_settings ORDER BY relid;
relid | compress_relid | segmentby | orderby | orderby_desc | orderby_nullsfirst | index
------------------------------------------+--------------------------------------------------+-----------+---------+--------------+--------------------+-------------------------------------------------------------
recomp_partial | | | {time} | {f} | {f} |
_timescaledb_internal._hyper_15_28_chunk | _timescaledb_internal.compress_hyper_16_30_chunk | | {time} | {f} | {f} | [{"type": "minmax", "column": "time", "source": "orderby"}]

DROP TABLE IF EXISTS recomp_partial CASCADE;
RESET timescaledb.enable_direct_compress_insert;
RESET timescaledb.enable_direct_compress_insert_client_sorted;
26 changes: 26 additions & 0 deletions tsl/test/sql/recompression_integrity_tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -227,5 +227,31 @@ SELECT * FROM recomp_guc_test ORDER BY time, device;
RESET timescaledb.enable_in_memory_recompression;
DROP TABLE recomp_guc_test CASCADE;

-- Test Case 8: Recompression for partial chunks. Should only recompress the columnstore part
-- Scenario: build a chunk that holds both compressed batches and a few
-- uncompressed rows, recompress it, and verify that the uncompressed rows are
-- left alone while the compressed batches are rewritten.
-- Direct compress on INSERT is enabled so that the larger inserts below are
-- stored as compressed batches without an explicit compress_chunk() call.
SET timescaledb.enable_direct_compress_insert = true;
SET timescaledb.enable_direct_compress_insert_client_sorted = true;

DROP TABLE IF EXISTS recomp_partial CASCADE;

-- Hypertable ordered by time with no segmentby; all rows below fall within
-- the same day and are expected to land in a single chunk.
CREATE TABLE recomp_partial (time TIMESTAMPTZ NOT NULL, device TEXT, value float) WITH (tsdb.hypertable, tsdb.orderby='time');
-- Two separate inserts (101 rows, then 650 rows) so that the chunk starts out
-- with more than one compressed batch for recompression to work on.
INSERT INTO recomp_partial SELECT '2025-01-01'::timestamptz + (i || ' minute')::interval, 'd1', i::float FROM generate_series(0,100) i;
INSERT INTO recomp_partial SELECT '2025-01-01'::timestamptz + (i || ' minute')::interval, 'd1', i::float FROM generate_series(101,750) i;
-- less than 10 tuples will not undergo direct compression
-- These 5 rows therefore stay uncompressed, which is what makes the chunk partial.
INSERT INTO recomp_partial SELECT '2025-01-01'::timestamptz + (i || ' minute')::interval, 'd1', i::float FROM generate_series(751,755) i;

-- status should be compressed, partial.
SELECT chunk, _timescaledb_functions.chunk_status_text(chunk) FROM show_chunks('recomp_partial') chunk;

-- Run the shared integrity-check script against this table. The script path
-- variable is set outside this section; the script reads TEST_TABLE_NAME to
-- find the chunk to recompress.
\set TEST_TABLE_NAME 'recomp_partial'
\ir :RECOMPRESSION_INTEGRITY_CHECK_RELPATH
-- extra checks
-- Recompression must not touch the uncompressed rows: the chunk should still
-- be partial and the 5 rows from the last insert should still be present.
SELECT chunk FROM show_chunks('recomp_partial') AS chunk LIMIT 1 \gset
SELECT _timescaledb_functions.chunk_status_text(:'chunk'::regclass); -- should be compressed, partial
SELECT COUNT(*) FROM ONLY :chunk; -- should be 5
-- Dump the compression settings catalog to verify the entries for the
-- hypertable and its chunk after recompression.
SELECT * FROM _timescaledb_catalog.compression_settings ORDER BY relid;

DROP TABLE IF EXISTS recomp_partial CASCADE;

-- Restore the settings changed at the start of this test case.
RESET timescaledb.enable_direct_compress_insert;
RESET timescaledb.enable_direct_compress_insert_client_sorted;

Loading