Skip to content

Commit 010a1aa

Browse files
committed
feat: improve DX of csv_agg with csv_options
Now we use the following interface for the agg: ```sql SELECT csv_agg(x, csv_options(';')) SELECT csv_agg(x, csv_options(delimiter := '|')) SELECT csv_agg(x) -- works as usual ``` This using a composite type plus a function constructor. Also bumps version to 0.2.
1 parent dc90cba commit 010a1aa

File tree

9 files changed

+128
-37
lines changed

9 files changed

+128
-37
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ results/
88
pgbench_log.*
99
.history
1010
pg_csv--*.sql
11+
!pg_csv--*--*.sql

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ else
2626
endif
2727

2828
EXTENSION = pg_csv
29-
EXTVERSION = 0.1
29+
EXTVERSION = 0.2
3030

3131
DATA = $(wildcard sql/*--*.sql)
3232

bench/csv_agg_delim.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
\set lim random(1000, 2000)
22

3-
select csv_agg(t,'|') from (
3+
select csv_agg(t, csv_options('|')) from (
44
select * from student_emotion_assessments limit :lim
55
) as t;

sql/pg_csv--0.1--0.2.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
drop aggregate if exists csv_agg (anyelement, "char");
2+
drop function if exists csv_agg_transfn (internal, anyelement, "char");
3+
4+
create type csv_options as (
5+
delimiter "char"
6+
);
7+
8+
create function csv_options(delimiter "char" default ',') returns csv_options as $$
9+
select row(delimiter)::csv_options;
10+
$$ language sql;
11+
12+
create function csv_agg_transfn(internal, anyelement, csv_options)
13+
returns internal
14+
language c
15+
as 'pg_csv';
16+
17+
create aggregate csv_agg(anyelement, csv_options) (
18+
sfunc = csv_agg_transfn,
19+
stype = internal,
20+
finalfunc = csv_agg_finalfn,
21+
parallel = safe
22+
);
23+

sql/pg_csv--0.1.sql

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
create function csv_agg_transfn(internal, anyelement)
2+
returns internal
3+
language c
4+
as 'pg_csv';
5+
6+
create function csv_agg_transfn(internal, anyelement, "char")
7+
returns internal
8+
language c
9+
as 'pg_csv';
10+
11+
create function csv_agg_finalfn(internal)
12+
returns text
13+
language c
14+
as 'pg_csv';
15+
16+
create aggregate csv_agg(anyelement, "char") (
17+
sfunc = csv_agg_transfn,
18+
stype = internal,
19+
finalfunc = csv_agg_finalfn,
20+
parallel = safe
21+
);
22+
23+
create aggregate csv_agg(anyelement) (
24+
sfunc = csv_agg_transfn,
25+
stype = internal,
26+
finalfunc = csv_agg_finalfn,
27+
parallel = safe
28+
);

sql/pg_csv.sql

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1+
create type csv_options as (
2+
delimiter "char"
3+
);
4+
5+
create function csv_options(delimiter "char" default ',') returns csv_options as $$
6+
select row(delimiter)::csv_options;
7+
$$ language sql;
8+
19
create function csv_agg_transfn(internal, anyelement)
210
returns internal
311
language c
412
as 'pg_csv';
513

6-
create function csv_agg_transfn(internal, anyelement, "char")
14+
create function csv_agg_transfn(internal, anyelement, csv_options)
715
returns internal
816
language c
917
as 'pg_csv';
@@ -13,16 +21,17 @@ create function csv_agg_finalfn(internal)
1321
language c
1422
as 'pg_csv';
1523

16-
create aggregate csv_agg(anyelement, "char") (
24+
create aggregate csv_agg(anyelement) (
1725
sfunc = csv_agg_transfn,
1826
stype = internal,
1927
finalfunc = csv_agg_finalfn,
2028
parallel = safe
2129
);
2230

23-
create aggregate csv_agg(anyelement) (
31+
create aggregate csv_agg(anyelement, csv_options) (
2432
sfunc = csv_agg_transfn,
2533
stype = internal,
2634
finalfunc = csv_agg_finalfn,
2735
parallel = safe
2836
);
37+

src/pg_csv.c

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@ static const char NEWLINE = '\n';
66
static const char DQUOTE = '"';
77
static const char CR = '\r';
88

9+
typedef struct {
10+
char delim;
11+
} CsvOptions;
12+
913
typedef struct {
1014
StringInfoData accum_buf;
1115
bool header_done;
1216
bool first_row;
1317
TupleDesc tupdesc;
18+
CsvOptions *options;
1419
} CsvAggState;
1520

1621
static inline bool is_reserved(char c) {
@@ -48,40 +53,65 @@ static char *datum_to_cstring(Datum datum, Oid typeoid) {
4853
return OidOutputFunctionCall(out_func, datum);
4954
}
5055

56+
static void parse_csv_options(HeapTupleHeader opts_hdr, CsvOptions *csv_opts) {
57+
// defaults
58+
csv_opts->delim = ',';
59+
60+
if (opts_hdr == NULL) return;
61+
62+
TupleDesc desc = lookup_rowtype_tupdesc(HeapTupleHeaderGetTypeId(opts_hdr),
63+
HeapTupleHeaderGetTypMod(opts_hdr));
64+
65+
Datum values[1];
66+
bool nulls[1];
67+
68+
heap_deform_tuple(
69+
&(HeapTupleData){.t_len = HeapTupleHeaderGetDatumLength(opts_hdr), .t_data = opts_hdr}, desc,
70+
values, nulls);
71+
72+
if (!nulls[0]) {
73+
csv_opts->delim = DatumGetChar(values[0]);
74+
if (is_reserved(csv_opts->delim))
75+
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
76+
errmsg("delimiter cannot be newline, carriage return or "
77+
"double quote")));
78+
}
79+
80+
ReleaseTupleDesc(desc);
81+
}
82+
5183
PG_FUNCTION_INFO_V1(csv_agg_transfn);
5284
Datum csv_agg_transfn(PG_FUNCTION_ARGS) {
53-
CsvAggState *state;
85+
CsvAggState *state = !PG_ARGISNULL(0) ? (CsvAggState *)PG_GETARG_POINTER(0) : NULL;
86+
HeapTupleHeader next = !PG_ARGISNULL(1) ? PG_GETARG_HEAPTUPLEHEADER(1) : NULL;
5487

5588
// first call when the accumulator is NULL
5689
// pretty standard stuff, for example see the jsonb_agg transition function
5790
// https://github.com/postgres/postgres/blob/3c4e26a62c31ebe296e3aedb13ac51a7a35103bd/src/backend/utils/adt/jsonb.c#L1521
58-
if (PG_ARGISNULL(0)) {
91+
if (state == NULL) {
5992
MemoryContext aggctx, oldctx;
6093

6194
if (!AggCheckCallContext(fcinfo, &aggctx))
6295
elog(ERROR, "csv_agg_transfn called in non‑aggregate context");
6396

6497
oldctx = MemoryContextSwitchTo(aggctx);
6598

66-
state = (CsvAggState *)palloc(sizeof(CsvAggState));
99+
state = palloc(sizeof(CsvAggState));
67100
initStringInfo(&state->accum_buf);
68101
state->header_done = false;
69102
state->first_row = true;
70103
state->tupdesc = NULL;
104+
state->options = palloc(sizeof(CsvOptions));
71105

72-
MemoryContextSwitchTo(oldctx);
73-
} else
74-
state = (CsvAggState *)PG_GETARG_POINTER(0);
75-
76-
if (PG_ARGISNULL(1)) PG_RETURN_POINTER(state); // skip NULL rows
77-
78-
HeapTupleHeader next = PG_GETARG_HEAPTUPLEHEADER(1);
106+
// we'll parse the csv options only once
107+
HeapTupleHeader opts_hdr =
108+
PG_NARGS() >= 3 && !PG_ARGISNULL(2) ? PG_GETARG_HEAPTUPLEHEADER(2) : NULL;
109+
parse_csv_options(opts_hdr, state->options);
79110

80-
char delim = PG_NARGS() >= 3 && !PG_ARGISNULL(2) ? PG_GETARG_CHAR(2) : ',';
111+
MemoryContextSwitchTo(oldctx);
112+
}
81113

82-
if (is_reserved(delim))
83-
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
84-
errmsg("delimiter cannot be newline, carriage return or double quote")));
114+
if (next == NULL) PG_RETURN_POINTER(state); // skip NULL rows
85115

86116
// build header and cache tupdesc once
87117
if (!state->header_done) {
@@ -95,10 +125,10 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) {
95125
continue;
96126

97127
if (i > 0) // only append delimiter after the first value
98-
appendStringInfoChar(&state->accum_buf, delim);
128+
appendStringInfoChar(&state->accum_buf, state->options->delim);
99129

100130
char *cstr = NameStr(att->attname);
101-
csv_append_field(&state->accum_buf, cstr, strlen(cstr), delim);
131+
csv_append_field(&state->accum_buf, cstr, strlen(cstr), state->options->delim);
102132
}
103133

104134
appendStringInfoChar(&state->accum_buf, NEWLINE);
@@ -131,12 +161,12 @@ Datum csv_agg_transfn(PG_FUNCTION_ARGS) {
131161
if (att->attisdropped) // pg always keeps dropped columns, guard against this
132162
continue;
133163

134-
if (i > 0) appendStringInfoChar(&state->accum_buf, delim);
164+
if (i > 0) appendStringInfoChar(&state->accum_buf, state->options->delim);
135165

136166
if (nulls[i]) continue; // empty field for NULL
137167

138168
char *cstr = datum_to_cstring(datums[i], att->atttypid);
139-
csv_append_field(&state->accum_buf, cstr, strlen(cstr), delim);
169+
csv_append_field(&state->accum_buf, cstr, strlen(cstr), state->options->delim);
140170
}
141171

142172
PG_RETURN_POINTER(state);

test/expected/delimiters.out

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
-- semicolon delimiter
2-
SELECT csv_agg(x, ';') AS body
2+
SELECT csv_agg(x, csv_options(';')) AS body
33
FROM projects x;
44
body
55
-------------------------------
@@ -17,8 +17,8 @@ FROM projects x;
1717
CRLF""";8
1818
(1 row)
1919

20-
-- pipe delimiter
21-
SELECT csv_agg(x, '|') AS body
20+
-- pipe delimiter, named params work too
21+
SELECT csv_agg(x, csv_options(delimiter := '|')) AS body
2222
FROM projects x;
2323
body
2424
-------------------------------
@@ -37,7 +37,7 @@ FROM projects x;
3737
(1 row)
3838

3939
-- tab delimiter
40-
SELECT csv_agg(x, E'\t') AS body
40+
SELECT csv_agg(x, csv_options(E'\t')) AS body
4141
FROM projects x;
4242
body
4343
-------------------------------------------
@@ -56,14 +56,14 @@ FROM projects x;
5656
(1 row)
5757

5858
-- newline is forbidden as delimiter
59-
SELECT csv_agg(x, E'\n') AS body
59+
SELECT csv_agg(x, csv_options(E'\n')) AS body
6060
FROM projects x;
6161
ERROR: delimiter cannot be newline, carriage return or double quote
6262
-- double quote is forbidden as delimiter
63-
SELECT csv_agg(x, '"') AS body
63+
SELECT csv_agg(x, csv_options('"')) AS body
6464
FROM projects x;
6565
ERROR: delimiter cannot be newline, carriage return or double quote
6666
-- carriage return is forbidden as delimiter
67-
SELECT csv_agg(x, E'\r') AS body
67+
SELECT csv_agg(x, csv_options(E'\r')) AS body
6868
FROM projects x;
6969
ERROR: delimiter cannot be newline, carriage return or double quote

test/sql/delimiters.sql

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
-- semicolon delimiter
2-
SELECT csv_agg(x, ';') AS body
2+
SELECT csv_agg(x, csv_options(';')) AS body
33
FROM projects x;
44

5-
-- pipe delimiter
6-
SELECT csv_agg(x, '|') AS body
5+
-- pipe delimiter, named params work too
6+
SELECT csv_agg(x, csv_options(delimiter := '|')) AS body
77
FROM projects x;
88

99
-- tab delimiter
10-
SELECT csv_agg(x, E'\t') AS body
10+
SELECT csv_agg(x, csv_options(E'\t')) AS body
1111
FROM projects x;
1212

1313
-- newline is forbidden as delimiter
14-
SELECT csv_agg(x, E'\n') AS body
14+
SELECT csv_agg(x, csv_options(E'\n')) AS body
1515
FROM projects x;
1616

1717
-- double quote is forbidden as delimiter
18-
SELECT csv_agg(x, '"') AS body
18+
SELECT csv_agg(x, csv_options('"')) AS body
1919
FROM projects x;
2020

2121
-- carriage return is forbidden as delimiter
22-
SELECT csv_agg(x, E'\r') AS body
22+
SELECT csv_agg(x, csv_options(E'\r')) AS body
2323
FROM projects x;

0 commit comments

Comments
 (0)