diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..0d0eed2c --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/bradfitz/gomemcache + +go 1.12 diff --git a/memcache/memcache.go b/memcache/memcache.go index b98a7653..2ad8fc4b 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -23,17 +23,16 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" - "strconv" "strings" "sync" + "sync/atomic" "time" ) // Similar to: -// http://code.google.com/appengine/docs/go/memcache/reference.html +// https://godoc.org/google.golang.org/appengine/memcache var ( // ErrCacheMiss means that a Get failed because the item wasn't present. @@ -113,6 +112,7 @@ var ( resultTouched = []byte("TOUCHED\r\n") resultClientErrorPrefix = []byte("CLIENT_ERROR ") + versionPrefix = []byte("VERSION") ) // New returns a memcache client using the provided server(s) @@ -129,6 +129,12 @@ func NewFromSelector(ss ServerSelector) *Client { return &Client{selector: ss} } +// Stats contains statistic about connections being used by client. +type Stats struct { + ActiveConns int + IdleConns int +} + // Client is a memcache client. // It is safe for unlocked use by multiple concurrent goroutines. type Client struct { @@ -144,9 +150,12 @@ type Client struct { // be set to a number higher than your peak parallel requests. MaxIdleConns int + // number of currently used connections + activeConns int32 + selector ServerSelector - lk sync.Mutex + lk sync.RWMutex freeconn map[string][]*conn } @@ -193,6 +202,7 @@ func (cn *conn) extendDeadline() { // cache miss). The purpose is to not recycle TCP connections that // are bad. func (cn *conn) condRelease(err *error) { + atomic.AddInt32(&cn.c.activeConns, -1) if *err == nil || resumableError(*err) { cn.release() } else { @@ -276,6 +286,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { cn, ok := c.getFreeConn(addr) if ok { cn.extendDeadline() + atomic.AddInt32(&c.activeConns, 1) return cn, nil } nc, err := c.dial(addr) @@ -289,6 +300,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { c: c, } cn.extendDeadline() + atomic.AddInt32(&c.activeConns, 1) return cn, nil } @@ -326,8 +338,9 @@ func (c *Client) Get(key string) (item *Item, err error) { // Touch updates the expiry for the given key. The seconds parameter is either // a Unix timestamp or, if seconds is less than 1 month, the number of seconds -// into the future at which time the item will expire. ErrCacheMiss is returned if the -// key is not in the cache. The key must be at most 250 bytes in length. +// into the future at which time the item will expire. Zero means the item has +// no expiration time. ErrCacheMiss is returned if the key is not in the cache. +// The key must be at most 250 bytes in length. func (c *Client) Touch(key string, seconds int32) (err error) { return c.withKeyAddr(key, func(addr net.Addr) error { return c.touchFromAddr(addr, []string{key}, seconds) @@ -398,6 +411,30 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error { }) } +// ping sends the version command to the given addr +func (c *Client) ping(addr net.Addr) error { + return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil { + return err + } + if err := rw.Flush(); err != nil { + return err + } + line, err := rw.ReadSlice('\n') + if err != nil { + return err + } + + switch { + case bytes.HasPrefix(line, versionPrefix): + break + default: + return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line)) + } + return nil + }) +} + func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error { return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { for _, key := range keys { @@ -465,6 +502,21 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) { return m, err } +// Stats returns current statistic +func (c *Client) Stats() Stats { + c.lk.RLock() + idleConns := 0 + for _, conns := range c.freeconn { + idleConns += len(conns) + } + c.lk.RUnlock() + + return Stats{ + ActiveConns: int(atomic.LoadInt32(&c.activeConns)), + IdleConns: idleConns, + } +} + // parseGetResponse reads a GET response from r and calls cb for each // read and allocated Item func parseGetResponse(r *bufio.Reader, cb func(*Item)) error { @@ -481,11 +533,14 @@ func parseGetResponse(r *bufio.Reader, cb func(*Item)) error { if err != nil { return err } - it.Value, err = ioutil.ReadAll(io.LimitReader(r, int64(size)+2)) + it.Value = make([]byte, size+2) + _, err = io.ReadFull(r, it.Value) if err != nil { + it.Value = nil return err } if !bytes.HasSuffix(it.Value, crlf) { + it.Value = nil return fmt.Errorf("memcache: corrupt get result read") } it.Value = it.Value[:size] @@ -641,6 +696,12 @@ func (c *Client) DeleteAll() error { }) } +// Ping checks all instances if they are alive. Returns error if any +// of them is down. +func (c *Client) Ping() error { + return c.selector.Each(c.ping) +} + // Increment atomically increments key by delta. The return value is // the new value after being incremented or an error. If the value // didn't exist in memcached the error is ErrCacheMiss. The value in diff --git a/memcache/memcache_test.go b/memcache/memcache_test.go index 4b52a911..70d47026 100644 --- a/memcache/memcache_test.go +++ b/memcache/memcache_test.go @@ -209,6 +209,9 @@ func testWithClient(t *testing.T, c *Client) { t.Errorf("post-DeleteAll want ErrCacheMiss, got %v", err) } + // Test Ping + err = c.Ping() + checkErr(err, "error ping: %s", err) } func testTouchWithClient(t *testing.T, c *Client) {