Skip to content

Commit d37a371

Browse files
committed
remove configs and enable replcia connection through code
1 parent 25cf164 commit d37a371

File tree

2 files changed

+359
-94
lines changed

2 files changed

+359
-94
lines changed

pkg/gofr/datasource/dbresolver/factory.go

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ const (
2020
maxIdleReplicaCapDefault = 10
2121
minOpenReplicaDefault = 5
2222
maxOpenReplicaCapDefault = 20
23-
expectedHostPortParts
23+
expectedHostPortParts = 2
2424
)
2525

2626
var (
2727
errPrimaryNil = errors.New("primary SQL connection is nil")
2828
errInvalidReplicaHostFormat = errors.New("invalid replica host format (expected host:port)")
2929
errDBNameRequired = errors.New("DB_NAME is required")
30+
errEmptyCredentials = errors.New("replica has empty credentials")
31+
errInvalidPort = errors.New("invalid port for replica")
32+
errAllReplicasFailed = errors.New("all replicas failed to connect")
3033
)
3134

3235
// Config holds resolver configuration.
@@ -36,6 +39,14 @@ type Config struct {
3639
MaxFailures uint32
3740
TimeoutSec uint32
3841
PrimaryRoutes []string
42+
Replicas []ReplicaCredential
43+
}
44+
45+
// ReplicaCredential stores credentials for a single replica.
46+
type ReplicaCredential struct {
47+
Host string `json:"host"` // Format: "hostname:port".
48+
User string `json:"user"`
49+
Password string `json:"password"` // Supports commas and special chars.
3950
}
4051

4152
// ResolverProvider implements container.DBResolverProvider interface.
@@ -49,10 +60,10 @@ type ResolverProvider struct {
4960
}
5061

