diff --git a/pkg/env/env.go b/pkg/env/env.go
new file mode 100644
index 0000000..b9a5a7d
--- /dev/null
+++ b/pkg/env/env.go
@@ -0,0 +1,49 @@
+// Copyright The Linux Foundation and each contributor to LFX.
+// SPDX-License-Identifier: MIT
+
+// Package env provides utilities for reading environment variables with type conversion and default values.
+package env
+
+import (
+	"os"
+	"strconv"
+	"time"
+)
+
+// GetString returns the value of the environment variable or the default value if not set or empty.
+func GetString(key, defaultValue string) string {
+	if value := os.Getenv(key); value != "" {
+		return value
+	}
+	return defaultValue
+}
+
+// GetInt returns the value of the environment variable as an integer or the default value if not set, empty, or invalid.
+func GetInt(key string, defaultValue int) int {
+	if value := os.Getenv(key); value != "" {
+		if intValue, err := strconv.Atoi(value); err == nil {
+			return intValue
+		}
+	}
+	return defaultValue
+}
+
+// GetBool returns the value of the environment variable as a boolean or the default value if not set, empty, or invalid.
+func GetBool(key string, defaultValue bool) bool {
+	if value := os.Getenv(key); value != "" {
+		if boolValue, err := strconv.ParseBool(value); err == nil {
+			return boolValue
+		}
+	}
+	return defaultValue
+}
+
+// GetDuration returns the value of the environment variable as a time.Duration or the default value if not set, empty, or invalid.
+func GetDuration(key string, defaultValue time.Duration) time.Duration {
+	if value := os.Getenv(key); value != "" {
+		if duration, err := time.ParseDuration(value); err == nil {
+			return duration
+		}
+	}
+	return defaultValue
+}
diff --git a/scripts/migration/001_add_access_query_fields/README.md b/scripts/migration/001_add_access_query_fields/README.md
new file mode 100644
index 0000000..cd01db7
--- /dev/null
+++ b/scripts/migration/001_add_access_query_fields/README.md
@@ -0,0 +1,143 @@
+# Access Query Fields Migration Script
+
+This script migrates existing OpenSearch documents to add the new `access_check_query` and `history_check_query` fields introduced in [PR #20](https://github.com/linuxfoundation/lfx-v2-indexer-service/pull/20).
+
+## Background
+
+The LFX indexer service was updated to include new document fields used for Fine-Grained Authorization (FGA) of documents via the [query service](https://github.com/linuxfoundation/lfx-v2-query-service):
+
+- `access_check_query`: Combination of `access_check_object` + "#" + `access_check_relation`
+- `history_check_query`: Combination of `history_check_object` + "#" + `history_check_relation`
+
+These fields are populated automatically for newly indexed documents, but existing documents need to be migrated.
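+
+For illustration, a minimal, hypothetical Go sketch of how these values are derived (the migration script uses its own equivalent helper, `JoinFgaQuery`, in `main.go`):
+
+```go
+package main
+
+import "fmt"
+
+func main() {
+	// Hypothetical example values; the derived format is {object}#{relation}.
+	object, relation := "committee:abc123", "viewer"
+	fmt.Println(object + "#" + relation) // prints "committee:abc123#viewer"
+}
+```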
+ +## Usage + +### Basic Usage + +```bash +# Run in dry-run mode to see what would be changed +DRY_RUN=true go run scripts/migration/001_add_access_query_fields/main.go + +# Run the actual migration +go run scripts/migration/001_add_access_query_fields/main.go +``` + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `OPENSEARCH_URL` | `http://localhost:9200` | OpenSearch cluster URL | +| `OPENSEARCH_INDEX` | `resources` | Target index name | +| `BATCH_SIZE` | `100` | Number of documents to process per batch | +| `DRY_RUN` | `false` | If true, only log what would be updated without making changes | +| `SCROLL_TIMEOUT` | `5m` | Scroll context timeout | + +## Safety Features + +- **Dry Run Mode**: Use `DRY_RUN=true` to preview changes without applying them +- **Idempotent**: Safe to run multiple times - skips documents that already have the new fields +- **Graceful Shutdown**: Responds to SIGINT/SIGTERM signals +- **Progress Tracking**: Shows detailed progress and statistics +- **Error Handling**: Continues processing even if individual batches fail + +## Migration Logic + +The script: + +1. Searches for documents that have access control fields but are missing the new query fields +2. For each document, constructs the query fields only if both object and relation are non-empty +3. Updates documents in batches using the OpenSearch bulk API +4. Provides detailed statistics and progress reporting + +### Query Construction Rules + +- `access_check_query` is created only if both `access_check_object` and `access_check_relation` are non-empty +- `history_check_query` is created only if both `history_check_object` and `history_check_relation` are non-empty +- Format: `{object}#{relation}` (e.g., `committee:abc123#viewer`) + +## Example Output + +```text +Starting access query fields migration... +=== Migration Configuration === + OpenSearch URL: http://opensearch:9200 + Index Name: resources + Batch Size: 100 + Dry Run: false + Scroll Timeout: 5m0s +============================== +✓ Connected to OpenSearch successfully +Searching for documents that need migration... +Found 1250 documents that may need migration + +Processing batch 1 (100 documents)... +Progress: 100/1250 documents (8.0%) + +Processing batch 2 (100 documents)... +Progress: 200/1250 documents (16.0%) +... + +=== Migration Statistics === +Total Documents Found: 1250 +Documents Processed: 1250 +Documents Updated: 987 +Documents Skipped: 263 +Documents with Errors: 0 +Duration: 45.6s +Processing Rate: 27.4 docs/sec +============================ + +✓ Migration completed successfully! +``` + +## Troubleshooting + +### Connection Issues + +- Verify OpenSearch is running and accessible +- Check the `OPENSEARCH_URL` environment variable +- Ensure network connectivity and authentication if required + +### Performance Tuning + +- Increase `BATCH_SIZE` for faster processing of large datasets +- Adjust `SCROLL_TIMEOUT` if processing very large result sets +- Monitor OpenSearch cluster performance during migration + +### Partial Failures + +- The script continues processing even if individual batches fail +- Check the error logs for specific failure reasons +- Re-run the script to retry failed documents (it's idempotent) + +## Testing + +Always test in a non-production environment first: + +1. Run with `DRY_RUN=true` to preview changes +2. Test with a small `BATCH_SIZE` initially +3. Verify the query fields are constructed correctly +4. 
Check that no data is corrupted + +## Technical Details + +- Uses OpenSearch scroll API for efficient processing of large result sets +- Bulk updates for optimal performance +- Only fetches necessary fields to minimize network transfer +- Implements proper signal handling for graceful shutdown +- Comprehensive error handling and statistics tracking diff --git a/scripts/migration/001_add_access_query_fields/main.go b/scripts/migration/001_add_access_query_fields/main.go new file mode 100644 index 0000000..73bebbd --- /dev/null +++ b/scripts/migration/001_add_access_query_fields/main.go @@ -0,0 +1,524 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +// Package main provides a data migration script to add access_check_query and history_check_query +// fields to existing documents in the OpenSearch index. +// +// These fields combine the existing access control object and relation fields into query format: +// - access_check_query = access_check_object + "#" + access_check_relation +// - history_check_query = history_check_object + "#" + history_check_relation +// +// Usage: +// +// go run scripts/migration/001_add_access_query_fields/main.go +// +// Environment variables: +// - OPENSEARCH_URL: OpenSearch cluster URL (default: http://localhost:9200) +// - OPENSEARCH_INDEX: Target index name (default: resources) +// - BATCH_SIZE: Number of documents to process per batch (default: 100) +// - DRY_RUN: If true, only log what would be updated without making changes (default: false) +// - SCROLL_TIMEOUT: Scroll context timeout (default: 5m) +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/linuxfoundation/lfx-v2-indexer-service/pkg/env" + "github.com/opensearch-project/opensearch-go/v2" + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" +) + +// Config holds the migration configuration +type Config struct { + OpenSearchURL string + IndexName string + BatchSize int + DryRun bool + ScrollTimeout time.Duration +} + +// Document represents a document from OpenSearch with access control fields +type Document struct { + ID string `json:"_id"` + Source DocumentSource `json:"_source"` +} + +// DocumentSource represents the source fields of a document +type DocumentSource struct { + AccessCheckObject string `json:"access_check_object,omitempty"` + AccessCheckRelation string `json:"access_check_relation,omitempty"` + HistoryCheckObject string `json:"history_check_object,omitempty"` + HistoryCheckRelation string `json:"history_check_relation,omitempty"` + AccessCheckQuery string `json:"access_check_query,omitempty"` + HistoryCheckQuery string `json:"history_check_query,omitempty"` +} + +// SearchResponse represents the OpenSearch search response +type SearchResponse struct { + ScrollID string `json:"_scroll_id,omitempty"` + Hits struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []Document `json:"hits"` + } `json:"hits"` +} + +// BulkResponse represents the OpenSearch bulk response +type BulkResponse struct { + Took int `json:"took"` + Errors bool `json:"errors"` + Items []map[string]interface{} `json:"items"` +} + +// Stats tracks migration statistics +type Stats struct { + TotalDocuments int + ProcessedDocuments int + UpdatedDocuments int + SkippedDocuments int + ErroredDocuments int + StartTime time.Time +} + +// JoinFgaQuery constructs an FGA query string from object and relation +// This matches the logic in 
internal/domain/contracts/fga.go +func JoinFgaQuery(object, relation string) string { + return fmt.Sprintf("%s#%s", object, relation) +} + +// loadConfig loads configuration from environment variables with defaults +func loadConfig() *Config { + config := &Config{ + OpenSearchURL: env.GetString("OPENSEARCH_URL", "http://localhost:9200"), + IndexName: env.GetString("OPENSEARCH_INDEX", "resources"), + BatchSize: env.GetInt("BATCH_SIZE", 100), + DryRun: env.GetBool("DRY_RUN", false), + ScrollTimeout: env.GetDuration("SCROLL_TIMEOUT", 5*time.Minute), + } + + log.Println("=== Migration Configuration ===") + log.Printf(" OpenSearch URL: %s", config.OpenSearchURL) + log.Printf(" Index Name: %s", config.IndexName) + log.Printf(" Batch Size: %d", config.BatchSize) + log.Printf(" Dry Run: %t", config.DryRun) + log.Printf(" Scroll Timeout: %v", config.ScrollTimeout) + log.Println("==============================") + + return config +} + +// createOpenSearchClient creates and configures OpenSearch client +func createOpenSearchClient(config *Config) (*opensearch.Client, error) { + cfg := opensearch.Config{ + Addresses: []string{config.OpenSearchURL}, + // Add any authentication configuration here if needed + } + + client, err := opensearch.NewClient(cfg) + if err != nil { + return nil, fmt.Errorf("failed to create OpenSearch client: %w", err) + } + + // Test connection + info, err := client.Info() + if err != nil { + return nil, fmt.Errorf("failed to connect to OpenSearch: %w", err) + } + defer info.Body.Close() + + log.Println("✓ Connected to OpenSearch successfully") + return client, nil +} + +// needsUpdate checks if a document needs the new query fields added +func needsUpdate(doc DocumentSource) (needsAccessQuery, needsHistoryQuery bool) { + // Check if access query is needed + if doc.AccessCheckQuery == "" && doc.AccessCheckObject != "" && doc.AccessCheckRelation != "" { + needsAccessQuery = true + } + + // Check if history query is needed + if doc.HistoryCheckQuery == "" && doc.HistoryCheckObject != "" && doc.HistoryCheckRelation != "" { + needsHistoryQuery = true + } + + return needsAccessQuery, needsHistoryQuery +} + +// buildQueryFields constructs the new query fields from existing access control fields +func buildQueryFields(doc DocumentSource) (accessQuery, historyQuery string) { + // Build access check query if both components are present and non-empty + if doc.AccessCheckObject != "" && doc.AccessCheckRelation != "" { + accessQuery = JoinFgaQuery(doc.AccessCheckObject, doc.AccessCheckRelation) + } + + // Build history check query if both components are present and non-empty + if doc.HistoryCheckObject != "" && doc.HistoryCheckRelation != "" { + historyQuery = JoinFgaQuery(doc.HistoryCheckObject, doc.HistoryCheckRelation) + } + + return accessQuery, historyQuery +} + +// searchDocuments initiates a scroll search for documents that need migration +func searchDocuments(ctx context.Context, client *opensearch.Client, config *Config) (*SearchResponse, error) { + // Search for documents that have access control fields but are missing query fields + searchBody := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "should": []map[string]interface{}{ + { + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + {"exists": map[string]interface{}{"field": "access_check_object"}}, + {"exists": map[string]interface{}{"field": "access_check_relation"}}, + }, + "must_not": []map[string]interface{}{ + {"exists": map[string]interface{}{"field": 
"access_check_query"}}, + }, + }, + }, + { + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + {"exists": map[string]interface{}{"field": "history_check_object"}}, + {"exists": map[string]interface{}{"field": "history_check_relation"}}, + }, + "must_not": []map[string]interface{}{ + {"exists": map[string]interface{}{"field": "history_check_query"}}, + }, + }, + }, + }, + "minimum_should_match": 1, + }, + }, + "size": config.BatchSize, + "_source": []string{ + "access_check_object", + "access_check_relation", + "history_check_object", + "history_check_relation", + "access_check_query", + "history_check_query", + }, + } + + searchBodyJSON, err := json.Marshal(searchBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal search body: %w", err) + } + + req := opensearchapi.SearchRequest{ + Index: []string{config.IndexName}, + Body: strings.NewReader(string(searchBodyJSON)), + Scroll: config.ScrollTimeout, + } + + res, err := req.Do(ctx, client) + if err != nil { + return nil, fmt.Errorf("failed to execute search: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("search request failed: %s", res.String()) + } + + var searchResponse SearchResponse + if err := json.NewDecoder(res.Body).Decode(&searchResponse); err != nil { + return nil, fmt.Errorf("failed to parse search response: %w", err) + } + + return &searchResponse, nil +} + +// scrollDocuments continues scrolling through search results +func scrollDocuments(ctx context.Context, client *opensearch.Client, scrollID string, scrollTimeout time.Duration) (*SearchResponse, error) { + scrollBody := map[string]interface{}{ + "scroll_id": scrollID, + "scroll": scrollTimeout.String(), + } + + scrollBodyJSON, err := json.Marshal(scrollBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal scroll body: %w", err) + } + + req := opensearchapi.ScrollRequest{ + Body: strings.NewReader(string(scrollBodyJSON)), + } + + res, err := req.Do(ctx, client) + if err != nil { + return nil, fmt.Errorf("failed to execute scroll: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("scroll request failed: %s", res.String()) + } + + var searchResponse SearchResponse + if err := json.NewDecoder(res.Body).Decode(&searchResponse); err != nil { + return nil, fmt.Errorf("failed to parse scroll response: %w", err) + } + + return &searchResponse, nil +} + +// processBatch processes a batch of documents and updates them +func processBatch(ctx context.Context, client *opensearch.Client, config *Config, documents []Document, stats *Stats) error { + if len(documents) == 0 { + return nil + } + + var bulkBody strings.Builder + updateCount := 0 + + for _, doc := range documents { + stats.ProcessedDocuments++ + + needsAccessQuery, needsHistoryQuery := needsUpdate(doc.Source) + if !needsAccessQuery && !needsHistoryQuery { + stats.SkippedDocuments++ + continue + } + + accessQuery, historyQuery := buildQueryFields(doc.Source) + + // Build update document + updateDoc := make(map[string]interface{}) + if needsAccessQuery && accessQuery != "" { + updateDoc["access_check_query"] = accessQuery + } + if needsHistoryQuery && historyQuery != "" { + updateDoc["history_check_query"] = historyQuery + } + + // Skip if no updates needed (both queries empty) + if len(updateDoc) == 0 { + stats.SkippedDocuments++ + continue + } + + if config.DryRun { + log.Printf("[DRY RUN] Would update document %s:", doc.ID) + if accessQuery != "" { + log.Printf(" - access_check_query: %s", accessQuery) 
+ } + if historyQuery != "" { + log.Printf(" - history_check_query: %s", historyQuery) + } + stats.UpdatedDocuments++ + continue + } + + // Add to bulk body + bulkBody.WriteString(fmt.Sprintf(`{"update":{"_index":"%s","_id":"%s"}}`, config.IndexName, doc.ID)) + bulkBody.WriteString("\n") + + updateJSON, err := json.Marshal(map[string]interface{}{"doc": updateDoc}) + if err != nil { + stats.ErroredDocuments++ + return fmt.Errorf("failed to marshal update document for %s: %w", doc.ID, err) + } + bulkBody.Write(updateJSON) + bulkBody.WriteString("\n") + + updateCount++ + } + + // Execute bulk update if not in dry run and there are updates + if !config.DryRun && updateCount > 0 { + req := opensearchapi.BulkRequest{ + Body: strings.NewReader(bulkBody.String()), + } + + res, err := req.Do(ctx, client) + if err != nil { + stats.ErroredDocuments += updateCount + return fmt.Errorf("failed to execute bulk update: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + stats.ErroredDocuments += updateCount + return fmt.Errorf("bulk update failed: %s", res.String()) + } + + // Parse response to check for errors + var bulkResponse BulkResponse + if err := json.NewDecoder(res.Body).Decode(&bulkResponse); err != nil { + return fmt.Errorf("failed to parse bulk response: %w", err) + } + + if bulkResponse.Errors { + // Count individual errors + errorCount := 0 + for _, item := range bulkResponse.Items { + if update, ok := item["update"].(map[string]interface{}); ok { + if _, hasError := update["error"]; hasError { + errorCount++ + } + } + } + stats.ErroredDocuments += errorCount + stats.UpdatedDocuments += (updateCount - errorCount) + return fmt.Errorf("bulk update had %d errors out of %d updates", errorCount, updateCount) + } + + stats.UpdatedDocuments += updateCount + } + + return nil +} + +// clearScroll clears the scroll context +func clearScroll(ctx context.Context, client *opensearch.Client, scrollID string) { + if scrollID == "" { + return + } + + req := opensearchapi.ClearScrollRequest{ + ScrollID: []string{scrollID}, + } + + if res, err := req.Do(ctx, client); err == nil { + res.Body.Close() + } +} + +// printStats prints the migration statistics +func printStats(stats *Stats) { + duration := time.Since(stats.StartTime) + + log.Println("\n=== Migration Statistics ===") + log.Printf("Total Documents Found: %d", stats.TotalDocuments) + log.Printf("Documents Processed: %d", stats.ProcessedDocuments) + log.Printf("Documents Updated: %d", stats.UpdatedDocuments) + log.Printf("Documents Skipped: %d", stats.SkippedDocuments) + log.Printf("Documents with Errors: %d", stats.ErroredDocuments) + log.Printf("Duration: %v", duration) + + if stats.ProcessedDocuments > 0 { + rate := float64(stats.ProcessedDocuments) / duration.Seconds() + log.Printf("Processing Rate: %.2f docs/sec", rate) + } + log.Println("============================") +} + +func main() { + log.Println("Starting access query fields migration...") + + // Setup signal handling for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + log.Println("\n⚠ Received interrupt signal, shutting down gracefully...") + cancel() + }() + + // Load configuration + config := loadConfig() + + // Initialize statistics + stats := &Stats{ + StartTime: time.Now(), + } + + // Create OpenSearch client + client, err := createOpenSearchClient(config) + if err != nil { + log.Fatalf("Failed to create 
OpenSearch client: %v", err)
+	}
+
+	// Initial search
+	log.Println("Searching for documents that need migration...")
+	searchResponse, err := searchDocuments(ctx, client, config)
+	if err != nil {
+		log.Fatalf("Failed to search documents: %v", err)
+	}
+
+	stats.TotalDocuments = searchResponse.Hits.Total.Value
+	log.Printf("Found %d documents that may need migration", stats.TotalDocuments)
+
+	if stats.TotalDocuments == 0 {
+		log.Println("✓ No documents need migration. Exiting.")
+		return
+	}
+
+	scrollID := searchResponse.ScrollID
+	defer func() { clearScroll(context.Background(), client, scrollID) }() // closure so cleanup clears the latest scroll ID
+
+	batchNumber := 0
+	for {
+		// Check for cancellation
+		select {
+		case <-ctx.Done():
+			log.Println("Migration cancelled by user")
+			printStats(stats)
+			return
+		default:
+		}
+
+		// Process current batch
+		if len(searchResponse.Hits.Hits) > 0 {
+			batchNumber++
+			log.Printf("\nProcessing batch %d (%d documents)...", batchNumber, len(searchResponse.Hits.Hits))
+
+			if err := processBatch(ctx, client, config, searchResponse.Hits.Hits, stats); err != nil {
+				log.Printf("Warning: Error processing batch %d: %v", batchNumber, err)
+			}
+
+			// Progress update
+			percentComplete := float64(stats.ProcessedDocuments) * 100 / float64(stats.TotalDocuments)
+			log.Printf("Progress: %d/%d documents (%.1f%%)", stats.ProcessedDocuments, stats.TotalDocuments, percentComplete)
+		}
+
+		// Check if we have more documents to process
+		if len(searchResponse.Hits.Hits) < config.BatchSize {
+			break
+		}
+
+		// Get next batch
+		searchResponse, err = scrollDocuments(ctx, client, scrollID, config.ScrollTimeout)
+		if err != nil {
+			log.Printf("Warning: Failed to scroll documents: %v", err)
+			break
+		}
+
+		// Update scroll ID so the deferred cleanup clears the current context
+		if searchResponse.ScrollID != "" {
+			scrollID = searchResponse.ScrollID
+		}
+
+		// Break if no more documents
+		if len(searchResponse.Hits.Hits) == 0 {
+			break
+		}
+	}
+
+	// Print final statistics
+	printStats(stats)
+
+	if config.DryRun {
+		log.Println("\n✓ DRY RUN COMPLETE: No actual changes were made")
+		log.Println("  Run without DRY_RUN=true to apply changes")
+	} else {
+		log.Println("\n✓ Migration completed successfully!")
+	}
+}
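
For reference, a minimal usage sketch of the new `pkg/env` helpers (illustrative only, not part of the diff; it mirrors how `loadConfig` in the migration script consumes them):

```go
package main

import (
	"fmt"
	"time"

	"github.com/linuxfoundation/lfx-v2-indexer-service/pkg/env"
)

func main() {
	// Each helper returns the default when the variable is unset or empty,
	// and the typed variants also fall back to the default on parse failure.
	url := env.GetString("OPENSEARCH_URL", "http://localhost:9200")
	batchSize := env.GetInt("BATCH_SIZE", 100)
	dryRun := env.GetBool("DRY_RUN", false)
	scrollTimeout := env.GetDuration("SCROLL_TIMEOUT", 5*time.Minute)

	fmt.Println(url, batchSize, dryRun, scrollTimeout)
}
```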