-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.go
More file actions
176 lines (147 loc) · 5.54 KB
/
db.go
File metadata and controls
176 lines (147 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package sqlxy
import (
"context"
"database/sql"
"database/sql/driver"
"sync/atomic"
"time"
"github.com/jmoiron/sqlx"
)
// DB is a logical database with multiple underlying physical databases
// forming a single master multiple slaves topology.
// Reads and writes are automatically directed to the correct physical db.
type DB struct {
conns []*sqlx.DB // Physical databases
count uint64 // Monotonically incrementing counter on each query
}
// Close closes all physical databases concurrently, releasing any open resources.
func (db *DB) Close() error {
return scan(len(db.conns), func(i int) error {
return db.conns[i].Close()
})
}
// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool for each underlying physical db.
// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns then the
// new MaxIdleConns will be reduced to match the MaxOpenConns limit
// If n <= 0, no idle connections are retained.
func (db *DB) SetMaxIdleConns(n int) {
for i := range db.conns {
db.conns[i].SetMaxIdleConns(n)
}
}
// SetMaxOpenConns sets the maximum number of open connections
// to each physical database.
// If MaxIdleConns is greater than 0 and the new MaxOpenConns
// is less than MaxIdleConns, then MaxIdleConns will be reduced to match
// the new MaxOpenConns limit. If n <= 0, then there is no limit on the number
// of open connections. The default is 0 (unlimited).
func (db *DB) SetMaxOpenConns(n int) {
for i := range db.conns {
db.conns[i].SetMaxOpenConns(n)
}
}
// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
// Expired connections may be closed lazily before reuse.
// If d <= 0, connections are reused forever.
func (db *DB) SetConnMaxLifetime(d time.Duration) {
for i := range db.conns {
db.conns[i].SetConnMaxLifetime(d)
}
}
// Ping verifies if a connection to each physical database is still alive,
// establishing a connection if necessary.
func (db *DB) Ping(ctx context.Context) error {
return scan(len(db.conns), func(i int) error {
return db.conns[i].PingContext(ctx)
})
}
// Driver returns the physical database's underlying driver.
func (db *DB) Driver() driver.Driver {
return db.Master().Driver()
}
// Master returns the master physical database
func (db *DB) Master() *sqlx.DB {
return db.conns[0]
}
// Slave returns one of the physical databases which is a slave
func (db *DB) Slave() *sqlx.DB {
return db.conns[db.slave(len(db.conns))]
}
// Beginx begins a transaction and returns an *sqlx.Tx instead of an *sql.Tx.
func (db *DB) Begin() (*sqlx.Tx, error) {
return db.Master().Beginx()
}
// BeginTx begins a transaction and returns an *sqlx.Tx instead of an
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error) {
return db.Master().BeginTxx(ctx, opts)
}
// Exec (panic) runs Exec using this database.
// Any placeholder parameters are replaced with supplied args.
func (db *DB) Exec(ctx context.Context, query string, args ...any) sql.Result {
return db.Master().MustExecContext(ctx, query, args...)
}
// Prepare returns an Stmt instead of a sqlx.Stmt.
func (db *DB) Prepare(ctx context.Context, query string) (Stmt, error) {
stmts := make([]*sqlx.Stmt, len(db.conns))
err := scan(len(db.conns), func(i int) (err error) {
stmts[i], err = db.conns[i].PreparexContext(ctx, query)
return err
})
if err != nil {
return nil, err
}
return &stmt{db: db, stmts: stmts}, nil
}
// Query executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// Query uses a slave as the physical db.
func (db *DB) Query(ctx context.Context, query string, args ...any) (*sqlx.Rows, error) {
return db.Slave().QueryxContext(ctx, query, args...)
}
// QueryRow executes a query that is expected to return at most one row.
// QueryRow always return a non-nil value.
// Errors are deferred until Row's Scan method is called.
// QueryRow uses a slave as the physical db.
func (db *DB) QueryRow(ctx context.Context, query string, args ...any) *sqlx.Row {
return db.Slave().QueryRowxContext(ctx, query, args...)
}
// Select using this DB.
// Any placeholder parameters are replaced with supplied args.
func (db *DB) Select(ctx context.Context, dest any, query string, args ...any) error {
return db.Slave().SelectContext(ctx, dest, query, args...)
}
// Get using this DB.
// Any placeholder parameters are replaced with supplied args.
// An error is returned if the result set is empty.
func (db *DB) Get(ctx context.Context, dest any, query string, args ...any) error {
return db.Slave().GetContext(ctx, dest, query, args...)
}
// NamedExec using this DB.
// Any named placeholder parameters are replaced with fields from arg.
func (db *DB) NamedExec(ctx context.Context, query string, arg any) (sql.Result, error) {
return db.Master().NamedExecContext(ctx, query, arg)
}
// NamedQuery using this DB.
// Any named placeholder parameters are replaced with fields from arg.
func (db *DB) NamedQuery(ctx context.Context, query string, arg any) (*sqlx.Rows, error) {
return db.Slave().NamedQueryContext(ctx, query, arg)
}
// NamedPrepare returns an NamedStmt
func (db *DB) NamedPrepare(ctx context.Context, query string) (NamedStmt, error) {
stmts := make([]*sqlx.NamedStmt, len(db.conns))
err := scan(len(db.conns), func(i int) (err error) {
stmts[i], err = db.conns[i].PrepareNamedContext(ctx, query)
return err
})
if err != nil {
return nil, err
}
return &namedStmt{db: db, stmts: stmts}, nil
}
func (db *DB) slave(n int) int {
if n <= 1 {
return 0
}
return int(1 + (atomic.AddUint64(&db.count, 1) % uint64(n-1)))
}