5162
// NewDBResolverProvider creates a new ResolverProvider.
52-
func NewDBResolverProvider(app *gofr.App, cfg Config) *ResolverProvider {
63+
func NewDBResolverProvider(app *gofr.App, cfg *Config) *ResolverProvider {
5364
return &ResolverProvider{
5465
app: app,
55-
cfg: cfg,
66+
cfg: *cfg,
5667
}
5768
}
5869

@@ -124,11 +135,9 @@ func (p *ResolverProvider) Connect() {
124135

125136
logger.Logf("DB Resolver initialized with %d replicas", len(replicas))
126137
}
127-
128-
// createAndValidateReplicas creates replicas and validates count.
129138
func (p *ResolverProvider) createAndValidateReplicas(logger Logger, metrics Metrics) []container.DB {
130-
// Create replicas from config.
131-
replicas, err := connectReplicas(p.app.Config, logger, metrics)
139+
// Pass Config to connectReplicas
140+
replicas, err := connectReplicas(&p.cfg, p.app.Config, logger, metrics)
132141
if err != nil {
133142
logger.Errorf("Failed to create replicas: %v", err)
134143

@@ -164,7 +173,7 @@ func (p *ResolverProvider) GetResolver() container.DB {
164173
}
165174

166175
// InitDBResolver - Complete initialization with middleware.
167-
func InitDBResolver(app *gofr.App, cfg Config) error {
176+
func InitDBResolver(app *gofr.App, cfg *Config) error {
168177
provider := NewDBResolverProvider(app, cfg)
169178

170179
// Add middleware to inject HTTP context.
@@ -190,45 +199,78 @@ func createHTTPMiddleware() gofrHTTP.Middleware {
190199
}
191200
}
192201

193-
// connectReplicas creates replica connections from configuration.
194-
func connectReplicas(cfg config.Config, logger Logger, metrics Metrics) ([]container.DB, error) {
195-
replicasStr := cfg.Get("DB_REPLICA_HOSTS")
196-
if replicasStr == "" {
202+
// connectReplicas creates replica connections from Config.Replicas.
203+
func connectReplicas(cfg *Config, appCfg config.Config, logger Logger, metrics Metrics) ([]container.DB, error) {
204+
if len(cfg.Replicas) == 0 {
197205
return nil, nil
198206
}
199207

200-
replicaHosts := strings.Split(replicasStr, ",")
201-
replicas := make([]container.DB, 0, len(replicaHosts))
208+
replicas := make([]container.DB, 0, len(cfg.Replicas))
202209

203-
user := cfg.Get("DB_REPLICA_USER")
204-
password := cfg.Get("DB_REPLICA_PASSWORD")
210+
var failedReplicas []string
205211

206-
for i, hostPort := range replicaHosts {
207-
hostPort = strings.TrimSpace(hostPort)
208-
if hostPort == "" {
209-
continue
212+
for i, cred := range cfg.Replicas {
213+
if err := validateReplicaCredentials(cred, i); err != nil {
214+
return nil, err
210215
}
211216

212-
parts := strings.Split(hostPort, ":")
213-
if len(parts) != expectedHostPortParts {
214-
return nil, fmt.Errorf("%w at index %d: %s", errInvalidReplicaHostFormat, i, hostPort)
217+
host, port, err := parseReplicaHost(cred.Host, i)
218+
if err != nil {
219+
return nil, err
215220
}
216221

217-
host, port := parts[0], parts[1]
218-
219-
replica, err := createReplicaConnection(cfg, host, port, user, password, logger, metrics)
222+
replica, err := createReplicaConnection(appCfg, host, port, cred.User, cred.Password, logger, metrics)
220223
if err != nil {
221-
return nil, fmt.Errorf("failed to create replica #%d (%s:%s): %w", i+1, host, port, err)
224+
logger.Warnf("Failed to connect to replica #%d (%s): %v", i+1, cred.Host, err)
225+
226+
failedReplicas = append(failedReplicas, cred.Host)
227+
228+
continue // Skip failed replica instead of failing completely
222229
}
223230

224231
replicas = append(replicas, replica)
225232

226-
logger.Logf("Created DB replica connection to %s:%s", host, port)
233+
logger.Logf("Created DB replica connection to %s", cred.Host)
234+
}
235+
236+
if len(replicas) == 0 {
237+
return nil, fmt.Errorf("%w (%d total)", errAllReplicasFailed, len(cfg.Replicas))
238+
}
239+
240+
if len(failedReplicas) > 0 {
241+
logger.Warnf("%d/%d replicas failed: %v", len(failedReplicas), len(cfg.Replicas), failedReplicas)
227242
}
228243

229244
return replicas, nil
230245
}
231246

247+
// validateReplicaCredentials checks if all required fields are present.
248+
func validateReplicaCredentials(cred ReplicaCredential, index int) error {
249+
if cred.Host == "" || cred.User == "" || cred.Password == "" {
250+
return fmt.Errorf("%w at index %d", errEmptyCredentials, index)
251+
}
252+
253+
return nil
254+
}
255+
256+
// parseReplicaHost splits host:port and validates the format.
257+
func parseReplicaHost(host string, index int) (validatedHost, validatedPort string, err error) {
258+
parts := strings.Split(host, ":")
259+
if len(parts) != expectedHostPortParts {
260+
return "", "", fmt.Errorf("%w at index %d: %s", errInvalidReplicaHostFormat, index, host)
261+
}
262+
263+
hostname := strings.TrimSpace(parts[0])
264+
port := strings.TrimSpace(parts[1])
265+
266+
// Validate port is numeric
267+
if _, err := strconv.Atoi(port); err != nil {
268+
return "", "", fmt.Errorf("%w at index %d: %s", errInvalidPort, index, port)
269+
}
270+
271+
return hostname, port, nil
272+
}
273+
232274
// createReplicaConnection creates a single replica database connection.
233275
func createReplicaConnection(cfg config.Config, host, port, user, password string, logger Logger, metrics Metrics) (container.DB, error) {
234276
dbName := cfg.Get("DB_NAME")

0 commit comments

Comments
 (0)