Skip to content

Commit 9e86521

Browse files
authored
feat: support xa protocol (#9)
1 parent 119af63 commit 9e86521

File tree

5 files changed

+58
-10
lines changed

5 files changed

+58
-10
lines changed

connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import (
2222

2323
"github.com/cectc/dbpack/pkg/log"
2424
"github.com/cectc/hptx/pkg/config"
25+
"github.com/cectc/hptx/pkg/constant"
2526
"github.com/pingcap/parser"
2627
"github.com/pingcap/parser/ast"
2728
)
2829

29-
const XID = "XID"
3030
const GlobalLock = "GlobalLock"
3131

3232
type connCtx struct {
@@ -588,7 +588,7 @@ func (mc *mysqlConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver
588588
}
589589
}
590590

591-
val := ctx.Value(XID)
591+
val := ctx.Value(constant.XID)
592592
if val != nil {
593593
if xid, ok := val.(string); ok {
594594
mc.ctx = &connCtx{

datasource_manager.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cectc/dbpack/pkg/dt/api"
88
"github.com/cectc/dbpack/pkg/log"
99
"github.com/cectc/hptx/pkg/core"
10+
"github.com/cectc/hptx/pkg/resource"
1011
"github.com/pkg/errors"
1112
)
1213

@@ -25,11 +26,7 @@ func init() {
2526
}
2627
}
2728

28-
func GetDataSourceManager() DataSourceManager {
29-
return dataSourceManager
30-
}
31-
32-
func RegisterResource(dsn string) {
29+
func RegisterATResource(dsn string) {
3330
cfg, err := ParseDSN(dsn)
3431
if err == nil {
3532
c := &connector{
@@ -38,6 +35,7 @@ func RegisterResource(dsn string) {
3835
dataSourceManager.ResourceCache[c.cfg.DBName] = c
3936
InitTableMetaCache(c.cfg.DBName)
4037
}
38+
resource.InitATBranchResource(dataSourceManager)
4139
}
4240

4341
func (resourceManager DataSourceManager) GetConnection(resourceID string) *mysqlConn {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.13
44

55
require (
66
github.com/cectc/dbpack v0.3.1
7-
github.com/cectc/hptx v1.0.4
7+
github.com/cectc/hptx v1.0.5
88
github.com/google/go-cmp v0.5.7
99
github.com/patrickmn/go-cache v2.1.0+incompatible
1010
github.com/pingcap/parser v0.0.0-20210831085004-b5390aa83f65

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZ
121121
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
122122
github.com/cectc/dbpack v0.3.1 h1:bkIS0dMecGT14EeD7/gBgroyy/zfO27Dk4iEbfDUTkI=
123123
github.com/cectc/dbpack v0.3.1/go.mod h1:xofYTSvQFzwaN5L3KrEh8/chYg4pcbCo/+UVg1y99Hs=
124-
github.com/cectc/hptx v1.0.4 h1:x/bssMsYmIDMKI5kkeeOIJftPuRfNENZp5O95WOvgqw=
125-
github.com/cectc/hptx v1.0.4/go.mod h1:6UF0JJtfOksYnUL9gYCUgufb95PT0e2KD/WYN4LPgzk=
124+
github.com/cectc/hptx v1.0.5 h1:UrT8eqSTU4vEDpuNFiiOBmaZCe/UE3s+3YHy/oGbob8=
125+
github.com/cectc/hptx v1.0.5/go.mod h1:6UF0JJtfOksYnUL9gYCUgufb95PT0e2KD/WYN4LPgzk=
126126
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
127127
github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
128128
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=

xa_resource_manager.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package mysql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
8+
"github.com/cectc/dbpack/pkg/dt/api"
9+
"github.com/cectc/hptx/pkg/resource"
10+
"github.com/pkg/errors"
11+
)
12+
13+
var xaResourceManager ResourceManager
14+
15+
type ResourceManager struct {
16+
db *sql.DB
17+
}
18+
19+
func RegisterXAResource(db *sql.DB) {
20+
xaResourceManager = ResourceManager{db: db}
21+
resource.InitXABranchResource(xaResourceManager)
22+
}
23+
24+
func (resourceManager ResourceManager) Commit(ctx context.Context, bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
25+
_, err := resourceManager.db.ExecContext(ctx, fmt.Sprintf("XA COMMIT '%s'", bs.BranchID))
26+
if err != nil {
27+
var mysqlErr *MySQLError
28+
if errors.As(err, &mysqlErr) {
29+
if mysqlErr.Number == 1399 {
30+
return api.Complete, nil
31+
}
32+
}
33+
return bs.Status, err
34+
}
35+
return api.Complete, nil
36+
}
37+
38+
func (resourceManager ResourceManager) Rollback(ctx context.Context, bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
39+
_, err := resourceManager.db.ExecContext(ctx, fmt.Sprintf("XA ROLLBACK '%s'", bs.BranchID))
40+
if err != nil {
41+
var mysqlErr *MySQLError
42+
if errors.As(err, &mysqlErr) {
43+
if mysqlErr.Number == 1399 {
44+
return api.Complete, nil
45+
}
46+
}
47+
return bs.Status, err
48+
}
49+
return api.Complete, nil
50+
}

0 commit comments

Comments
 (0)