diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d4e9661..24adeb2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -37,7 +37,7 @@ jobs: loadtest: strategy: matrix: - kind: ['csv_agg', 'csv_agg_delim', 'csv_agg_delim_bom', 'postgrest'] + kind: ['postgrest', 'csv_agg', 'csv_agg_options'] name: Loadtest runs-on: ubuntu-24.04 steps: @@ -55,7 +55,7 @@ jobs: authToken: ${{ secrets.CACHIX_AUTH_TOKEN }} - name: Run loadtest - run: nix-shell --run "./bench/loadtest.sh ${{ matrix.kind }}" >> "$GITHUB_STEP_SUMMARY" + run: nix-shell --run "pg_csv-loadtest ${{ matrix.kind }}" >> "$GITHUB_STEP_SUMMARY" coverage: diff --git a/.gitignore b/.gitignore index 740489e..d453f58 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ pgbench_log.* .history pg_csv--*.sql !pg_csv--*--*.sql +tags diff --git a/Makefile b/Makefile index cdea62f..afae9ab 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ else endif EXTENSION = pg_csv -EXTVERSION = 0.4 +EXTVERSION = 1.0 DATA = $(wildcard sql/*--*.sql) @@ -68,7 +68,7 @@ $(BUILD_DIR)/$(EXTENSION).$(SHARED_EXT): $(EXTENSION).$(SHARED_EXT) sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql cp $< $@ -$(EXTENSION).control: +$(EXTENSION).control: $(EXTENSION).control.in sed "s/@EXTVERSION@/$(EXTVERSION)/g" $(EXTENSION).control.in > $@ PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 5f429cb..ae2c6f2 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,17 @@ [![Coverage Status](https://coveralls.io/repos/github/PostgREST/pg_csv/badge.svg)](https://coveralls.io/github/PostgREST/pg_csv) [![Tests](https://github.com/PostgREST/pg_csv/actions/workflows/ci.yaml/badge.svg)](https://github.com/PostgREST/pg_csv/actions) +Postgres has CSV support on the [COPY](https://www.postgresql.org/docs/current/sql-copy.html) command, but `COPY` has problems: + +- It uses a special protocol, so it doesn't work with other standard features like [prepared 
statements](https://www.postgresql.org/docs/current/sql-prepare.html), [pipeline mode](https://www.postgresql.org/docs/current/libpq-pipeline-mode.html#LIBPQ-PIPELINE-USING) or [pgbench](https://www.postgresql.org/docs/current/pgbench.html). +- Is not composable. You can't use COPY inside CTEs, subqueries, view definitions or as function arguments. + +`pg_csv` offers flexible CSV processing as a solution. + +- Includes a CSV aggregate that composes with SQL expressions. +- Native C extension, almost 2 times faster than SQL queries that try to output CSV (see our [CI results](https://github.com/PostgREST/pg_csv/actions/runs/17367407744)). +- No dependencies except Postgres. + ## Installation Clone this repo and run: @@ -20,55 +31,82 @@ create extension pg_csv; ## csv_agg -Aggregate that builds a CSV as per [RFC 4180](https://www.ietf.org/rfc/rfc4180.txt), quoting as required. +Aggregate that builds a CSV respecting [RFC 4180](https://www.ietf.org/rfc/rfc4180.txt), quoting as required. + +```sql +create table projects as +select * +from ( + values + (1, 'Death Star OS', 1), + (2, 'Windows 95 Rebooted', 1), + (3, 'Project "Comma,Please"', 2), + (4, 'Escape ""Plan""', 2), + (NULL, 'NULL & Void', NULL) +) as _(id, name, client_id); +``` -```psql +```sql select csv_agg(x) from projects x; - csv_agg -------------------- - id,name,client_id+ - 1,Windows 7,1 + - 2,Windows 10,1 + - 3,IOS,2 + - 4,OSX,2 + - 5,Orphan, + csv_agg +-------------------------------- + id,name,client_id + + 1,Death Star OS,1 + + 2,Windows 95 Rebooted,1 + + 3,"Project ""Comma,Please""",2+ + 4,"Escape """"Plan""""",2 + + ,NULL & Void, (1 row) ``` ### Custom Delimiter -You can use a custom delimiter. +Custom delimiters can be used to produce different formats like pipe-separated values, tab-separated values or semicolon-separated values. 
-```psql +```sql select csv_agg(x, csv_options(delimiter := '|')) from projects x; - csv_agg -------------------- - id|name|client_id+ - 1|Windows 7|1 + - 2|Windows 10|1 + - 3|IOS|2 + - 4|OSX|2 + - 5|Orphan| + csv_agg +----------------------------- + id|name|client_id + + 1|Death Star OS|1 + + 2|Windows 95 Rebooted|1 + + 3|"Project ""Comma,Please"""|2+ + 4|"Escape """"Plan"""""|2 + + |NULL & Void| +(1 row) + +select csv_agg(x, csv_options(delimiter := E'\t')) from projects x; + csv_agg +----------------------------------- + id name client_id + + 1 Death Star OS 1 + + 2 Windows 95 Rebooted 1+ + 3 "Project ""Comma,Please""" 2+ + 4 "Escape """"Plan""""" 2+ + NULL & Void (1 row) ``` > [!NOTE] -> Newline, carriage return and double quotes are not supported as delimiters to maintain the integrity of the separated values format. +> - Newline, carriage return and double quotes are not supported as delimiters to maintain the integrity of the separated values format. +> - The delimiter can only be a single char; if a longer string is specified only the first char will be used. +> - Why use a `csv_options` constructor function instead of extra arguments? Aggregates don't support named arguments in postgres, see the discussion on https://github.com/PostgREST/pg_csv/pull/2#issuecomment-3155740589. ### BOM You can include a byte-order mark (BOM) to make the CSV compatible with Excel. -```psql +```sql select csv_agg(x, csv_options(bom := true)) from projects x; + csv_agg ------------------- id,name,client_id+ - 1,Windows 7,1 + - 2,Windows 10,1 + - 3,IOS,2 + - 4,OSX,2 + - 5,Orphan, + 1,Death Star OS,1 + 2,Windows 95 Rebooted,1 + 3,"Project ""Comma,Please""",2 + 4,"Escape """"Plan""""",2 + ,NULL & Void, (1 row) ``` @@ -76,14 +114,38 @@ select csv_agg(x, csv_options(bom := true)) from projects x; You can omit or include the CSV header. 
-```psql +```sql select csv_agg(x, csv_options(header := false)) from projects x; - csv_agg -------------------- - 1,Windows 7,1 + - 2,Windows 10,1 + - 3,IOS,2 + - 4,OSX,2 + - 5,Orphan, + + csv_agg +-------------------------------- + 1,Death Star OS,1 + + 2,Windows 95 Rebooted,1 + + 3,"Project ""Comma,Please""",2+ + 4,"Escape """"Plan""""",2 + + ,NULL & Void, (1 row) ``` + +### Null string + +NULL values are represented by an empty string by default. This can be changed with the `nullstr` option. + +```sql +SELECT csv_agg(x, csv_options(nullstr:='')) AS body +FROM projects x; + + body +-------------------------------- + id,name,client_id + + 1,Death Star OS,1 + + 2,Windows 95 Rebooted,1 + + 3,"Project ""Comma,Please""",2+ + 4,"Escape """"Plan""""",2 + + ,NULL & Void, +(1 row) +``` + +## Limitations + +- For large bulk exports and imports, `COPY ... CSV` should still be preferred as it's faster due to streaming support. diff --git a/bench/csv_agg.sql b/bench/csv_agg.sql index f3dee39..0787cdc 100644 --- a/bench/csv_agg.sql +++ b/bench/csv_agg.sql @@ -1,5 +1,5 @@ \set lim random(1000, 2000) select csv_agg(t) from ( - select * from student_emotion_assessments limit :lim + select * from orders_customers limit :lim ) as t; diff --git a/bench/csv_agg_delim.sql b/bench/csv_agg_delim.sql deleted file mode 100644 index 00074a9..0000000 --- a/bench/csv_agg_delim.sql +++ /dev/null @@ -1,5 +0,0 @@ -\set lim random(1000, 2000) - -select csv_agg(t, csv_options('|')) from ( - select * from student_emotion_assessments limit :lim -) as t; diff --git a/bench/csv_agg_delim_bom.sql b/bench/csv_agg_delim_bom.sql deleted file mode 100644 index ffdec38..0000000 --- a/bench/csv_agg_delim_bom.sql +++ /dev/null @@ -1,5 +0,0 @@ -\set lim random(1000, 2000) - -select csv_agg(t, csv_options(delimiter:=',', bom:=true)) from ( - select * from student_emotion_assessments limit :lim -) as t; diff --git a/bench/csv_agg_options.sql b/bench/csv_agg_options.sql new file mode 100644 index 
0000000..c504352 --- /dev/null +++ b/bench/csv_agg_options.sql @@ -0,0 +1,5 @@ +\set lim random(1000, 2000) + +select csv_agg(t, csv_options(delimiter:='|', bom:=true, header:=false, nullstr:='')) from ( + select * from orders_customers limit :lim +) as t; diff --git a/bench/init.sql b/bench/init.sql index 3dc3442..84cab30 100644 --- a/bench/init.sql +++ b/bench/init.sql @@ -1,130 +1,129 @@ -create extension if not exists pg_csv; - -create type gender_enum as enum ('female', 'male', 'non_binary', 'prefer_not_to_say'); -create type attachment_enum as enum ('secure', 'anxious', 'avoidant', 'fearful'); -create type regulation_strategy as enum ('cognitive_reappraisal', 'suppression', 'rumination', - 'problem_solving', 'distraction', 'other'); - -create table student_emotion_assessments ( - -- identifiers - assessment_id bigserial primary key, - student_uuid uuid not null, - institution_id int not null, - - -- demographics - gender gender_enum not null, - birth_date date not null, - nationality text not null, - socioeconomic_level text not null, - - -- academic context - faculty text not null, - degree_program text not null, - year_of_study smallint not null check (year_of_study between 1 and 7), - current_gpa numeric(3,2) not null check (current_gpa between 0 and 4), - credits_completed int not null check (credits_completed >= 0), - enrollment_status boolean not null default true, -- true = active student - - -- attachment style - attachment_style attachment_enum not null, - attachment_score_anxiety numeric(4,2) not null check (attachment_score_anxiety between 1 and 7), - attachment_score_avoidant numeric(4,2) not null check (attachment_score_avoidant between 1 and 7), - - -- difficulties in emotion regulation scale (ders-18) sub-scores - ders_non_acceptance smallint not null check (ders_non_acceptance between 6 and 30), - ders_goals smallint not null check (ders_goals between 5 and 25), - ders_impulse smallint not null check (ders_impulse between 6 and 30), - 
ders_awareness smallint not null check (ders_awareness between 6 and 30), - ders_strategy smallint not null check (ders_strategy between 8 and 40), - ders_clarity smallint not null check (ders_clarity between 5 and 25), - ders_total smallint generated always as - (ders_non_acceptance + ders_goals + ders_impulse + - ders_awareness + ders_strategy + ders_clarity) stored, - - -- emotion-regulation strategy prevalence (likert 1-5) - uses_reappraisal smallint not null check (uses_reappraisal between 1 and 5), - uses_suppression smallint not null check (uses_suppression between 1 and 5), - uses_rumination smallint not null check (uses_rumination between 1 and 5), - predominant_strategy regulation_strategy not null, +-- based on the northwind database https://github.com/pthom/northwind_psql +-- the idea is to use the aggregate over a relation with lots of columns to test the performance - -- well-being & mental-health screeners - perceived_stress smallint not null check (perceived_stress between 0 and 40), - anxiety_score_gad7 smallint not null check (anxiety_score_gad7 between 0 and 21), - depression_score_phq9 smallint not null check (depression_score_phq9 between 0 and 27), +create extension if not exists pg_csv; - -- environmental variables - living_with_family boolean not null, - weekly_work_hours smallint not null check (weekly_work_hours between 0 and 60), - social_support_index smallint not null check (social_support_index between 12 and 84), +CREATE TABLE customers ( + customer_id CHAR(5) PRIMARY KEY, + company_name TEXT NOT NULL, + contact_name TEXT, + contact_title TEXT, + address TEXT, + city TEXT, + region TEXT, + postal_code TEXT, + country TEXT, + phone TEXT, + fax TEXT +); - -- audit fields - administered_by text not null, -- name/id of interviewer or system - collected_at timestamptz not null default now(), - updated_at timestamptz not null default now(), - constraint updated_at_future check (updated_at <= now()) +CREATE TABLE orders ( + order_id 
BIGSERIAL PRIMARY KEY, + customer_id CHAR(5) NOT NULL REFERENCES customers(customer_id) ON DELETE CASCADE, + employee_id SMALLINT, + order_date DATE, + required_date DATE, + shipped_date DATE, + freight NUMERIC(10,2) DEFAULT 0 CHECK (freight >= 0), + ship_name TEXT, + ship_address TEXT, + ship_city TEXT, + ship_region TEXT, + ship_postal_code TEXT, + ship_country TEXT ); -INSERT INTO student_emotion_assessments ( - student_uuid, institution_id, gender, birth_date, nationality, socioeconomic_level, - faculty, degree_program, year_of_study, current_gpa, credits_completed, enrollment_status, - attachment_style, attachment_score_anxiety, attachment_score_avoidant, - ders_non_acceptance, ders_goals, ders_impulse, ders_awareness, ders_strategy, ders_clarity, - uses_reappraisal, uses_suppression, uses_rumination, predominant_strategy, - perceived_stress, anxiety_score_gad7, depression_score_phq9, - living_with_family, weekly_work_hours, social_support_index, - administered_by +-- generate seed data +-- three groups of 100 by city/country +INSERT INTO customers ( + customer_id, company_name, contact_name, contact_title, + address, city, region, postal_code, country, phone, fax +) +SELECT + ('C' || lpad(i::text, 4, '0'))::char(5) AS customer_id, + 'Company ' || i AS company_name, + 'Contact ' || i AS contact_name, + CASE + WHEN i <= 100 THEN 'Owner' + WHEN i <= 200 THEN 'Sales Manager' + ELSE 'Purchasing' + END AS contact_title, + i::text || ' Main Street' AS address, + CASE + WHEN i <= 100 THEN 'Seattle' + WHEN i <= 200 THEN 'London' + ELSE 'Sao Paulo' + END AS city, + CASE + WHEN i <= 100 THEN 'WA' + WHEN i <= 200 THEN NULL + ELSE 'SP' + END AS region, + (10000 + i)::text AS postal_code, + CASE + WHEN i <= 100 THEN 'USA' + WHEN i <= 200 THEN 'UK' + ELSE 'Brazil' + END AS country, + '+1-555-' || lpad(i::text, 4, '0') AS phone, + CASE + WHEN right(i::text, 1) IN ('0','5') THEN NULL + ELSE '+1-555-' || lpad((i + 1000)::text, 4, '0') + END AS fax +FROM generate_series(1, 300) 
AS s(i); + +-- 2700 orders, 9 orders per customer +WITH base AS ( + SELECT c.customer_id, c.company_name, c.address, c.city, c.region, c.postal_code, c.country + FROM customers c +) +INSERT INTO orders ( + customer_id, employee_id, order_date, required_date, shipped_date, + freight, ship_name, ship_address, ship_city, ship_region, ship_postal_code, ship_country ) SELECT - gen_random_uuid(), -- student_uuid - 1 + (i % 5), -- institution_id 1-5 - CASE (i % 4) - WHEN 0 THEN 'female' - WHEN 1 THEN 'male' - WHEN 2 THEN 'non_binary' - ELSE 'prefer_not_to_say' - END::gender_enum, - (CURRENT_DATE - ((18 + (i % 10)) * INTERVAL '1 year'))::date, - 'Country ' || i, - CASE WHEN i % 3 = 0 THEN 'alto' - WHEN i % 3 = 1 THEN 'medio' - ELSE 'bajo' - END, - CASE WHEN i % 2 = 0 THEN 'Psychology' ELSE 'Engineering' END, - CASE WHEN i % 2 = 0 THEN 'BSc' ELSE 'BA' END, - (i % 7) + 1, - round((random()*4)::numeric, 2)::numeric(3,2), -- GPA 0-4.00 - (i * 10) % 200, - TRUE, - CASE (i % 4) - WHEN 0 THEN 'secure' - WHEN 1 THEN 'anxious' - WHEN 2 THEN 'avoidant' - ELSE 'fearful' - END::attachment_enum, - 3 + (i % 4), - 3 + ((i + 2) % 4), - 10 + (i % 15), -- ders_non_acceptance - 10 + ((i+1) % 15), -- ders_goals - 10 + ((i+2) % 15), -- ders_impulse - 10 + ((i+3) % 15), -- ders_awareness - 15 + (i % 10), -- ders_strategy - 5 + ((i+4) % 21), -- ders_clarity (range 5-25) - 1 + (i % 5), -- uses_reappraisal - 1 + ((i+1) % 4), -- uses_suppression - 1 + ((i+2) % 3), -- uses_rumination - CASE (i % 6) - WHEN 0 THEN 'cognitive_reappraisal' - WHEN 1 THEN 'suppression' - WHEN 2 THEN 'rumination' - WHEN 3 THEN 'problem_solving' - WHEN 4 THEN 'distraction' - ELSE 'other' - END::regulation_strategy, - 10 + (i % 20), - 2 + (i % 5), - 3 + (i % 9), - (i % 2 = 0), - (i % 61), - 20 + (i % 50), - 'seed_script' -FROM generate_series(1, 3000) AS s(i); + b.customer_id, + n::smallint AS employee_id, + (DATE '2024-01-01' + (n || ' day')::interval)::date AS order_date, + (DATE '2024-01-01' + ((n + 7) || ' 
day')::interval)::date AS required_date, + CASE WHEN n = 9 THEN NULL + ELSE (DATE '2024-01-01' + ((n + 3) || ' day')::interval)::date + END AS shipped_date, + (10 + n)::numeric(10,2) AS freight, + b.company_name AS ship_name, + b.address AS ship_address, + b.city AS ship_city, + b.region AS ship_region, + b.postal_code AS ship_postal_code, + b.country AS ship_country +FROM base b +CROSS JOIN generate_series(1, 9) AS n; + +-- create a view to have more columns +CREATE OR REPLACE VIEW orders_customers AS +SELECT + o.order_id, + o.customer_id, + c.company_name, + c.contact_name, + c.contact_title, + c.address AS customer_address, + c.city AS customer_city, + c.region AS customer_region, + c.postal_code AS customer_postal_code, + c.country AS customer_country, + c.phone, + c.fax, + o.employee_id, + o.order_date, + o.required_date, + o.shipped_date, + o.freight, + o.ship_name, + o.ship_address, + o.ship_city, + o.ship_region, + o.ship_postal_code, + o.ship_country +FROM orders o +JOIN customers c USING (customer_id); diff --git a/bench/loadtest.sh b/bench/loadtest.sh deleted file mode 100755 index 7c9c2d7..0000000 --- a/bench/loadtest.sh +++ /dev/null @@ -1 +0,0 @@ -xpg pgbench -n -c 1 -T 30 -M prepared -f ./bench/$1.sql diff --git a/bench/postgrest.sql b/bench/postgrest.sql index 7bfc2d4..0e0cf21 100644 --- a/bench/postgrest.sql +++ b/bench/postgrest.sql @@ -1,7 +1,7 @@ \set lim random(1000, 2000) with pgrst_source as ( - select * from student_emotion_assessments + select * from orders_customers limit :lim ) select (select coalesce(string_agg(a.k, ','), '') from (select json_object_keys(r)::text as k from (select row_to_json(hh) as r from pgrst_source as hh limit 1) _) a) || diff --git a/pg_csv.control.in b/pg_csv.control.in index 2a67d78..ec77d89 100644 --- a/pg_csv.control.in +++ b/pg_csv.control.in @@ -1,2 +1,3 @@ default_version = '@EXTVERSION@' relocatable = true +module_pathname = '$libdir/pg_csv' diff --git a/shell.nix b/shell.nix index 86f87f3..aaad4e2 100644 
--- a/shell.nix +++ b/shell.nix @@ -20,10 +20,32 @@ mkShell { ${clang-tools}/bin/clang-format -i src/* ${git}/bin/git diff-index --exit-code HEAD -- '*.c' ''; + loadtest = + writeShellScriptBin "pg_csv-loadtest" '' + set -euo pipefail + + file=./bench/$1.sql + + cat <delim = ','; - csv_opts->with_bom = false; - csv_opts->header = true; + csv_opts->delimiter = ','; + csv_opts->bom = false; + csv_opts->header = true; + csv_opts->nullstr = NULL; if (opts_hdr == NULL) return; TupleDesc desc = lookup_rowtype_tupdesc(HeapTupleHeaderGetTypeId(opts_hdr), HeapTupleHeaderGetTypMod(opts_hdr)); - Datum values[3]; - bool nulls[3]; + Datum values[csv_options_count]; + bool nulls[csv_options_count]; heap_deform_tuple( &(HeapTupleData){.t_len = HeapTupleHeaderGetDatumLength(opts_hdr), .t_data = opts_hdr}, desc, values, nulls); if (!nulls[0]) { - csv_opts->delim = DatumGetChar(values[0]); - if (is_reserved(csv_opts->delim)) + csv_opts->delimiter = DatumGetChar(values[0]); + if (is_reserved(csv_opts->delimiter)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("delimiter cannot be newline, carriage return or " "double quote"))); } if (!nulls[1]) { - csv_opts->with_bom = DatumGetBool(values[1]); + csv_opts->bom = DatumGetBool(values[1]); } if (!nulls[2]) { csv_opts->header = DatumGetBool(values[2]); } + if (!nulls[3]) { + csv_opts->nullstr = DatumGetTextPP(values[3]); + } + ReleaseTupleDesc(desc); } diff --git a/src/aggs.h b/src/aggs.h index 1602923..1bfaa4c 100644 --- a/src/aggs.h +++ b/src/aggs.h @@ -1,20 +1,30 @@ #ifndef AGGS_H #define AGGS_H +// mirrors the SQL csv_options type typedef struct { - char delim; - bool with_bom; - bool header; + char delimiter; + bool bom; + bool header; + text *nullstr; } CsvOptions; +#define csv_options_count 4 typedef struct { StringInfoData accum_buf; bool header_done; bool first_row; TupleDesc tupdesc; + int nullstr_len; CsvOptions *options; + char *cached_nullstr; } CsvAggState; +extern const char NEWLINE; +extern const char 
BOM[3]; +extern const char DQUOTE; +extern const char CR; + void parse_csv_options(HeapTupleHeader opts_hdr, CsvOptions *csv_opts); void csv_append_field(StringInfo buf, const char *s, size_t n, char delim); diff --git a/src/general.c b/src/general.c deleted file mode 100644 index 288f192..0000000 --- a/src/general.c +++ /dev/null @@ -1,6 +0,0 @@ -#include "general.h" - -const char NEWLINE = '\n'; -const char DQUOTE = '"'; -const char CR = '\r'; -const char BOM[3] = "\xEF\xBB\xBF"; diff --git a/src/general.h b/src/general.h deleted file mode 100644 index adc50a4..0000000 --- a/src/general.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef GENERAL_H -#define GENERAL_H - -extern const char NEWLINE; -extern const char BOM[3]; -extern const char DQUOTE; -extern const char CR; - -#endif diff --git a/src/pg_csv.c b/src/pg_csv.c index b9ced71..4937a86 100644 --- a/src/pg_csv.c +++ b/src/pg_csv.c @@ -1,11 +1,13 @@ +// This is the top module, all SQL exposed functions will be in this file + #define PG_PRELUDE_IMPL #include "pg_prelude.h" #include "aggs.h" -#include "general.h" PG_MODULE_MAGIC; +// aggregate final function PG_FUNCTION_INFO_V1(csv_agg_finalfn); Datum csv_agg_finalfn(PG_FUNCTION_ARGS) { if (PG_ARGISNULL(0)) PG_RETURN_NULL(); @@ -17,6 +19,7 @@ Datum csv_agg_finalfn(PG_FUNCTION_ARGS) { PG_RETURN_TEXT_P(cstring_to_text_with_len(state->accum_buf.data, state->accum_buf.len)); } +// aggregate transition function PG_FUNCTION_INFO_V1(csv_agg_transfn); Datum csv_agg_transfn(PG_FUNCTION_ARGS) { CsvAggState *state = !PG_ARGISNULL(0) ? 
(CsvAggState *)PG_GETARG_POINTER(0) : NULL; @@ -29,22 +32,30 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) { MemoryContext aggctx, oldctx; if (!AggCheckCallContext(fcinfo, &aggctx)) - elog(ERROR, "csv_agg_transfn called in non‑aggregate context"); + elog(ERROR, "%s called in non‑aggregate context", __func__); + // here we extend the lifetime of the CsvAggState until the aggregate finishes oldctx = MemoryContextSwitchTo(aggctx); state = palloc(sizeof(CsvAggState)); initStringInfo(&state->accum_buf); - state->header_done = false; - state->first_row = true; - state->tupdesc = NULL; - state->options = palloc(sizeof(CsvOptions)); + state->header_done = false; + state->first_row = true; + state->tupdesc = NULL; + state->nullstr_len = 0; + state->cached_nullstr = NULL; + state->options = palloc(sizeof(CsvOptions)); // we'll parse the csv options only once HeapTupleHeader opts_hdr = PG_NARGS() >= 3 && !PG_ARGISNULL(2) ? PG_GETARG_HEAPTUPLEHEADER(2) : NULL; parse_csv_options(opts_hdr, state->options); + if (state->options->nullstr) { + state->cached_nullstr = text_to_cstring(state->options->nullstr); + state->nullstr_len = VARSIZE_ANY_EXHDR(state->options->nullstr); + } + MemoryContextSwitchTo(oldctx); } @@ -55,7 +66,7 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) { TupleDesc tdesc = lookup_rowtype_tupdesc(HeapTupleHeaderGetTypeId(next), HeapTupleHeaderGetTypMod(next)); - if (state->options->with_bom) appendBinaryStringInfo(&state->accum_buf, BOM, sizeof(BOM)); + if (state->options->bom) appendBinaryStringInfo(&state->accum_buf, BOM, sizeof(BOM)); // build header row if (state->options->header) { @@ -65,10 +76,10 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) { continue; if (i > 0) // only append delimiter after the first value - appendStringInfoChar(&state->accum_buf, state->options->delim); + appendStringInfoChar(&state->accum_buf, state->options->delimiter); char *cstr = NameStr(att->attname); - csv_append_field(&state->accum_buf, cstr, strlen(cstr), state->options->delim); + 
csv_append_field(&state->accum_buf, cstr, strlen(cstr), state->options->delimiter); } appendStringInfoChar(&state->accum_buf, NEWLINE); @@ -81,9 +92,10 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) { // build body int tuple_natts = state->tupdesc->natts; - Datum *datums = (Datum *)palloc(tuple_natts * sizeof(Datum)); - bool *nulls = (bool *)palloc(tuple_natts * sizeof(bool)); + Datum *datums = (Datum *)palloc(mul_size(tuple_natts, sizeof(Datum))); + bool *nulls = (bool *)palloc(mul_size(tuple_natts, sizeof(bool))); + // extract the values of the next row heap_deform_tuple( &(HeapTupleData){ .t_len = HeapTupleHeaderGetDatumLength(next), @@ -102,12 +114,16 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) { if (att->attisdropped) // pg always keeps dropped columns, guard against this continue; - if (i > 0) appendStringInfoChar(&state->accum_buf, state->options->delim); + if (i > 0) appendStringInfoChar(&state->accum_buf, state->options->delimiter); - if (nulls[i]) continue; // empty field for NULL - - char *cstr = datum_to_cstring(datums[i], att->atttypid); - csv_append_field(&state->accum_buf, cstr, strlen(cstr), state->options->delim); + if (nulls[i]) { + if (state->cached_nullstr) + csv_append_field(&state->accum_buf, state->cached_nullstr, state->nullstr_len, + state->options->delimiter); + } else { + char *cstr = datum_to_cstring(datums[i], att->atttypid); + csv_append_field(&state->accum_buf, cstr, strlen(cstr), state->options->delimiter); + } } PG_RETURN_POINTER(state); diff --git a/test/expected/nullstr.out b/test/expected/nullstr.out new file mode 100644 index 0000000..e212e06 --- /dev/null +++ b/test/expected/nullstr.out @@ -0,0 +1,55 @@ +-- custom null string +SELECT csv_agg(x, csv_options(nullstr:='')) AS body +FROM projects x; + body +------------------------------- + id,name,client_id + + 1,Windows 7,1 + + 2,"has,comma",1 + + ,, + + 4,OSX,2 + + ,"has""quote", + + 5,"has,comma and ""quote""",7+ + 6,"has + + LF",7 + + 7,"has \r CR",8 + + 8,"has \r + + CRLF""",8 +(1 
row) + +-- custom null string with no header +SELECT csv_agg(x, csv_options(nullstr:='NULL', header:=false)) AS body +FROM projects x; + body +------------------------------- + 1,Windows 7,1 + + 2,"has,comma",1 + + NULL,NULL,NULL + + 4,OSX,2 + + NULL,"has""quote",NULL + + 5,"has,comma and ""quote""",7+ + 6,"has + + LF",7 + + 7,"has \r CR",8 + + 8,"has \r + + CRLF""",8 +(1 row) + +-- custom null string with no header and delimiter +SELECT csv_agg(x, csv_options(nullstr:='~', delimiter:='|', header:=false)) AS body +FROM projects x; + body +------------------------------- + 1|Windows 7|1 + + 2|has,comma|1 + + ~|~|~ + + 4|OSX|2 + + ~|"has""quote"|~ + + 5|"has,comma and ""quote"""|7+ + 6|"has + + LF"|7 + + 7|"has \r CR"|8 + + 8|"has \r + + CRLF"""|8 +(1 row) + diff --git a/test/sql/nullstr.sql b/test/sql/nullstr.sql new file mode 100644 index 0000000..5e52103 --- /dev/null +++ b/test/sql/nullstr.sql @@ -0,0 +1,11 @@ +-- custom null string +SELECT csv_agg(x, csv_options(nullstr:='')) AS body +FROM projects x; + +-- custom null string with no header +SELECT csv_agg(x, csv_options(nullstr:='NULL', header:=false)) AS body +FROM projects x; + +-- custom null string with no header and delimiter +SELECT csv_agg(x, csv_options(nullstr:='~', delimiter:='|', header:=false)) AS body +FROM projects x;