amtoaer

晓风残月

叹息似的渺茫,你仍要保存着那真!
github
x
telegram
steam
nintendo switch
email

The role, implementation, and reflection of singleflight

Recently, I learned about singleflight in GeeCache and wanted to write an article to share my understanding.

What is it?#

First, let's introduce the concept of cache breakdown:

When a key exists and a large number of requests arrive at the moment the cache expires, these requests will all hit the database, causing a sudden increase in database requests and pressure.

It's actually easy to understand. Let's simplify the cache as map[string]interface{}, and the get(key) operation can be divided into three steps:

  1. Check if the key exists in the map, and if it does, return it directly.
  2. If the key does not exist, call fn(key) to retrieve the data from the database.
  3. After the call is completed and the database returns the result, cache the result in the map and return it.

When there is a sudden influx of requests and the key does not exist in the map, the first request will go to step 2 and call fn(key) to access the database. While the fn(key) of the first request has not returned yet, subsequent requests arrive. The result can only be cached after the function call is completed, but at this time the function has not returned yet. Therefore, the subsequent requests will also see that the key does not exist in the cache and continue to call fn(key) to access the database, ultimately leading to a large number of requests directly hitting the database, just like the cache has been broken.

How to solve this problem? One straightforward idea is to let the subsequent requests "realize" that fn is being called at this time, so that the subsequent requests do not need to be called repeatedly and can wait for the existing fn to return the result. This is what singleflight does.

How does it work?#

First, let's refer to the implementation in groupcache:

// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight

import "sync"

// call is an in-flight or completed Do call
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}

The implementation is very simple. It abstracts a function call into the call structure, which stores the return result val and err of the function call, as well as a sync.WaitGroup used to implement "singleton".

Group is the core of implementing non-repetitive calls. It has a built-in mapping from keys to function calls and a mutex to protect the mapping.

When calling the Do method:

  1. The mapping is lazily loaded.
  2. Check if the function call corresponding to the key already exists. If it does, wait for the function to return the result directly.
  3. If it does not exist, initialize a new function call, save it to the mapping, and then call the function. After the function call is completed, delete it from the mapping.

In this code, the use of sync.WaitGroup is particularly clever. I mentioned in my previous article Understanding sync.Cond in Go that:

sync.WaitGroup is also used for goroutine synchronization, but its use case is exactly the opposite of sync.Cond. The latter is mostly used for multiple goroutines waiting and single goroutine notifying, while the former is mostly used for single goroutine waiting for multiple goroutines to complete.

In this case, the author achieves a similar effect to sync.Cond by using sync.WaitGroup flexibly, which can be considered elegant.

What are the issues?#

The above code will not have any problems as long as fn returns normally, but we have to consider exceptional cases. What if fn encounters a problem during execution?

Consider a scenario where fn is delayed for some reason and does not return for a long time. This will cause a large number of requests to be blocked at the c.wg.Wait() position, which may lead to:

  • A large increase in the number of goroutines
  • A sharp increase in memory usage
  • ...

How to solve this? We can refer to the official implementation. In the official extended version, two public methods are added to Group:

  • func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

    DoChan is like Do but returns a channel that will receive the results when they are ready.

    The returned channel will not be closed.

    DoChan is similar to Do, but it returns a channel that will receive the results when they are ready.

    The returned channel will not be closed.

  • func (g *Group) Forget(key string)

    Forget tells the singleflight to forget about a key. Future calls to Do for this key will call the function rather than waiting for an earlier call to complete.

    Forget tells the singleflight to forget about a key. Future calls to Do for this key will call the function rather than waiting for an earlier call to complete.

The former, DoChan, can solve the above problem well: because the result is returned as a channel instead of a value, users can control the timeout to prevent requests from being blocked for a long time:

ch := g.DoChan(key, func() (interface{}, error) {
    ...
    return result, err
})

timeout := time.After(500 * time.Millisecond)

select {
case <-timeout:
    // Timeout
    return
case <-ch:
    // Return the result
}

The latter, Forget, has a main use case that I found in How to use sync.singleflight correctly?:

In some scenarios where high availability is required, a certain request saturation is often needed to ensure the final success rate of the business. For downstream services, there is not much difference between one request and multiple requests. At this time, using singleflight is just to reduce the number of requests. In this case, using Forget() can increase the concurrency of downstream requests:

v, _, shared := g.Do(key, func() (interface{}, error) {
    go func() {
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("Deleting key: %v\n", key)
        g.Forget(key)
    }()
    ret, err := find(context.Background(), key)
    return ret, err
})

When one concurrent request exceeds 10ms, a second request will be initiated. At this time, only one request within 10ms can be initiated at most, which means the maximum concurrency is 100 QPS. The impact of a single request failure is greatly reduced.

References#

In no particular order:

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.