Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bulker/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ type Config struct {
ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"`
//Extra delay may be needed. E.g. for metric scrapper to scrape final metrics. So http server will stay active for an extra period.
ShutdownExtraDelay int `mapstructure:"SHUTDOWN_EXTRA_DELAY_SEC" default:"5"`

// # IP HEADERS
// Comma-separated list of HTTP headers to check for client IP address, in order of priority.
// Default: "X-Real-Ip,X-Forwarded-For" (current behavior)
// For Cloudflare: "CF-Connecting-IP,X-Real-Ip,X-Forwarded-For"
TrustedIPHeaders string `mapstructure:"TRUSTED_IP_HEADERS" default:"X-Real-Ip,X-Forwarded-For"`
TrustedIPHeadersList []string `mapstructure:"-"`
}

func init() {
Expand All @@ -75,5 +82,6 @@ func (ac *Config) PostInit(settings *appbase.AppSettings) error {
return err
}
ac.GlobalHashSecrets = strings.Split(ac.GlobalHashSecret, ",")
ac.TrustedIPHeadersList = strings.Split(ac.TrustedIPHeaders, ",")
return nil
}
19 changes: 18 additions & 1 deletion bulker/ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var eventTypesSet = types.NewSet("page", "identify", "track", "group", "alias",

var messageIdUnsupportedChars = regexp.MustCompile(`[^a-zA-Z0-9._-]`)

// trustedIPHeadersList is set during router initialization from config
var trustedIPHeadersList []string

type eventPatchFunc func(c *gin.Context, messageId string, event types.Json, tp string, ingestType IngestType, analyticContext types.Json, defaultEventName string) error

type Router struct {
Expand Down Expand Up @@ -146,6 +149,9 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto
dataHosts: dataHosts,
partitionSelector: partitionSelector,
}
// Initialize trusted IP headers from config
trustedIPHeadersList = appContext.config.TrustedIPHeadersList
base.Infof("Trusted IP headers: %v", trustedIPHeadersList)
engine := router.Engine()
// get global Monitor object
m := ginmetrics.GetMonitor()
Expand Down Expand Up @@ -319,7 +325,7 @@ func patchEvent(c *gin.Context, messageId string, ev types.Json, tp string, inge
ev.SetIfAbsent("event", eventName)
}
}
ip := strings.TrimSpace(strings.Split(utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), ",")[0])
ip := getClientIP(c)
ipPolicy := c.GetHeader("X-IP-Policy")
switch ipPolicy {
case "stripLastOctet":
Expand Down Expand Up @@ -409,6 +415,17 @@ func ipStripLastOctet(ip string) string {
return ip
}

// getClientIP extracts client IP from request headers based on configured trusted headers list
func getClientIP(c *gin.Context) string {
for _, header := range trustedIPHeadersList {
if val := c.GetHeader(header); val != "" {
// Handle comma-separated values (e.g., X-Forwarded-For can contain multiple IPs)
return strings.TrimSpace(strings.Split(val, ",")[0])
}
}
return c.ClientIP()
}

type SyncDestinationsResponse struct {
Destinations []*SyncDestinationsData `json:"destinations,omitempty"`
OK bool `json:"ok"`
Expand Down