Groupcache

In-depth overview

22 October 2014

Mo Omer

Software Engineer, Campbell Mithun/CompassPoint Media/McCann Worldgroup

Shoutout to my colleague Kurt Froelich for writing some of the code in the examples while I was patching together Hadoop / data ingestion in Go from its outputs.

Common Caching Issues

Hot Key mirroring

Version/Namespace tracking

Distribution/Clustering

Common Caching Issues Cont.

Thundering Herds

Call me maybe: Redis
Call me maybe: Redis Redux

Enter Groupcache

From readme:

From dl.google.com: Powered by Go (talk by Brad Fitzpatrick):

Groupcache: Least Recently Used (LRU)

(image from Arkansas Tech University)

Seem like a simple caching strategy? It is. Underneath github.com/golang/groupcache/lru we see an implementation built on container/list - a doubly linked list.

There are lots of caching algorithms for different needs, but we'll assume that for many use cases, LRU will likely be sufficient.

Note: In groupcache, every Add and Get pays for moving the touched item to the front of the list.
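To make the "map plus doubly linked list" idea concrete, here is a minimal sketch of an LRU built on container/list. It is not groupcache's code: the LRU type, its fields, and the string/int key and value types are assumptions for illustration, and it is not safe for concurrent use.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is what each list element stores (hypothetical type for this sketch).
type entry struct {
	key   string
	value int
}

// LRU is a tiny least-recently-used cache. Not safe for concurrent use.
type LRU struct {
	maxEntries int
	ll         *list.List               // front = most recently used
	items      map[string]*list.Element // key -> node in ll
}

func NewLRU(maxEntries int) *LRU {
	return &LRU{
		maxEntries: maxEntries,
		ll:         list.New(),
		items:      make(map[string]*list.Element),
	}
}

// Add inserts or updates key and marks it most recently used.
// If the cache grows past maxEntries, the least recently used entry is evicted.
func (c *LRU) Add(key string, value int) {
	if el, ok := c.items[key]; ok {
		c.ll.MoveToFront(el)
		el.Value.(*entry).value = value
		return
	}
	c.items[key] = c.ll.PushFront(&entry{key, value})
	if c.ll.Len() > c.maxEntries {
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

// Get returns the value for key and marks it most recently used.
func (c *LRU) Get(key string) (int, bool) {
	el, ok := c.items[key]
	if !ok {
		return 0, false
	}
	c.ll.MoveToFront(el)
	return el.Value.(*entry).value, true
}

func main() {
	c := NewLRU(2)
	c.Add("a", 1)
	c.Add("b", 2)
	c.Get("a")    // "a" becomes the most recently used entry
	c.Add("c", 3) // over capacity: evicts "b", the least recently used
	_, ok := c.Get("b")
	fmt.Println("b still cached:", ok)
}
```

The map gives constant-time lookup and the list gives constant-time reordering and eviction, which is why touching the list on every Add and Get stays cheap.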

Hold on, there's no versioning/TTL?

Roll Your Own

    h := fnv.New64a()

    // Hash of the previous hour's timestamp (Truncate rounds down; Round rounds to nearest)
    h.Write([]byte(time.Now().UTC().Truncate(time.Hour).Add(-1 * time.Hour).String()))
    h.Write([]byte("UofMWeather/zip:get"))
    fmt.Println(hex.EncodeToString(h.Sum(nil)))

    // Hash of the current hour's timestamp; Reset first so the earlier writes aren't mixed in
    h.Reset()
    h.Write([]byte(time.Now().UTC().Truncate(time.Hour).String()))
    h.Write([]byte("UofMWeather/zip:get"))
    fmt.Println(hex.EncodeToString(h.Sum(nil)))

    // Or, just prepend a truncated time to your key
    start := time.Now().UTC().Truncate(6 * time.Hour).Unix()
    fmt.Printf("%d__%s\n", start, "UofMWeather/zip:get")

playground
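The time-prefix trick can be wrapped in a pair of helpers, sketched below. BucketKey, OriginalKey, the "__" separator, and the fixed example inputs are all assumptions for illustration. The RPC server slides call a KeyGetOriginal helper whose body isn't shown; OriginalKey is only a guess at what a function of that kind might do.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// sep separates the time prefix from the original key (assumed convention).
const sep = "__"

// Fixed inputs so the example behaves the same on every run.
var (
	window     = 6 * time.Hour
	exampleNow = time.Date(2014, time.October, 22, 15, 30, 0, 0, time.UTC)
)

// BucketKey prefixes key with the Unix start time of the ttl-sized window
// that contains now. Every caller inside the same window builds the same key;
// once the window rolls over, the key changes and the old entry ages out via LRU.
func BucketKey(key string, ttl time.Duration, now time.Time) string {
	start := now.UTC().Truncate(ttl).Unix()
	return strconv.FormatInt(start, 10) + sep + key
}

// OriginalKey strips the time prefix again, e.g. before querying the database.
func OriginalKey(bucketed string) (string, error) {
	parts := strings.SplitN(bucketed, sep, 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("key %q has no time prefix", bucketed)
	}
	if _, err := strconv.ParseInt(parts[0], 10, 64); err != nil {
		return "", fmt.Errorf("key %q has a malformed time prefix: %v", bucketed, err)
	}
	return parts[1], nil
}

func main() {
	k := BucketKey("UofMWeather/zip:get", window, exampleNow)
	orig, err := OriginalKey(k)
	fmt.Println(k, orig, err)
}
```

One caveat: Truncate works on absolute time since Go's zero time, so window boundaries line up with the UTC clock only when the window divides a day evenly (1h, 6h, 24h and so on).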

Usage: RPC Server

A Remote Procedure Call (RPC) lets a program call a function that another server exposes as if it were a local function; the arguments and the result travel over the network.
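For readers new to Go's net/rpc, here is a minimal sketch of the moving parts: a receiver type whose exported methods become remote procedures, and a client that calls one by name. Arith, Args, Reply, and newLocalClient are hypothetical names, and the two ends are joined with an in-process net.Pipe instead of a real network so the example runs anywhere.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
)

// Args and Reply are hypothetical request/response types.
type Args struct{ A, B int }
type Reply struct{ Sum int }

// Arith is the receiver; its exported methods become remote procedures.
type Arith struct{}

// Add has the shape net/rpc requires:
// func (t *T) Method(args *ArgType, reply *ReplyType) error
func (t *Arith) Add(args *Args, reply *Reply) error {
	reply.Sum = args.A + args.B
	return nil
}

// newLocalClient registers Arith on a fresh server and returns a client
// connected to it over an in-memory pipe.
func newLocalClient() (*rpc.Client, error) {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return nil, err
	}
	serverConn, clientConn := net.Pipe()
	go srv.ServeConn(serverConn) // serves until the client closes the pipe
	return rpc.NewClient(clientConn), nil
}

func main() {
	client, err := newLocalClient()
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	var reply Reply
	// The procedure is addressed as "Type.Method".
	if err := client.Call("Arith.Add", &Args{A: 2, B: 3}, &reply); err != nil {
		log.Fatal(err)
	}
	fmt.Println("2 + 3 =", reply.Sum)
}
```

In a real deployment the pipe would be replaced by a listener on the server and rpc.DialHTTP or rpc.Dial on the client; the registration and call shape stay the same.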

Usage: RPC Server

Flags

func init() {
    flag.Var(&gcPeers, "cache-peers", "comma-separated list of groupcache peers")
}
func main() {
    gcTTL = flag.Int("cache-ttl", 6, "groupcache ttl in hours")
    gcSize = flag.Int64("cache-size", 1, "groupcache size in MB")
    var gcPoolHost = flag.String("cache-pool", "http://localhost", "Groupcache Pool Host") 
    var gcIp = flag.String("cache-ip", "0.0.0.0", "Groupcache Bind IP") 
    var gcPort = flag.Int("cache-port", 9901, "groupcache local port") 
    var gcExtPort = flag.Int("cache-xport", 9901, "groupcache Exported (host) port") 
    var rpcPort = flag.Int("rpc-port", 9001, "DataAccess frontend port")

    flag.Parse() 
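The slides don't show the type of gcPeers, but flag.Var needs something that implements flag.Value. A minimal sketch of one way to do that follows; peerList and parsePeers are hypothetical names, not the talk's code.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// peerList is a hypothetical flag.Value that collects a comma-separated
// list of peer URLs into a slice.
type peerList []string

// String renders the current value; the flag package calls it for help output.
func (p *peerList) String() string {
	if p == nil {
		return ""
	}
	return strings.Join(*p, ",")
}

// Set is called once per occurrence of the flag on the command line.
// Blank items are skipped and surrounding whitespace is trimmed.
func (p *peerList) Set(value string) error {
	for _, peer := range strings.Split(value, ",") {
		if peer = strings.TrimSpace(peer); peer != "" {
			*p = append(*p, peer)
		}
	}
	return nil
}

// parsePeers parses args with a private FlagSet so the sketch doesn't
// touch the process-wide flag.CommandLine.
func parsePeers(args []string) ([]string, error) {
	var peers peerList
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	fs.Var(&peers, "cache-peers", "comma-separated list of groupcache peers")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return peers, nil
}

func main() {
	peers, err := parsePeers([]string{"-cache-peers", "http://10.0.0.1:9901,http://10.0.0.2:9901"})
	fmt.Println(len(peers), peers, err)
}
```

Because the underlying type is []string, a value like this can be passed straight to a variadic ...string parameter, which is what a call of the form cachePool.Set(gcPeers...) relies on.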

Usage: RPC Server cont.

Create a Group (a named set of keys shared across the peers)

    poolEndpoint := fmt.Sprintf("%s:%d", *gcPoolHost, *gcExtPort)
    cachePool := groupcache.NewHTTPPool(poolEndpoint)

    var urlGroup = groupcache.NewGroup("ContextGroup", *gcSize<<20, groupcache.GetterFunc(
        func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
            // The getter only runs on a cache miss

            ogkey, _ := KeyGetOriginal(key) // Remove Timestamp/TTL
            _ = ogkey // a real lookup would query by ogkey; Go rejects unused variables

            // Query our 'Database' (a random number stands in for the real lookup)
            result := int(rand.New(rand.NewSource(time.Now().UnixNano())).Int31())
            reply := &Result{Value: result}

            jsonIn, err := json.Marshal(reply)
            if err != nil {
                return err
            }
            return dest.SetBytes(jsonIn)
        }))

Usage: RPC Server cont.

    // ... in main()
    cachePool.Set(gcPeers...)
    dataAccessServer := NewServer(urlGroup)

    rpcep := fmt.Sprintf(":%d", *rpcPort)
    dataAccessServer.start(rpcep)

    gcep := fmt.Sprintf("%s:%d", *gcIp, *gcPort)
    log.Fatal(http.ListenAndServe(gcep, http.HandlerFunc(cachePool.ServeHTTP)))

Usage: Via Web Server

func main() {
    var err error
    rpcConnection.rpcClient, err = rpc.DialHTTP("tcp", *dataHost)
    if err != nil {
        log.Fatalf("Unable to initialize connection to RPC: %v", err)
    }
    defer rpcConnection.rpcClient.Close()

    // Gorilla Mux router
    router = mux.NewRouter()
    apiV1 := router.PathPrefix("/api/v1").Subrouter()

    apiV1.HandleFunc("/ctx/", CtxUrlHandler).Queries("url", "")

    http.Handle("/", router)
    log.Fatal(http.ListenAndServe("0.0.0.0:"+*port, nil))
}

Usage: Via Web Server cont.

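func CtxUrlHandler(w http.ResponseWriter, r *http.Request) {
    ctxUrl := r.FormValue("url")
    args := &Load{ctxUrl}
    reply := &Result{Value: make([]byte, 0)}

    // Go is the asynchronous form of Call; receiving from Done blocks until the call completes
    accessC := rpcConnection.rpcClient.Go("DataAccess.GetCacheUrlNumber", args, reply, nil)
    replyC := <-accessC.Done

    if replyC.Error != nil {
        http.Error(w, replyC.Error.Error(), http.StatusInternalServerError)
        RpcConnectionRepair(replyC.Error)
        return
    }

    // Check the type assertion instead of risking a panic on an unexpected reply
    body, ok := reply.Value.([]byte)
    if !ok {
        http.Error(w, "unexpected reply type", http.StatusInternalServerError)
        return
    }
    w.Write(body)
}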

Thank you

Mo Omer

Software Engineer, Campbell Mithun/CompassPoint Media/McCann Worldgroup
