diff --git a/README.md b/README.md index 6dced75..570acb4 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ The exporter will automatically detect if the output location ends in ## Install -Go must be installed, version 1.20 or later. +Go must be installed, version 1.25 or later. Then: @@ -47,7 +47,7 @@ baremaps-exporter --help All of the options: ``` export baremaps-compatible tilesets from a postgis server -Usage: baremaps-exporter [--output OUTPUT] [--mbtiles] [--dsn DSN] [--workers WORKERS] [--tileversion TILEVERSION] [--zoom ZOOM] [--file FILE] TILEJSON +Usage: baremaps-exporter [--output OUTPUT] [--mbtiles] [--dsn DSN] [--init INIT] [--workers WORKERS] [--batch BATCH] [--tileversion TILEVERSION] [--zoom ZOOM] [--file FILE] TILEJSON Positional arguments: TILEJSON input tilejson file @@ -57,8 +57,11 @@ Options: output file or directory --mbtiles output mbtiles instead of files (automatically selected if output filename ends in '.mbtiles') --dsn DSN, -d DSN database connection string (dsn) for postgis + --init INIT initialization SQL statement that is sent on connection/session start, for any specific optimizations --workers WORKERS, -w WORKERS - number of workers to spawn [default: 48] + number of workers to spawn [default: 72] + --batch BATCH, -b BATCH + size of the batch to query and write at once [default: 10] --tileversion TILEVERSION version of the tileset (string) written to mbtiles metadata --zoom ZOOM comma-delimited set specific zooms to export (eg: 2,4,6,8) @@ -71,6 +74,21 @@ Typical usage: baremaps-exporter -o ./tiles/ -d 'postgres://baremaps:baremaps@localhost:5432/baremaps' tiles.json ``` +## Performance Tuning + +By default, the exporter disables JIT, since JIT compilation often slows down +the large number of queries required to export high volumes of tiles. + +Additional SQL configuration commands may be useful for tuning. These commands +are executed by the exporter at the start of each connection session and can be +specified with `--init`. + +A particularly useful one is `SET enable_bitmapscan = off;`. It is common for +the planner to _think_ that a tile covers a large number of features and to +prefer a bitmap scan over consulting the geospatial index first. This setting +prevents that, so the query first narrows the search to the tile's geospatial +region via the index, and only then filters the features within that region. + ## LICENSE This work is licensed by [FlightAware](https://flightaware.com) under the [BSD 3-Clause License](./LICENSE.md).
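A hypothetical end-to-end invocation combining the new flags (the DSN, worker count, batch size, and file names below are placeholders):

```
baremaps-exporter -o tiles.mbtiles \
    -d 'postgres://baremaps:baremaps@localhost:5432/baremaps' \
    --init 'SET enable_bitmapscan = off;' \
    -w 32 -b 50 tiles.json
```

The `--init` statement is executed once on each worker's connection, after JIT is disabled and before any tile queries run.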
diff --git a/cmd/baremaps-exporter/main.go b/cmd/baremaps-exporter/main.go index 361922c..7381091 100644 --- a/cmd/baremaps-exporter/main.go +++ b/cmd/baremaps-exporter/main.go @@ -1,30 +1,11 @@ package main import ( - "context" - "fmt" "runtime" - "strconv" "strings" - "sync" - "time" - - "github.com/flightaware/baremaps-exporter/v2/pkg/tileutils" "github.com/alexflint/go-arg" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/twpayne/go-mbtiles" - "golang.org/x/exp/slices" -) - -const ( - progressUpdateRate = time.Duration(15) * time.Second - mbTilesBatchSize = 10 -) - -var ( - workerProgress = map[int]int{} // workerProgress checks how many tiles each worker has completed - workerProgressMutex sync.Mutex + "github.com/flightaware/baremaps-exporter/v2/pkg/exporter" ) type Args struct { @@ -32,7 +13,9 @@ type Args struct { Output string `arg:"-o,--output" help:"output file or directory"` MbTiles bool `arg:"--mbtiles" help:"output mbtiles instead of files (automatically selected if output filename ends in '.mbtiles')"` Dsn string `arg:"-d,--dsn" help:"database connection string (dsn) for postgis"` + InitSQLCmd string `arg:"--init" help:"initialization SQL statement that is sent on connection/session start, for any specific optimizations"` NumWorkers int `arg:"-w,--workers" help:"number of workers to spawn"` + BatchSize uint `arg:"-b,--batch" help:"size of the batch to query and write at once"` Version string `arg:"--tileversion" help:"version of the tileset (string) written to mbtiles metadata"` Zoom string `arg:"--zoom" help:"comma-delimited set specific zooms to export (eg: 2,4,6,8)"` TilesFile string `arg:"-f,--file" help:"a list of tiles to also generate, from a file where each line is a z/x/y tile coordinate"` @@ -42,282 +25,39 @@ func (Args) Description() string { return "export baremaps-compatible tilesets from a postgis server" } -type WorkerParams struct { - Num int // worker number - Wg *sync.WaitGroup // waitgroup to signal when completed - Args Args // input args - TileList []tileutils.TileCoords // coords that this worker should process - QueryMap tileutils.ZoomLayerInfo // a map of the queries relevant at each zoom level - GzipCompression bool // true if gzip compression should be used - Writer tileutils.TileWriter // writer to use for output - BulkWriter tileutils.TileBulkWriter // bulk writer if available - Pool *pgxpool.Pool // postgres connection pool -} - -// newWriters creates a TileWriter and TileBulkWriter based on the input arguments -func newWriters(args Args, tj *tileutils.TileJSON) (writer tileutils.TileWriter, bulkWriter tileutils.TileBulkWriter, close func(), err error) { - var mbWriter *tileutils.MbTilesWriter - if args.Output == "" { - writer = &tileutils.DummyWriter{} - return - } - if args.MbTiles { - mbWriter = &tileutils.MbTilesWriter{ - Filename: args.Output, - } - writer = mbWriter - bulkWriter = mbWriter - writer, close, err = mbWriter.New() - if err != nil { - return - } - meta := tileutils.CreateMetadata(tj, tileutils.CreateMetadataOptions{ - Filename: args.TileJSON, - Version: args.Version, - Format: tileutils.MbTilesFormatPbf, - }) - err = mbWriter.BulkWriteMetadata(meta) - return - } - writer = &tileutils.FileWriter{ - Path: args.Output, - } - writer, close, err = writer.New() - return -} - -func connectWithRetries(pool *pgxpool.Pool, numRetries int) (*pgxpool.Conn, error) { - var lastErr error - for i := 0; i < numRetries; i++ { - conn, err := pool.Acquire(context.Background()) - if err == nil { - return conn, nil - } - lastErr = err - 
time.Sleep(time.Duration(100) * time.Millisecond) - } - return nil, lastErr -} - -func progressReporter(total int) { - ticker := time.NewTicker(progressUpdateRate) - start := time.Now() - for { - t := <-ticker.C - counter := 0 - workerProgressMutex.Lock() - for _, v := range workerProgress { - counter += v - } - workerProgressMutex.Unlock() - progress := float64(counter) / float64(total) * 100.0 - elapsed := time.Duration(int(t.Sub(start).Seconds())) * time.Second - var remaining time.Duration - totalTime := time.Duration(int(elapsed.Seconds()/(progress/100.0))) * time.Second - remaining = totalTime - elapsed - fmt.Printf("progress: %.2f%% (%s elapsed, %s remaining)\n", progress, elapsed, remaining) - if counter == total { - break - } - } -} - -func tileWorker(params WorkerParams) { - // open db connection - conn, err := connectWithRetries(params.Pool, 5) - if err != nil { - fmt.Printf("could not acquire connection! %v\n", err) - params.Wg.Done() - return - } - fmt.Printf("[%d] connected, compression=%t\n", params.Num, params.GzipCompression) - defer conn.Release() - - tileCache := make([]mbtiles.TileData, mbTilesBatchSize) - tileCachePos := 0 - count := 0 - // extract all the tiles in this worker's list - for _, c := range params.TileList { - start := time.Now() - count += 1 - workerProgressMutex.Lock() - workerProgress[params.Num] = count - workerProgressMutex.Unlock() - queryStr := "SELECT " - layerCount := 0 - for layerName, sqlStmts := range params.QueryMap[c.Z] { - if layerCount > 0 { - queryStr += "||" - } - sql := "(WITH mvtgeom AS (" - for i, query := range sqlStmts { - template := "(SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope(%d, %d, %d)) AS geom, t.tags, t.id " + - "FROM (%s) AS t " + - "WHERE t.geom && ST_TileEnvelope(%d, %d, %d, margin => (64.0/4096)))" - _sql := fmt.Sprintf(template, - c.Z, c.X, c.Y, - strings.ReplaceAll(query, ";", ""), - c.Z, c.X, c.Y) - if i != 0 { - sql += " UNION " - } - sql += _sql - } - queryStr += sql + fmt.Sprintf(") SELECT ST_AsMVT(mvtgeom.*, '%s') FROM mvtgeom )", layerName) - layerCount++ - } - queryStr += " mvtTile;" - row := conn.QueryRow(context.Background(), queryStr) - var mvtTile []byte - err = row.Scan(&mvtTile) - if err != nil { - fmt.Printf("error during tile generation (%d,%d,%d): %v\n", c.Z, c.X, c.Y, err) - continue - } - if params.GzipCompression { - compressed, err := tileutils.Gzip(mvtTile) - if err != nil { - fmt.Printf("error compressing tile: %v\n", err) - } - mvtTile = compressed - } - end := time.Now() - if end.Sub(start) > time.Duration(5)*time.Second { - fmt.Printf("[%d] slow tile: %d/%d/%d - %s\n", params.Num, c.Z, c.X, c.Y, end.Sub(start)) - fmt.Println(queryStr) - } - - if params.BulkWriter != nil { - tileCache[tileCachePos] = mbtiles.TileData{ - Z: c.Z, - X: c.X, - Y: c.Y, - Data: mvtTile, - } - tileCachePos++ - if tileCachePos == mbTilesBatchSize { - err := params.BulkWriter.BulkWrite(tileCache) - if err != nil { - fmt.Printf("error writing tiles") - continue - } - tileCachePos = 0 - } - - } else { - err := params.Writer.Write(c.Z, c.X, c.Y, mvtTile) - if err != nil { - fmt.Printf("error writing tile (%d, %d, %d): %v\n", c.Z, c.X, c.Y, err) - continue - } - } - } - - if tileCachePos > 0 && params.BulkWriter != nil { - err := params.BulkWriter.BulkWrite(tileCache[:tileCachePos]) - if err != nil { - fmt.Printf("error writing tiles") - } - } - // signal we're done - params.Wg.Done() - -} - func main() { + // defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop() args := Args{ NumWorkers: runtime.NumCPU(), 
+ BatchSize: 10, } arg.MustParse(&args) + if strings.HasSuffix(args.Output, ".mbtiles") { args.MbTiles = true } - // open postgres pool - config, err := pgxpool.ParseConfig(args.Dsn) - if err != nil { - panic(err) - } - - // read tilejson - tileJSON, tileMap, err := tileutils.ParseTileJSON(args.TileJSON) - if err != nil { - panic(err) + config := exporter.Config{ + TileJSON: args.TileJSON, + Output: args.Output, + MbTiles: args.MbTiles, + Dsn: args.Dsn, + InitSQLCmd: args.InitSQLCmd, + NumWorkers: args.NumWorkers, + Version: args.Version, + Zoom: args.Zoom, + TilesFile: args.TilesFile, + MbTilesBatchSize: args.BatchSize, } - config.MinConns = int32(runtime.NumCPU()) - config.MaxConns = int32(2 * runtime.NumCPU()) - pool, err := pgxpool.NewWithConfig(context.Background(), config) + exp, err := exporter.NewExporter(config) if err != nil { panic(err) } + defer exp.Close() - var wg sync.WaitGroup - - var zooms []int - // if comma-delimited list of zooms, use those - if args.Zoom != "" { - strZooms := strings.Split(args.Zoom, ",") - zooms = make([]int, 0, len(strZooms)) - for _, z := range strZooms { - intZoom, err := strconv.Atoi(z) - if err != nil { - panic(err) - } - zooms = append(zooms, intZoom) - } - } else { - zooms = make([]int, 0, tileJSON.MaxZoom-tileJSON.MinZoom+1) - for z := tileJSON.MinZoom; z <= tileJSON.MaxZoom; z++ { - zooms = append(zooms, z) - } - } - slices.Sort(zooms) - // reset min/max zoom to match requested output - tileJSON.MinZoom = zooms[0] - tileJSON.MaxZoom = zooms[len(zooms)-1] - - tiles := tileutils.ListTiles(zooms, tileJSON) - if args.TilesFile != "" { - extraTiles, err := tileutils.TilesFromFile(args.TilesFile) - if err != nil { - panic(err) - } - fmt.Printf("read tile coordinates from file: %d\n", len(extraTiles)) - tiles = append(tiles, extraTiles...) 
- } - tileLen := len(tiles) - fmt.Printf("number of tiles: %d\n", tileLen) - - writer, bulkWriter, close, err := newWriters(args, tileJSON) + err = exp.Export() if err != nil { panic(err) } - - numWorkers := args.NumWorkers - if numWorkers > tileLen { - numWorkers = tileLen - } - // round robin the tiles so workers are hitting similar geospatial entries and zoom at the same time - rrTiles := tileutils.RoundRobinTiles(tiles, numWorkers) - for i := 0; i < numWorkers; i++ { - wg.Add(1) - workerTiles := rrTiles[i] - params := WorkerParams{ - Num: i, - Wg: &wg, - Args: args, - Pool: pool, - QueryMap: tileMap, - Writer: writer, - BulkWriter: bulkWriter, - TileList: workerTiles, - GzipCompression: args.MbTiles, - } - go tileWorker(params) - } - go progressReporter(tileLen) - - wg.Wait() - close() } diff --git a/go.mod b/go.mod index 93d515e..7143c50 100644 --- a/go.mod +++ b/go.mod @@ -1,32 +1,32 @@ module github.com/flightaware/baremaps-exporter/v2 -go 1.22.0 - -toolchain go1.24.3 +go 1.25.5 require ( - github.com/alexflint/go-arg v1.4.3 - github.com/jackc/pgx/v5 v5.4.3 - github.com/klauspost/compress v1.16.7 - github.com/paulmach/orb v0.10.0 - github.com/stretchr/testify v1.8.4 + github.com/alexflint/go-arg v1.6.1 + github.com/jackc/pgx/v5 v5.8.0 + github.com/klauspost/compress v1.18.2 + github.com/mattn/go-sqlite3 v1.14.33 + github.com/paulmach/orb v0.12.0 + github.com/stretchr/testify v1.11.1 github.com/twpayne/go-mbtiles v0.0.2 - golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f + golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93 ) require ( - github.com/alexflint/go-scalar v1.1.0 // indirect + github.com/alexflint/go-scalar v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/paulmach/protoscan v0.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect - go.mongodb.org/mongo-driver v1.11.4 // indirect - golang.org/x/crypto v0.12.0 // indirect - golang.org/x/sync v0.9.0 // indirect - golang.org/x/text v0.12.0 // indirect + go.mongodb.org/mongo-driver v1.17.6 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/text v0.32.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + modernc.org/sqlite v1.42.2 // indirect ) diff --git a/go.sum b/go.sum index 202c147..e9ce480 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,10 @@ github.com/alecthomas/assert/v2 v2.3.0 h1:mAsH2wmvjsuvyBvAmCtm7zFsBlb8mIHx5ySLVd github.com/alecthomas/assert/v2 v2.3.0/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ= github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk= github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= -github.com/alexflint/go-arg v1.4.3 h1:9rwwEBpMXfKQKceuZfYcwuc/7YY7tWJbFsgG5cAU/uo= -github.com/alexflint/go-arg v1.4.3/go.mod h1:3PZ/wp/8HuqRZMUUgu7I+e1qcpUbvmS258mRXkFH4IA= -github.com/alexflint/go-scalar v1.1.0 h1:aaAouLLzI9TChcPXotr6gUhq+Scr8rl0P9P4PnltbhM= -github.com/alexflint/go-scalar v1.1.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o= +github.com/alexflint/go-arg v1.6.1 h1:uZogJ6VDBjcuosydKgvYYRhh9sRCusjOvoOLZopBlnA= +github.com/alexflint/go-arg 
v1.6.1/go.mod h1:nQ0LFYftLJ6njcaee0sU+G0iS2+2XJQfA8I062D0LGc= +github.com/alexflint/go-scalar v1.2.0 h1:WR7JPKkeNpnYIOfHRa7ivM21aWAdHD0gEWHCx+WQBRw= +github.com/alexflint/go-scalar v1.2.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -21,23 +21,21 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= -github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= -github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= -github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -46,11 +44,13 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.33 h1:A5blZ5ulQo2AtayQ9/limgHEkFreKj1Dv226a1K73s0= +github.com/mattn/go-sqlite3 v1.14.33/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s= -github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/orb v0.12.0 h1:z+zOwjmG3MyEEqzv92UN49Lg1JFYx0L9GpGKNVDKk1s= +github.com/paulmach/orb v0.12.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= github.com/paulmach/protoscan v0.2.1 h1:rM0FpcTjUMvPUNk2BhPJrreDKetq43ChnL+x1sRg8O8= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -65,9 +65,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twpayne/go-mbtiles v0.0.2 h1:xzcQ7yX9ibDJ367oN47XN+MxaQ7oSQzoQ56Kov3qsVk= github.com/twpayne/go-mbtiles v0.0.2/go.mod h1:fzquLev3+J+7E3qTiYIAPEBmVmSlazPQ1MLyG9QrQek= @@ -77,16 +76,15 @@ github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgk github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.mongodb.org/mongo-driver v1.11.4 h1:4ayjakA013OdpGyL2K3ZqylTac/rMjrJOMZ1EHizXas= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= +go.mongodb.org/mongo-driver v1.17.6 h1:87JUG1wZfWsr6rIz3ZmpH90rL5tea7O3IHuSwHUpsss= +go.mongodb.org/mongo-driver v1.17.6/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= 
-golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo= -golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak= +golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93 h1:fQsdNF2N+/YewlRZiricy4P1iimyPKZ/xwniHj8Q2a0= +golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93/go.mod h1:EPRbTFwzwjXj9NpYyyrvenVh9Y+GFeEvMNh7Xuz7xgU= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -98,23 +96,23 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod 
h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -133,17 +131,11 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/gc/v3 v3.0.0-20241004144649-1aea3fae8852 h1:IYXPPTTjjoSHvUClZIYexDiO7g+4x+XveKT4gCIAwiY= -modernc.org/gc/v3 v3.0.0-20241004144649-1aea3fae8852/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= -modernc.org/libc v1.61.3 h1:D1gpZODpSnRpSnXxEsPjplrKDZIbtgWvslE5BOsPv5Q= -modernc.org/libc v1.61.3/go.mod h1:Aw9YglLu+WSCq098BoLHmCALpVxwGU5KASDyzFkYTmQ= -modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= -modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= -modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= -modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= -modernc.org/sqlite v1.34.1 h1:u3Yi6M0N8t9yKRDwhXcyp1eS5/ErhPTBggxWFuR6Hfk= -modernc.org/sqlite v1.34.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k= -modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= -modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= -modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= -modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +modernc.org/libc v1.66.10 h1:yZkb3YeLx4oynyR+iUsXsybsX4Ubx7MQlSYEw4yj59A= +modernc.org/libc v1.66.10/go.mod h1:8vGSEwvoUoltr4dlywvHqjtAqHBaw0j1jI7iFBTAr2I= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/sqlite v1.42.2 h1:7hkZUNJvJFN2PgfUdjni9Kbvd4ef4mNLOu0B9FGxM74= +modernc.org/sqlite v1.42.2/go.mod h1:+VkC6v3pLOAE0A0uVucQEcbVW0I5nHCeDaBf+DpsQT8= diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go new file mode 100644 index 0000000..adefa59 --- /dev/null +++ b/pkg/exporter/exporter.go @@ -0,0 +1,369 @@ +// Package exporter provides the base export functionality with support for multiple workers. 
+package exporter + +import ( + "bytes" + "context" + "fmt" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "github.com/flightaware/baremaps-exporter/v2/pkg/tileutils" + "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/exp/slices" +) + +const ( + ProgressUpdateRate = time.Duration(15) * time.Second +) + +// Config holds the configuration for the exporter +type Config struct { + TileJSON string // Input TileJSON filename + Output string // Output filename + MbTiles bool // True if outputting to a mbtiles file, otherwise outputs to a directory + Dsn string // DSN to connect to postgres to query tiles + InitSQLCmd string // Initialization SQL command to be sent on session start for custom configuration or tuning + NumWorkers int // Number of workers to process tiles in parallel + Version string // Tileset version written to the mbtiles file metadata + Zoom string // List of zooms to process, separated by a comma + TilesFile string // A list of custom tile coordinates to also generate, in addition to what is specified in zooms + MbTilesBatchSize uint // how many tiles to fetch in a batch and then write altogether to mbtiles +} + +// Exporter handles the tile export process +type Exporter struct { + config Config + pool *pgxpool.Pool + tileJSON *tileutils.TileJSON + queryMap tileutils.ZoomLayerInfo + progress map[int]int + progressMux sync.Mutex + sqlQueryByZoom map[int]string +} + +// NewExporter creates a new exporter instance +func NewExporter(config Config) (*Exporter, error) { + // Parse database config + pgConfig, err := pgxpool.ParseConfig(config.Dsn) + if err != nil { + return nil, fmt.Errorf("failed to parse database config: %w", err) + } + + // Read tilejson + tileJSON, queryMap, err := tileutils.ParseTileJSON(config.TileJSON) + if err != nil { + return nil, fmt.Errorf("failed to parse TileJSON: %w", err) + } + + // Configure connection pool + pgConfig.MinConns = int32(runtime.NumCPU()) + pgConfig.MaxConns = int32(2 * runtime.NumCPU()) + + pool, err := pgxpool.NewWithConfig(context.Background(), pgConfig) + if err != nil { + return nil, fmt.Errorf("failed to create connection pool: %w", err) + } + + exporter := &Exporter{ + config: config, + pool: pool, + tileJSON: tileJSON, + queryMap: queryMap, + progress: make(map[int]int), + } + exporter.initQueryStrings() + return exporter, nil +} + +func (e *Exporter) initQueryStrings() { + e.sqlQueryByZoom = make(map[int]string, len(e.queryMap)) + buf := bytes.NewBuffer(make([]byte, 0, 2048)) + for zoom, qm := range e.queryMap { + e.sqlQueryByZoom[zoom] = e.generateQueryString(buf, qm) + } +} + +func (e *Exporter) generateQueryString(buf *bytes.Buffer, queryMap map[string][]string) string { + buf.Reset() + buf.WriteString("SELECT ") + layerCount := 0 + + for layerName, sqlStmts := range queryMap { + if layerCount > 0 { + buf.WriteString("||") + } + buf.WriteString("(WITH mvtgeom AS (") + for i, query := range sqlStmts { + if i != 0 { + buf.WriteString(" UNION ") + } + // Use $1, $2, $3 instead of hardcoding coord.Z, X, Y + buf.WriteString("(SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope($1, $2, $3)) AS geom, t.tags, t.id ") + buf.WriteString("FROM (") + buf.WriteString(strings.ReplaceAll(query, ";", "")) + buf.WriteString(") AS t ") + buf.WriteString("WHERE t.geom && ST_TileEnvelope($1, $2, $3, margin => (64.0/4096)))") + } + buf.WriteString(") SELECT ST_AsMVT(mvtgeom.*, '") + buf.WriteString(layerName) + buf.WriteString("') FROM mvtgeom )") + layerCount++ + } + buf.WriteString(" mvtTile;") + return buf.String() +} + +// Close closes 
the database connection pool +func (e *Exporter) Close() { + if e.pool != nil { + e.pool.Close() + } +} + +// GenerateZoomLevels parses the zoom configuration and returns zoom levels +func (e *Exporter) GenerateZoomLevels() ([]int, error) { + var zooms []int + + if e.config.Zoom != "" { + // Parse comma-delimited zoom levels + strZooms := strings.Split(e.config.Zoom, ",") + zooms = make([]int, 0, len(strZooms)) + for _, z := range strZooms { + intZoom, err := strconv.Atoi(z) + if err != nil { + return nil, fmt.Errorf("invalid zoom level: %s", z) + } + zooms = append(zooms, intZoom) + } + } else { + // Use all zoom levels from TileJSON + zooms = make([]int, 0, e.tileJSON.MaxZoom-e.tileJSON.MinZoom+1) + for z := e.tileJSON.MinZoom; z <= e.tileJSON.MaxZoom; z++ { + zooms = append(zooms, z) + } + } + + slices.Sort(zooms) + + // Update TileJSON min/max zoom to match requested output + if len(zooms) > 0 { + e.tileJSON.MinZoom = zooms[0] + e.tileJSON.MaxZoom = zooms[len(zooms)-1] + } + + return zooms, nil +} + +// GenerateTileList creates the list of tiles to process +func (e *Exporter) GenerateTileList(zooms []int) ([]tileutils.TileCoords, error) { + tiles := tileutils.ListTiles(zooms, e.tileJSON) + + // Add extra tiles from file if specified + if e.config.TilesFile != "" { + extraTiles, err := tileutils.TilesFromFile(e.config.TilesFile) + if err != nil { + return nil, fmt.Errorf("failed to read tiles from file: %w", err) + } + fmt.Printf("read tile coordinates from file: %d\n", len(extraTiles)) + tiles = append(tiles, extraTiles...) + } + + return tiles, nil +} + +// CreateWriters creates the appropriate tile writers based on configuration +func (e *Exporter) CreateWriters() (tileutils.TileWriter, tileutils.TileBulkWriter, func(), error) { + var mbWriter *tileutils.MbTilesWriter + + if e.config.Output == "" { + return &tileutils.DummyWriter{}, nil, func() {}, nil + } + + if e.config.MbTiles { + mbWriter = &tileutils.MbTilesWriter{ + Filename: e.config.Output, + } + writer, close, err := mbWriter.New() + if err != nil { + return nil, nil, nil, err + } + + meta := tileutils.CreateMetadata(e.tileJSON, tileutils.CreateMetadataOptions{ + Filename: e.config.TileJSON, + Version: e.config.Version, + Format: tileutils.MbTilesFormatPbf, + }) + err = mbWriter.BulkWriteMetadata(meta) + if err != nil { + return nil, nil, nil, err + } + + return writer, mbWriter, close, nil + } + + writer := &tileutils.FileWriter{ + Path: e.config.Output, + } + w, close, err := writer.New() + return w, nil, close, err +} + +// ConnectWithRetries attempts to acquire a database connection with retries +func (e *Exporter) ConnectWithRetries(numRetries int) (*pgxpool.Conn, error) { + var lastErr error + for i := 0; i < numRetries; i++ { + conn, err := e.pool.Acquire(context.Background()) + if err == nil { + return conn, nil + } + lastErr = err + time.Sleep(time.Duration(100) * time.Millisecond) + } + return nil, lastErr +} + +// UpdateProgress updates the progress for a worker +func (e *Exporter) UpdateProgress(workerNum, count int) { + e.progressMux.Lock() + e.progress[workerNum] = count + e.progressMux.Unlock() +} + +// GetTotalProgress returns the total progress across all workers +func (e *Exporter) GetTotalProgress() int { + e.progressMux.Lock() + defer e.progressMux.Unlock() + + total := 0 + for _, count := range e.progress { + total += count + } + return total +} + +// ProgressReporter runs a progress reporting loop +func (e *Exporter) ProgressReporter(ctx context.Context, totalTiles int) { + ticker := 
time.NewTicker(ProgressUpdateRate) + defer ticker.Stop() + + start := time.Now() + var m runtime.MemStats + var lastGCPauseNs uint64 + + for { + select { + case <-ctx.Done(): + current := e.GetTotalProgress() + progress := float64(current) / float64(totalTiles) * 100.0 + elapsed := time.Duration(int(time.Since(start).Seconds())) * time.Second + fmt.Printf("done: %.2f%% (%s elapsed)\n", + progress, + elapsed, + ) + return + case t := <-ticker.C: + current := e.GetTotalProgress() + progress := float64(current) / float64(totalTiles) * 100.0 + elapsed := time.Duration(int(t.Sub(start).Seconds())) * time.Second + + var remaining time.Duration + if progress > 0 { + totalTime := time.Duration(int(elapsed.Seconds()/(progress/100.0))) * time.Second + remaining = totalTime - elapsed + } + + runtime.ReadMemStats(&m) + if m.NumGC > 0 { + lastGCPauseNs = m.PauseNs[(m.NumGC+255)%256] + } else { + lastGCPauseNs = 0 + } + fmt.Printf("progress: %.2f%% (%s elapsed, %s remaining) | alloc = %vMiB, total = %vMiB, numgc = %v, gcpause = %.2fs, last = %dms\n", + progress, + elapsed, + remaining, + m.Alloc/1024/1024, + m.TotalAlloc/1024/1024, + m.NumGC, + time.Duration(m.PauseTotalNs).Seconds(), + int(time.Duration(lastGCPauseNs).Milliseconds()), + ) + + if current >= totalTiles { + current := e.GetTotalProgress() + progress := float64(current) / float64(totalTiles) * 100.0 + elapsed := time.Duration(int(time.Since(start).Seconds())) * time.Second + fmt.Printf("done: %.2f%% (%s elapsed)\n", + progress, + elapsed, + ) + return + } + } + } +} + +// Export runs the complete tile export process +func (e *Exporter) Export() error { + // Generate zoom levels + zooms, err := e.GenerateZoomLevels() + if err != nil { + return err + } + + // Generate tile list + tiles, err := e.GenerateTileList(zooms) + if err != nil { + return err + } + + tileLen := len(tiles) + fmt.Printf("number of tiles: %d\n", tileLen) + + // Create writers + writer, bulkWriter, closeFunc, err := e.CreateWriters() + if err != nil { + return err + } + defer closeFunc() + + // Determine number of workers + numWorkers := e.config.NumWorkers + if numWorkers > tileLen { + numWorkers = tileLen + } + + // Distribute tiles to workers + rrTiles := tileutils.RoundRobinTiles(tiles, numWorkers) + + // Start progress reporter + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go e.ProgressReporter(ctx, tileLen) + + // Start workers + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + worker := WorkerParams{ + Num: i, + Wg: &wg, + Exporter: e, + TileList: rrTiles[i], + Writer: writer, + BulkWriter: bulkWriter, + GzipCompression: e.config.MbTiles, + } + go worker.Do() + } + + // Wait for completion + wg.Wait() + cancel() // Stop progress reporter + + return nil +} diff --git a/pkg/exporter/worker.go b/pkg/exporter/worker.go new file mode 100644 index 0000000..70d7eca --- /dev/null +++ b/pkg/exporter/worker.go @@ -0,0 +1,177 @@ +package exporter + +import ( + "bytes" + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/flightaware/baremaps-exporter/v2/pkg/tileutils" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + gziplib "github.com/klauspost/compress/gzip" + "github.com/twpayne/go-mbtiles" +) + +// WorkerParams holds parameters for a tile worker +type WorkerParams struct { + Num int + Wg *sync.WaitGroup + Exporter *Exporter + TileList []tileutils.TileCoords + GzipCompression bool + Writer tileutils.TileWriter + BulkWriter tileutils.TileBulkWriter + Conn *pgxpool.Conn + GzipCompressor 
*tileutils.WorkerGzipCompressor + GzipBufferPool *tileutils.BytesBufferPool + TileCache []mbtiles.TileData + TileCachePosition uint + TileBufferCache []*bytes.Buffer + Count uint +} + +// Do processes a slice of tiles for a single worker +func (p *WorkerParams) Do() { + defer p.Wg.Done() + + // Open database connection + _conn, err := p.Exporter.ConnectWithRetries(5) + if err != nil { + fmt.Printf("could not acquire connection! %v\n", err) + return + } + defer _conn.Release() + p.Conn = _conn + + // Create worker-local gzip compressor for memory optimization + if p.GzipCompression { + p.GzipBufferPool = tileutils.NewBytesBufferPool(int(p.Exporter.config.MbTilesBatchSize), 2*1024*1024) // match buffers to batch size, max 2MB each + p.GzipCompressor = tileutils.NewWorkerGzipCompressor(gziplib.BestSpeed) // Use BestSpeed for better performance + } + + fmt.Printf("[%d] connected, compression=%t\n", p.Num, p.GzipCompression) + + p.TileCache = make([]mbtiles.TileData, (int(p.Exporter.config.MbTilesBatchSize))) + p.TileBufferCache = make([]*bytes.Buffer, (int(p.Exporter.config.MbTilesBatchSize))) + + // Disable JIT, it doesn't help us with highly prepared statements + if _, err := p.Conn.Exec(context.Background(), "SET jit = off;"); err != nil { + log.Fatalf("error configuring postgres to disable JIT: %v", err) + } + if p.Exporter.config.InitSQLCmd != "" { + if _, err := p.Conn.Exec(context.Background(), p.Exporter.config.InitSQLCmd); err != nil { + log.Fatalf("error running initialization postgres command: %s: %v", p.Exporter.config.InitSQLCmd, err) + } + } + + // Process all tiles in this worker's list + for _, coord := range p.TileList { + p.Count++ + p.Exporter.UpdateProgress(p.Num, int(p.Count)) + + err := p.FetchTile(coord) + if err != nil { + fmt.Printf("error during fetch (%d,%d,%d): %v\n", coord.Z, coord.X, coord.Y, err) + } + } + + // Write remaining tiles in cache + if p.TileCachePosition > 0 && p.BulkWriter != nil { + err := p.BulkWriter.BulkWrite(p.TileCache[:p.TileCachePosition]) + if err != nil { + fmt.Printf("error writing remaining tiles: %v\n", err) + } + // release remaining buffers + for i, buf := range p.TileBufferCache { + if buf != nil { + p.GzipBufferPool.Put(buf) + p.TileBufferCache[i] = nil + } + } + p.TileCachePosition = 0 + } + + // Log buffer pool efficiency for this worker + if p.GzipCompressor != nil { + stats := p.GzipBufferPool.Stats() + fmt.Printf("[%d] Buffer pool stats: Created=%d, Reused=%d, ReuseRatio=%.2f%%\n", + p.Num, stats.Created, stats.Reused, stats.ReuseRatio*100) + } +} + +func (p *WorkerParams) FetchTile(coord tileutils.TileCoords) error { + // Query tile from the database + queryStr := p.Exporter.sqlQueryByZoom[coord.Z] + start := time.Now() + rows, err := p.Conn.Query(context.Background(), queryStr, coord.Z, coord.X, coord.Y) + if err != nil { + return fmt.Errorf("error querying postgres for tile (%d,%d,%d): %w", coord.Z, coord.X, coord.Y, err) + } + defer rows.Close() + // DriverBytes is the raw re-usable buffer but is only valid until Scan is next called. + // Must be used with Query and not QueryRow, since QueryRow closes rows result immediately which makes + // accesses to the buffer unstable. 
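+ // Downstream, the gzip path copies these borrowed bytes into a pooled buffer and the file path writes them out immediately; the bulk-write cache only ever stores the compressed copy, never the driver's buffer.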
+ var mvtTile pgtype.DriverBytes + if rows.Next() { + if err := rows.Scan(&mvtTile); err != nil { + return fmt.Errorf("error during tile scan: %w", err) + } + } else { + // Check if the query returned no rows or if an error occurred during Next() + if err := rows.Err(); err != nil { + return fmt.Errorf("rows error: %w", err) + } + return fmt.Errorf("no tile data returned for (%d,%d,%d)", coord.Z, coord.X, coord.Y) + } + + // Log slow tiles + end := time.Now() + if end.Sub(start) > time.Duration(5)*time.Second { + fmt.Printf("[%d] slow tile: %d/%d/%d - %s\n", p.Num, coord.Z, coord.X, coord.Y, end.Sub(start)) + } + + // Apply gzip compression if needed using optimized compressor + if p.GzipCompression && p.GzipCompressor != nil { + buf := p.GzipBufferPool.Get() + p.TileBufferCache[p.TileCachePosition] = buf + _, err := p.GzipCompressor.Compress(mvtTile, buf) + if err != nil { + return fmt.Errorf("error compressing tile: %w", err) + } + mvtTile = buf.Bytes() + } + + // Write the tile + if p.BulkWriter != nil { + p.TileCache[p.TileCachePosition] = mbtiles.TileData{ + Z: coord.Z, + X: coord.X, + Y: coord.Y, + Data: mvtTile, + } + p.TileCachePosition++ + + if p.TileCachePosition == p.Exporter.config.MbTilesBatchSize { + err := p.BulkWriter.BulkWrite(p.TileCache) + if err != nil { + return fmt.Errorf("error writing tiles: %w", err) + } + p.TileCachePosition = 0 + for i, buf := range p.TileBufferCache { + if buf != nil { + p.GzipBufferPool.Put(buf) + p.TileBufferCache[i] = nil + } + } + } + } else { + err := p.Writer.Write(coord.Z, coord.X, coord.Y, mvtTile) + if err != nil { + return fmt.Errorf("error writing tile (%d, %d, %d): %w", coord.Z, coord.X, coord.Y, err) + } + } + return nil +} diff --git a/pkg/tileutils/buffer_pool.go b/pkg/tileutils/buffer_pool.go new file mode 100644 index 0000000..d6203bb --- /dev/null +++ b/pkg/tileutils/buffer_pool.go @@ -0,0 +1,87 @@ +package tileutils + +import ( + "bytes" + "sync/atomic" +) + +// BytesBufferPool provides reusable buffers for a single worker (no synchronization needed) +type BytesBufferPool struct { + buffers []*bytes.Buffer + maxSize int + created int64 + reused int64 + maxBuffer int // maximum buffer size to keep in pool +} + +// NewBytesBufferPool creates a new worker-local buffer pool +func NewBytesBufferPool(maxPoolSize, maxBufferSize int) *BytesBufferPool { + return &BytesBufferPool{ + buffers: make([]*bytes.Buffer, 0, maxPoolSize), + maxSize: maxPoolSize, + maxBuffer: maxBufferSize, + } +} + +// Get returns a clean buffer from the pool or creates a new one +func (p *BytesBufferPool) Get() *bytes.Buffer { + if len(p.buffers) > 0 { + // Pop from the end for better performance + buf := p.buffers[len(p.buffers)-1] + p.buffers = p.buffers[:len(p.buffers)-1] + atomic.AddInt64(&p.reused, 1) + return buf + } + + // Create new buffer if pool is empty + atomic.AddInt64(&p.created, 1) + return &bytes.Buffer{} +} + +// Put returns a buffer to the pool after resetting it +func (p *BytesBufferPool) Put(buf *bytes.Buffer) { + if buf == nil { + return + } + + // Don't keep buffers that are too large to prevent memory bloat + if buf.Cap() > p.maxBuffer { + return + } + + // Reset the buffer + buf.Reset() + + // Add to pool if there's space + if len(p.buffers) < p.maxSize { + p.buffers = append(p.buffers, buf) + } + // Else if pool is full, let the buffer be garbage collected +} + +// Stats returns usage statistics for the pool +func (p *BytesBufferPool) Stats() PoolStats { + created := atomic.LoadInt64(&p.created) + reused := atomic.LoadInt64(&p.reused) + 
total := created + reused + + var reuseRatio float64 + if total > 0 { + reuseRatio = float64(reused) / float64(total) + } + + return PoolStats{ + Created: created, + Reused: reused, + Active: int64(len(p.buffers)), + ReuseRatio: reuseRatio, + } +} + +// PoolStats contains buffer pool usage statistics +type PoolStats struct { + Created int64 // Total buffers created + Reused int64 // Total buffer reuses + Active int64 // Buffers currently in pool + ReuseRatio float64 // Ratio of reused vs created +} diff --git a/pkg/tileutils/gzip_optimized.go b/pkg/tileutils/gzip_optimized.go new file mode 100644 index 0000000..8d086e1 --- /dev/null +++ b/pkg/tileutils/gzip_optimized.go @@ -0,0 +1,47 @@ +package tileutils + +import ( + "bytes" + + gziplib "github.com/klauspost/compress/gzip" +) + +// WorkerGzipCompressor provides optimized gzip compression using a worker-local buffer pool +type WorkerGzipCompressor struct { + writer *gziplib.Writer + level int +} + +// NewWorkerGzipCompressor creates a new worker-local gzip compressor +func NewWorkerGzipCompressor(level int) *WorkerGzipCompressor { + return &WorkerGzipCompressor{ + level: level, + // writer will be created lazily + } +} + +// Compress compresses data using the provided buffer and reused writer. +// Returns a buffer, which will almost certainly be the buffer provided as input. +func (c *WorkerGzipCompressor) Compress(data []byte, buf *bytes.Buffer) (*bytes.Buffer, error) { + // Create or reset gzip writer + if c.writer == nil { + var err error + c.writer, err = gziplib.NewWriterLevel(buf, c.level) + if err != nil { + return nil, err + } + } else { + c.writer.Reset(buf) + } + + // Write and close + if _, err := c.writer.Write(data); err != nil { + return nil, err + } + if err := c.writer.Close(); err != nil { + return nil, err + } + + // NOTE: The caller has to release the buffer back to the pool if it acquired one! 
+ return buf, nil } diff --git a/pkg/tileutils/gzip_optimized_test.go b/pkg/tileutils/gzip_optimized_test.go new file mode 100644 index 0000000..e6e60f5 --- /dev/null +++ b/pkg/tileutils/gzip_optimized_test.go @@ -0,0 +1,191 @@ +package tileutils + +import ( + "bytes" + "io" + "sync" + "testing" + + gziplib "github.com/klauspost/compress/gzip" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkerBufferPool(t *testing.T) { + pool := NewBytesBufferPool(3, 1024*1024) // 3 buffers max, 1MB max size + + // Test getting buffers + buf1 := pool.Get() + buf2 := pool.Get() + buf3 := pool.Get() + + require.NotNil(t, buf1) + require.NotNil(t, buf2) + require.NotNil(t, buf3) + + // Write some data + buf1.WriteString("test1") + buf2.WriteString("test2") + buf3.WriteString("test3") + + // Return buffers + pool.Put(buf1) + pool.Put(buf2) + pool.Put(buf3) + + // Get buffers again - should be reused and clean + buf4 := pool.Get() + buf5 := pool.Get() + buf6 := pool.Get() + + assert.Equal(t, 0, buf4.Len(), "Buffer should be reset") + assert.Equal(t, 0, buf5.Len(), "Buffer should be reset") + assert.Equal(t, 0, buf6.Len(), "Buffer should be reset") + + // Check stats + stats := pool.Stats() + assert.Equal(t, int64(3), stats.Created, "Should have created 3 buffers") + assert.Equal(t, int64(3), stats.Reused, "Should have reused 3 buffers") + assert.Equal(t, float64(0.5), stats.ReuseRatio, "Reuse ratio should be 50%") +} + +func TestWorkerBufferPoolMaxSize(t *testing.T) { + pool := NewBytesBufferPool(2, 1024) // Only 2 buffers max, 1KB max buffer size + + buf1 := pool.Get() + buf2 := pool.Get() + buf3 := pool.Get() + + // Write data to make buffers different sizes + buf1.WriteString("small") + buf2.Write(make([]byte, 2048)) // Too large - should not be kept + buf3.WriteString("normal") + + // Return all buffers + pool.Put(buf1) // Should be kept (small size, pool has space) + pool.Put(buf2) // Should be discarded due to size + pool.Put(buf3) // Should be kept (normal size, pool still has space for 2 total) + + stats := pool.Stats() + assert.Equal(t, int64(2), stats.Active, "Should keep 2 buffers (buf1 and buf3)") + + // Test that oversized buffer was rejected + buf4 := pool.Get() // Should get buf1 or buf3 + buf5 := pool.Get() // Should get the other one + buf6 := pool.Get() // Should create new buffer since pool is empty + + assert.Equal(t, 0, buf4.Len(), "Buffer should be reset") + assert.Equal(t, 0, buf5.Len(), "Buffer should be reset") + assert.Equal(t, 0, buf6.Len(), "Buffer should be reset") +} + +func TestGzipOptimizedCorrectness(t *testing.T) { + testData := []byte("Hello, World!
This is a test of gzip compression with buffer pooling.") + + // Compress with original + originalResult, err := Gzip(testData) + require.NoError(t, err) + + // Compress with optimized worker-local approach + pool := NewBytesBufferPool(5, 2*1024*1024) + compressor := NewWorkerGzipCompressor(gziplib.BestCompression) + buf := pool.Get() + optimizedResult, err := compressor.Compress(testData, buf) + require.NoError(t, err) + + // Both should decompress to the same original data + originalDecompressed := decompressGzip(t, originalResult) + optimizedDecompressed := decompressGzip(t, optimizedResult.Bytes()) + + assert.Equal(t, testData, originalDecompressed) + assert.Equal(t, testData, optimizedDecompressed) + + // Results might not be identical due to different buffer usage, + // but decompressed data should be the same + assert.Equal(t, originalDecompressed, optimizedDecompressed) +} + +func TestWorkerGzipCompressor(t *testing.T) { + pool := NewBytesBufferPool(3, 1024*1024) + compressor := NewWorkerGzipCompressor(gziplib.BestCompression) + + testData := []byte("Test data for worker gzip compressor") + + // Compress multiple times to test buffer reuse + for i := 0; i < 10; i++ { + buf := pool.Get() + compressed, err := compressor.Compress(testData, buf) + require.NoError(t, err) + + // Verify decompression + decompressed := decompressGzip(t, compressed.Bytes()) + assert.Equal(t, testData, decompressed) + pool.Put(buf) + } + + // Check that buffers were reused + stats := pool.Stats() + assert.Greater(t, stats.ReuseRatio, 0.5, "Should have good buffer reuse ratio") + assert.LessOrEqual(t, stats.Created, int64(3), "Should not create more than pool size") +} + +func TestConcurrentWorkerPools(t *testing.T) { + // Test that multiple worker pools work independently + numWorkers := 4 + pools := make([]*BytesBufferPool, numWorkers) + compressors := make([]*WorkerGzipCompressor, numWorkers) + + for i := 0; i < numWorkers; i++ { + pools[i] = NewBytesBufferPool(3, 1024*1024) + compressors[i] = NewWorkerGzipCompressor(gziplib.BestCompression) + } + + testData := []byte("Concurrent test data") + + // Run concurrent compressions + results := make(chan []byte, numWorkers*10) + var wg sync.WaitGroup + + for worker := 0; worker < numWorkers; worker++ { + wg.Add(1) + go func(w int) { + for i := 0; i < 10; i++ { + buf := pools[w].Get() + compressed, err := compressors[w].Compress(testData, buf) + require.NoError(t, err) + compressedBytes := make([]byte, compressed.Len()) + copy(compressedBytes, compressed.Bytes()) + results <- compressedBytes + pools[w].Put(buf) + } + wg.Done() + }(worker) + } + wg.Wait() + + // Collect and verify results + for i := 0; i < numWorkers*10; i++ { + compressed := <-results + decompressed := decompressGzip(t, compressed) + assert.Equal(t, testData, decompressed) + } + + // Verify each pool was used + for i, pool := range pools { + stats := pool.Stats() + assert.Greater(t, stats.Created+stats.Reused, int64(0), + "Worker %d pool should have been used", i) + } +} + +// Helper function to decompress gzip data for testing +func decompressGzip(t *testing.T, data []byte) []byte { + reader, err := gziplib.NewReader(bytes.NewReader(data)) + require.NoError(t, err) + defer reader.Close() + + result, err := io.ReadAll(reader) + require.NoError(t, err) + + return result +} diff --git a/pkg/tileutils/tileutils.go b/pkg/tileutils/tileutils.go index ba9245b..b1621ff 100644 --- a/pkg/tileutils/tileutils.go +++ b/pkg/tileutils/tileutils.go @@ -51,7 +51,6 @@ func ListTiles(zooms []int,
tj *TileJSON) []TileCoords { // tilesInBbox returns a list of all tiles within that lat/lon bounding box at the specified zoom level func tilesInBbox(bbox BoundingBox, zoom int) []TileCoords { - fmt.Printf("zoom: %d\n", zoom) xMin := lonToX(bbox.Left, zoom) xMax := lonToX(bbox.Right, zoom) yMin := latToY(bbox.Top, zoom) diff --git a/pkg/tileutils/tileutils_test.go b/pkg/tileutils/tileutils_test.go index ee75729..895a25d 100644 --- a/pkg/tileutils/tileutils_test.go +++ b/pkg/tileutils/tileutils_test.go @@ -35,9 +35,6 @@ func TestTilesInBbox(t *testing.T) { t.Parallel() tiles := tilesInBbox(test.bbox, test.zoom) assert.Equal(t, test.numTiles, len(tiles)) - if test.numTiles != len(tiles) { - fmt.Println(tiles) - } }) } diff --git a/pkg/tileutils/writers.go b/pkg/tileutils/writers.go index f8b0100..cdde960 100644 --- a/pkg/tileutils/writers.go +++ b/pkg/tileutils/writers.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + _ "github.com/mattn/go-sqlite3" "github.com/twpayne/go-mbtiles" ) @@ -121,7 +122,7 @@ func (w *MbTilesWriter) New() (TileWriter, func(), error) { return nil, nil, err } // create a mbtiles writer, which is a wrapper around sqlite3 - _writer, err := mbtiles.NewWriter("sqlite", w.Filename) + _writer, err := mbtiles.NewWriter("sqlite3", w.Filename) if err != nil { return nil, nil, fmt.Errorf("error creating writer: %w", err) }
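The refactor moves all orchestration behind `exporter.Config` / `NewExporter` / `Export`. A minimal sketch of driving the package directly, mirroring what the new `cmd/baremaps-exporter/main.go` does (all field values below are placeholders):

```go
package main

import "github.com/flightaware/baremaps-exporter/v2/pkg/exporter"

func main() {
	exp, err := exporter.NewExporter(exporter.Config{
		TileJSON:         "tiles.json",    // input tileset definition
		Output:           "tiles.mbtiles", // single mbtiles output file
		MbTiles:          true,            // enables gzip compression and bulk writes
		Dsn:              "postgres://baremaps:baremaps@localhost:5432/baremaps",
		InitSQLCmd:       "SET enable_bitmapscan = off;", // optional per-session tuning
		NumWorkers:       8,
		MbTilesBatchSize: 50, // tiles buffered per bulk write
	})
	if err != nil {
		panic(err)
	}
	defer exp.Close()
	if err := exp.Export(); err != nil {
		panic(err)
	}
}
```

`NewExporter` parses the TileJSON, pre-builds the per-zoom SQL with `$1`/`$2`/`$3` tile-envelope parameters, and opens the pgx pool, so configuration errors surface before any workers start.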