package cache

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/diff"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/filters"
	"github.com/containerd/containerd/gc"
	"github.com/containerd/containerd/leases"
	"github.com/docker/docker/pkg/idtools"
	"github.com/moby/buildkit/cache/metadata"
	"github.com/moby/buildkit/client"
	"github.com/moby/buildkit/identity"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/snapshot"
	"github.com/moby/buildkit/util/bklog"
	"github.com/moby/buildkit/util/flightcontrol"
	"github.com/moby/buildkit/util/progress"
	digest "github.com/opencontainers/go-digest"
	imagespecidentity "github.com/opencontainers/image-spec/identity"
	ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
)

var (
	ErrLocked   = errors.New("locked")
	errNotFound = errors.New("not found")
	errInvalid  = errors.New("invalid")
)

type ManagerOpt struct {
	Snapshotter     snapshot.Snapshotter
	ContentStore    content.Store
	LeaseManager    leases.Manager
	PruneRefChecker ExternalRefCheckerFunc
	GarbageCollect  func(ctx context.Context) (gc.Stats, error)
	Applier         diff.Applier
	Differ          diff.Comparer
	MetadataStore   *metadata.Store
	MountPoolRoot   string
}

type Accessor interface {
	MetadataStore

	GetByBlob(ctx context.Context, desc ocispecs.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error)
	Get(ctx context.Context, id string, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)

	New(ctx context.Context, parent ImmutableRef, s session.Group, opts ...RefOption) (MutableRef, error)
	GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase?
	IdentityMapping() *idtools.IdentityMapping
	Merge(ctx context.Context, parents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)
	Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)
}

type Controller interface {
	DiskUsage(ctx context.Context, info client.DiskUsageInfo) ([]*client.UsageInfo, error)
	Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error
}

type Manager interface {
	Accessor
	Controller
	Close() error
}

type ExternalRefCheckerFunc func() (ExternalRefChecker, error)

type ExternalRefChecker interface {
	Exists(string, []digest.Digest) bool
}

type cacheManager struct {
	records         map[string]*cacheRecord
	mu              sync.Mutex
	Snapshotter     snapshot.MergeSnapshotter
	ContentStore    content.Store
	LeaseManager    leases.Manager
	PruneRefChecker ExternalRefCheckerFunc
	GarbageCollect  func(ctx context.Context) (gc.Stats, error)
	Applier         diff.Applier
	Differ          diff.Comparer
	MetadataStore   *metadata.Store

	mountPool sharableMountPool

	muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
	unlazyG flightcontrol.Group[struct{}]
}

func NewManager(opt ManagerOpt) (Manager, error) {
	cm := &cacheManager{
		Snapshotter:     snapshot.NewMergeSnapshotter(context.TODO(), opt.Snapshotter, opt.LeaseManager),
		ContentStore:    opt.ContentStore,
		LeaseManager:    opt.LeaseManager,
		PruneRefChecker: opt.PruneRefChecker,
		GarbageCollect:  opt.GarbageCollect,
		Applier:         opt.Applier,
		Differ:          opt.Differ,
		MetadataStore:   opt.MetadataStore,
		records:         make(map[string]*cacheRecord),
	}

	if err := cm.init(context.TODO()); err != nil {
		return nil, err
	}

	p, err := newSharableMountPool(opt.MountPoolRoot)
	if err != nil {
		return nil, err
	}
	cm.mountPool = p

	// cm.scheduleGC(5 * time.Minute)

	return cm, nil
}

func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, rerr error) {
	diffID, err := diffIDFromDescriptor(desc)
	if err != nil {
		return nil, err
	}
	chainID := diffID
	blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})

	descHandlers := descHandlersOf(opts...)
	if desc.Digest != "" && (descHandlers == nil || descHandlers[desc.Digest] == nil) {
		if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, errdefs.ErrNotFound) {
			return nil, NeedsRemoteProviderError([]digest.Digest{desc.Digest})
		} else if err != nil {
			return nil, err
		}
	}

	var p *immutableRef
	if parent != nil {
		p2, err := cm.Get(ctx, parent.ID(), nil, NoUpdateLastUsed, descHandlers)
		if err != nil {
			return nil, err
		}
		p = p2.(*immutableRef)

		if err := p.Finalize(ctx); err != nil {
			p.Release(context.TODO())
			return nil, err
		}

		if p.getChainID() == "" || p.getBlobChainID() == "" {
			p.Release(context.TODO())
			return nil, errors.Errorf("failed to get ref by blob on non-addressable parent")
		}
		chainID = imagespecidentity.ChainID([]digest.Digest{p.getChainID(), chainID})
		blobChainID = imagespecidentity.ChainID([]digest.Digest{p.getBlobChainID(), blobChainID})
	}

	releaseParent := false
	defer func() {
		if releaseParent || rerr != nil && p != nil {
			p.Release(context.TODO())
		}
	}()

	cm.mu.Lock()
	defer cm.mu.Unlock()

	sis, err := cm.searchBlobchain(ctx, blobChainID)
	if err != nil {
		return nil, err
	}

	for _, si := range sis {
		ref, err := cm.get(ctx, si.ID(), nil, opts...)
		if err != nil {
			if errors.As(err, &NeedsRemoteProviderError{}) {
				// This shouldn't happen and indicates that blobchain IDs are being set incorrectly,
				// but if it does happen it's not fatal as we can just not try to re-use by blobchainID.
				// Log the error but continue.
				bklog.G(ctx).Errorf("missing providers for ref with equivalent blobchain ID %s", blobChainID)
			} else if !IsNotFound(err) {
				return nil, errors.Wrapf(err, "failed to get record %s by blobchainid", sis[0].ID())
			}
		}
		if ref == nil {
			continue
		}
		if p != nil {
			releaseParent = true
		}
		if err := setImageRefMetadata(ref.cacheMetadata, opts...); err != nil {
			return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", ref.ID())
		}
		return ref, nil
	}

	sis, err = cm.searchChain(ctx, chainID)
	if err != nil {
		return nil, err
	}

	var link *immutableRef
	for _, si := range sis {
		ref, err := cm.get(ctx, si.ID(), nil, opts...)
		// if the error was NotFound or NeedsRemoteProvider, we can't re-use the snapshot from the blob so just skip it
		if err != nil && !IsNotFound(err) && !errors.As(err, &NeedsRemoteProviderError{}) {
			return nil, errors.Wrapf(err, "failed to get record %s by chainid", si.ID())
		}
		if ref != nil {
			link = ref
			break
		}
	}

	id := identity.NewID()
	snapshotID := chainID.String()
	if link != nil {
		snapshotID = link.getSnapshotID()
		go link.Release(context.TODO())
	}

	l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
		l.ID = id
		l.Labels = map[string]string{
			"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create lease")
	}

	defer func() {
		if rerr != nil {
			if err := cm.LeaseManager.Delete(context.TODO(), leases.Lease{
				ID: l.ID,
			}); err != nil {
				bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
			}
		}
	}()

	if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{
		ID:   snapshotID,
		Type: "snapshots/" + cm.Snapshotter.Name(),
	}); err != nil && !errdefs.IsAlreadyExists(err) {
		return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
	}

	if desc.Digest != "" {
		if err := cm.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
			ID:   desc.Digest.String(),
			Type: "content",
		}); err != nil {
			return nil, errors.Wrapf(err, "failed to add blob %s to lease", id)
		}
	}

	md, _ := cm.getMetadata(id)

	rec := &cacheRecord{
		mu:            &sync.Mutex{},
		cm:            cm,
		refs:          make(map[ref]struct{}),
		parentRefs:    parentRefs{layerParent: p},
		cacheMetadata: md,
	}

	if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
		return nil, err
	}

	if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil {
		return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
	}

	rec.queueDiffID(diffID)
	rec.queueBlob(desc.Digest)
	rec.queueChainID(chainID)
	rec.queueBlobChainID(blobChainID)
	rec.queueSnapshotID(snapshotID)
	rec.queueBlobOnly(true)
	rec.queueMediaType(desc.MediaType)
	rec.queueBlobSize(desc.Size)
	rec.appendURLs(desc.URLs)
	rec.queueCommitted(true)

	if err := rec.commitMetadata(); err != nil {
		return nil, err
	}

	cm.records[id] = rec

	ref := rec.ref(true, descHandlers, nil)
	if s := unlazySessionOf(opts...); s != nil {
		if err := ref.unlazy(ctx, ref.descHandlers, ref.progress, s, true); err != nil {
			return nil, err
		}
	}

	return ref, nil
}

// init loads all snapshots from metadata state and tries to load the records
// from the snapshotter. If snaphot can't be found, metadata is deleted as well.
func (cm *cacheManager) init(ctx context.Context) error {
	items, err := cm.MetadataStore.All()
	if err != nil {
		return err
	}

	for _, si := range items {
		if _, err := cm.getRecord(ctx, si.ID()); err != nil {
			bklog.G(ctx).Debugf("could not load snapshot %s: %+v", si.ID(), err)
			cm.MetadataStore.Clear(si.ID())
			cm.LeaseManager.Delete(ctx, leases.Lease{ID: si.ID()})
		}
	}
	return nil
}

// IdentityMapping returns the userns remapping used for refs
func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping {
	return cm.Snapshotter.IdentityMapping()
}

// Close closes the manager and releases the metadata database lock. No other
// method should be called after Close.
func (cm *cacheManager) Close() error {
	// TODO: allocate internal context and cancel it here
	return cm.MetadataStore.Close()
}

// Get returns an immutable snapshot reference for ID
func (cm *cacheManager) Get(ctx context.Context, id string, pg progress.Controller, opts ...RefOption) (ImmutableRef, error) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	return cm.get(ctx, id, pg, opts...)
}

// get requires manager lock to be taken
func (cm *cacheManager) get(ctx context.Context, id string, pg progress.Controller, opts ...RefOption) (*immutableRef, error) {
	rec, err := cm.getRecord(ctx, id, opts...)
	if err != nil {
		return nil, err
	}
	rec.mu.Lock()
	defer rec.mu.Unlock()

	triggerUpdate := true
	for _, o := range opts {
		if o == NoUpdateLastUsed {
			triggerUpdate = false
		}
	}

	descHandlers := descHandlersOf(opts...)

	if rec.mutable {
		if len(rec.refs) != 0 {
			return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
		}
		if rec.equalImmutable != nil {
			return rec.equalImmutable.ref(triggerUpdate, descHandlers, pg), nil
		}
		return rec.mref(triggerUpdate, descHandlers).commit(ctx)
	}

	return rec.ref(triggerUpdate, descHandlers, pg), nil
}

// getRecord returns record for id. Requires manager lock.
func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (cr *cacheRecord, retErr error) {
	checkLazyProviders := func(rec *cacheRecord) error {
		missing := NeedsRemoteProviderError(nil)
		dhs := descHandlersOf(opts...)
		if err := rec.walkUniqueAncestors(func(cr *cacheRecord) error {
			blob := cr.getBlob()
			if isLazy, err := cr.isLazy(ctx); err != nil {
				return err
			} else if isLazy && dhs[blob] == nil {
				missing = append(missing, blob)
			}
			return nil
		}); err != nil {
			return err
		}
		if len(missing) > 0 {
			return missing
		}
		return nil
	}

	if rec, ok := cm.records[id]; ok {
		if rec.isDead() {
			return nil, errors.Wrapf(errNotFound, "failed to get dead record %s", id)
		}
		if err := checkLazyProviders(rec); err != nil {
			return nil, err
		}
		return rec, nil
	}

	md, ok := cm.getMetadata(id)
	if !ok {
		return nil, errors.Wrap(errNotFound, id)
	}

	parents, err := cm.parentsOf(ctx, md, opts...)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get parents")
	}
	defer func() {
		if retErr != nil {
			parents.release(context.TODO())
		}
	}()

	if mutableID := md.getEqualMutable(); mutableID != "" {
		mutable, err := cm.getRecord(ctx, mutableID)
		if err == nil {
			rec := &cacheRecord{
				mu:            &sync.Mutex{},
				cm:            cm,
				refs:          make(map[ref]struct{}),
				parentRefs:    parents,
				cacheMetadata: md,
				equalMutable:  &mutableRef{cacheRecord: mutable},
			}
			mutable.equalImmutable = &immutableRef{cacheRecord: rec}
			cm.records[id] = rec
			return rec, nil
		} else if IsNotFound(err) {
			// The equal mutable for this ref is not found, check to see if our snapshot exists
			if _, statErr := cm.Snapshotter.Stat(ctx, md.getSnapshotID()); statErr != nil {
				// this ref's snapshot also doesn't exist, just remove this record
				cm.MetadataStore.Clear(id)
				return nil, errors.Wrap(errNotFound, id)
			}
			// Our snapshot exists, so there may have been a crash while finalizing this ref.
			// Clear the equal mutable field and continue using this ref.
			md.clearEqualMutable()
			md.commitMetadata()
		} else {
			return nil, err
		}
	}

	rec := &cacheRecord{
		mu:            &sync.Mutex{},
		mutable:       !md.getCommitted(),
		cm:            cm,
		refs:          make(map[ref]struct{}),
		parentRefs:    parents,
		cacheMetadata: md,
	}

	// TODO:(sipsma) this is kludge to deal with a bug in v0.10.{0,1} where
	// merge and diff refs didn't have committed set to true:
	// https://github.com/moby/buildkit/issues/2740
	if kind := rec.kind(); kind == Merge || kind == Diff {
		rec.mutable = false
	}

	// the record was deleted but we crashed before data on disk was removed
	if md.getDeleted() {
		if err := rec.remove(ctx, true); err != nil {
			return nil, err
		}
		return nil, errors.Wrapf(errNotFound, "failed to get deleted record %s", id)
	}

	if rec.mutable {
		// If the record is mutable, then the snapshot must exist
		if _, err := cm.Snapshotter.Stat(ctx, rec.ID()); err != nil {
			if !errdefs.IsNotFound(err) {
				return nil, errors.Wrap(err, "failed to check mutable ref snapshot")
			}
			// the snapshot doesn't exist, clear this record
			if err := rec.remove(ctx, true); err != nil {
				return nil, errors.Wrap(err, "failed to remove mutable rec with missing snapshot")
			}
			return nil, errors.Wrap(errNotFound, rec.ID())
		}
	}

	if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
		return nil, err
	}

	if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil {
		return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
	}

	cm.records[id] = rec
	if err := checkLazyProviders(rec); err != nil {
		return nil, err
	}
	return rec, nil
}

func (cm *cacheManager) parentsOf(ctx context.Context, md *cacheMetadata, opts ...RefOption) (ps parentRefs, rerr error) {
	defer func() {
		if rerr != nil {
			ps.release(context.TODO())
		}
	}()
	if parentID := md.getParent(); parentID != "" {
		p, err := cm.get(ctx, parentID, nil, append(opts, NoUpdateLastUsed))
		if err != nil {
			return ps, err
		}
		ps.layerParent = p
		return ps, nil
	}
	for _, parentID := range md.getMergeParents() {
		p, err := cm.get(ctx, parentID, nil, append(opts, NoUpdateLastUsed))
		if err != nil {
			return ps, err
		}
		ps.mergeParents = append(ps.mergeParents, p)
	}
	if lowerParentID := md.getLowerDiffParent(); lowerParentID != "" {
		p, err := cm.get(ctx, lowerParentID, nil, append(opts, NoUpdateLastUsed))
		if err != nil {
			return ps, err
		}
		if ps.diffParents == nil {
			ps.diffParents = &diffParents{}
		}
		ps.diffParents.lower = p
	}
	if upperParentID := md.getUpperDiffParent(); upperParentID != "" {
		p, err := cm.get(ctx, upperParentID, nil, append(opts, NoUpdateLastUsed))
		if err != nil {
			return ps, err
		}
		if ps.diffParents == nil {
			ps.diffParents = &diffParents{}
		}
		ps.diffParents.upper = p
	}
	return ps, nil
}

func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Group, opts ...RefOption) (mr MutableRef, err error) {
	id := identity.NewID()

	var parent *immutableRef
	var parentSnapshotID string
	if s != nil {
		if _, ok := s.(*immutableRef); ok {
			parent = s.Clone().(*immutableRef)
		} else {
			p, err := cm.Get(ctx, s.ID(), nil, append(opts, NoUpdateLastUsed)...)
			if err != nil {
				return nil, err
			}
			parent = p.(*immutableRef)
		}
		if err := parent.Finalize(ctx); err != nil {
			return nil, err
		}
		if err := parent.Extract(ctx, sess); err != nil {
			return nil, err
		}
		parentSnapshotID = parent.getSnapshotID()
	}

	defer func() {
		if err != nil && parent != nil {
			parent.Release(context.TODO())
		}
	}()

	l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
		l.ID = id
		l.Labels = map[string]string{
			"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create lease")
	}

	defer func() {
		if err != nil {
			if err := cm.LeaseManager.Delete(context.TODO(), leases.Lease{
				ID: l.ID,
			}); err != nil {
				bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
			}
		}
	}()

	snapshotID := id
	if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{
		ID:   snapshotID,
		Type: "snapshots/" + cm.Snapshotter.Name(),
	}); err != nil && !errdefs.IsAlreadyExists(err) {
		return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", snapshotID)
	}

	if cm.Snapshotter.Name() == "stargz" && parent != nil {
		if rerr := parent.withRemoteSnapshotLabelsStargzMode(ctx, sess, func() {
			err = cm.Snapshotter.Prepare(ctx, snapshotID, parentSnapshotID)
		}); rerr != nil {
			return nil, rerr
		}
	} else {
		err = cm.Snapshotter.Prepare(ctx, snapshotID, parentSnapshotID)
	}
	if err != nil {
		return nil, errors.Wrapf(err, "failed to prepare %v as %s", parentSnapshotID, snapshotID)
	}

	cm.mu.Lock()
	defer cm.mu.Unlock()

	md, _ := cm.getMetadata(id)

	rec := &cacheRecord{
		mu:            &sync.Mutex{},
		mutable:       true,
		cm:            cm,
		refs:          make(map[ref]struct{}),
		parentRefs:    parentRefs{layerParent: parent},
		cacheMetadata: md,
	}

	opts = append(opts, withSnapshotID(snapshotID))
	if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
		return nil, err
	}

	if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil {
		return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
	}

	cm.records[id] = rec // TODO: save to db

	// parent refs are possibly lazy so keep it hold the description handlers.
	var dhs DescHandlers
	if parent != nil {
		dhs = parent.descHandlers
	}
	return rec.mref(true, dhs), nil
}

func (cm *cacheManager) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	rec, err := cm.getRecord(ctx, id, opts...)
	if err != nil {
		return nil, err
	}

	rec.mu.Lock()
	defer rec.mu.Unlock()
	if !rec.mutable {
		return nil, errors.Wrapf(errInvalid, "%s is not mutable", id)
	}

	if len(rec.refs) != 0 {
		return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
	}

	if rec.equalImmutable != nil {
		if len(rec.equalImmutable.refs) != 0 {
			return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
		}
		delete(cm.records, rec.equalImmutable.ID())
		if err := rec.equalImmutable.remove(ctx, false); err != nil {
			return nil, err
		}
		rec.equalImmutable = nil
	}

	return rec.mref(true, descHandlersOf(opts...)), nil
}

func (cm *cacheManager) Merge(ctx context.Context, inputParents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ir ImmutableRef, rerr error) {
	// TODO:(sipsma) optimize merge further by
	// * Removing repeated occurrences of input layers (only leaving the uppermost)
	// * Reusing existing merges that are equivalent to this one
	// * Reusing existing merges that can be used as a base for this one
	// * Calculating diffs only once (across both merges and during computeBlobChain). Save diff metadata so it can be reapplied.
	// These optimizations may make sense here in cache, in the snapshotter or both.
	// Be sure that any optimizations handle existing pre-optimization refs correctly.

	parents := parentRefs{mergeParents: make([]*immutableRef, 0, len(inputParents))}
	dhs := make(map[digest.Digest]*DescHandler)
	defer func() {
		if rerr != nil {
			parents.release(context.TODO())
		}
	}()
	for _, inputParent := range inputParents {
		if inputParent == nil {
			continue
		}
		var parent *immutableRef
		if p, ok := inputParent.(*immutableRef); ok {
			parent = p
		} else {
			// inputParent implements ImmutableRef but isn't our internal struct, get an instance of the internal struct
			// by calling Get on its ID.
			p, err := cm.Get(ctx, inputParent.ID(), nil, append(opts, NoUpdateLastUsed)...)
			if err != nil {
				return nil, err
			}
			parent = p.(*immutableRef)
			defer parent.Release(context.TODO())
		}
		// On success, cloned parents will be not be released and will be owned by the returned ref
		switch parent.kind() {
		case Merge:
			// if parent is itself a merge, flatten it out by just setting our parents directly to its parents
			for _, grandparent := range parent.mergeParents {
				parents.mergeParents = append(parents.mergeParents, grandparent.clone())
			}
		default:
			parents.mergeParents = append(parents.mergeParents, parent.clone())
		}
		for dgst, handler := range parent.descHandlers {
			dhs[dgst] = handler
		}
	}

	// On success, createMergeRef takes ownership of parents
	mergeRef, err := cm.createMergeRef(ctx, parents, dhs, pg, opts...)
	if err != nil {
		return nil, err
	}
	return mergeRef, nil
}

func (cm *cacheManager) createMergeRef(ctx context.Context, parents parentRefs, dhs DescHandlers, pg progress.Controller, opts ...RefOption) (ir *immutableRef, rerr error) {
	if len(parents.mergeParents) == 0 {
		// merge of nothing is nothing
		return nil, nil
	}
	if len(parents.mergeParents) == 1 {
		// merge of 1 thing is that thing
		parents.mergeParents[0].progress = pg
		return parents.mergeParents[0], nil
	}

	for _, parent := range parents.mergeParents {
		if err := parent.Finalize(ctx); err != nil {
			return nil, errors.Wrapf(err, "failed to finalize parent during merge")
		}
	}

	cm.mu.Lock()
	defer cm.mu.Unlock()

	// Build the new ref
	id := identity.NewID()
	md, _ := cm.getMetadata(id)

	rec := &cacheRecord{
		mu:            &sync.Mutex{},
		mutable:       false,
		cm:            cm,
		cacheMetadata: md,
		parentRefs:    parents,
		refs:          make(map[ref]struct{}),
	}

	if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
		return nil, err
	}

	snapshotID := id
	l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
		l.ID = id
		l.Labels = map[string]string{
			"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create lease")
	}
	defer func() {
		if rerr != nil {
			if err := cm.LeaseManager.Delete(context.TODO(), leases.Lease{
				ID: l.ID,
			}); err != nil {
				bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
			}
		}
	}()

	if err := cm.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
		ID:   snapshotID,
		Type: "snapshots/" + cm.Snapshotter.Name(),
	}); err != nil {
		return nil, err
	}

	rec.queueSnapshotID(snapshotID)
	rec.queueCommitted(true)
	if err := rec.commitMetadata(); err != nil {
		return nil, err
	}

	cm.records[id] = rec

	return rec.ref(true, dhs, pg), nil
}

func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ir ImmutableRef, rerr error) {
	if lower == nil {
		return nil, errors.New("lower ref for diff cannot be nil")
	}

	var dps diffParents
	parents := parentRefs{diffParents: &dps}
	dhs := make(map[digest.Digest]*DescHandler)
	defer func() {
		if rerr != nil {
			parents.release(context.TODO())
		}
	}()
	for i, inputParent := range []ImmutableRef{lower, upper} {
		if inputParent == nil {
			continue
		}
		var parent *immutableRef
		if p, ok := inputParent.(*immutableRef); ok {
			parent = p
		} else {
			// inputParent implements ImmutableRef but isn't our internal struct, get an instance of the internal struct
			// by calling Get on its ID.
			p, err := cm.Get(ctx, inputParent.ID(), nil, append(opts, NoUpdateLastUsed)...)
			if err != nil {
				return nil, err
			}
			parent = p.(*immutableRef)
			defer parent.Release(context.TODO())
		}
		// On success, cloned parents will not be released and will be owned by the returned ref
		if i == 0 {
			dps.lower = parent.clone()
		} else {
			dps.upper = parent.clone()
		}
		for dgst, handler := range parent.descHandlers {
			dhs[dgst] = handler
		}
	}

	// Check to see if lower is an ancestor of upper. If so, define the diff as a merge
	// of the layers separating the two. This can result in a different diff than just
	// running the differ directly on lower and upper, but this is chosen as a default
	// behavior in order to maximize layer re-use in the default case. We may add an
	// option for controlling this behavior in the future if it's needed.
	if dps.upper != nil {
		lowerLayers := dps.lower.layerChain()
		upperLayers := dps.upper.layerChain()
		var lowerIsAncestor bool
		// when upper is only 1 layer different than lower, we can skip this as we
		// won't need a merge in order to get optimal behavior.
		if len(upperLayers) > len(lowerLayers)+1 {
			lowerIsAncestor = true
			for i, lowerLayer := range lowerLayers {
				if lowerLayer.ID() != upperLayers[i].ID() {
					lowerIsAncestor = false
					break
				}
			}
		}
		if lowerIsAncestor {
			mergeParents := parentRefs{mergeParents: make([]*immutableRef, len(upperLayers)-len(lowerLayers))}
			defer func() {
				if rerr != nil {
					mergeParents.release(context.TODO())
				}
			}()
			for i := len(lowerLayers); i < len(upperLayers); i++ {
				subUpper := upperLayers[i]
				subLower := subUpper.layerParent
				// On success, cloned refs will not be released and will be owned by the returned ref
				if subLower == nil {
					mergeParents.mergeParents[i-len(lowerLayers)] = subUpper.clone()
				} else {
					subParents := parentRefs{diffParents: &diffParents{lower: subLower.clone(), upper: subUpper.clone()}}
					diffRef, err := cm.createDiffRef(ctx, subParents, subUpper.descHandlers, pg,
						WithDescription(fmt.Sprintf("diff %q -> %q", subLower.ID(), subUpper.ID())))
					if err != nil {
						subParents.release(context.TODO())
						return nil, err
					}
					mergeParents.mergeParents[i-len(lowerLayers)] = diffRef
				}
			}
			// On success, createMergeRef takes ownership of mergeParents
			mergeRef, err := cm.createMergeRef(ctx, mergeParents, dhs, pg)
			if err != nil {
				return nil, err
			}
			parents.release(context.TODO())
			return mergeRef, nil
		}
	}

	// On success, createDiffRef takes ownership of parents
	diffRef, err := cm.createDiffRef(ctx, parents, dhs, pg, opts...)
	if err != nil {
		return nil, err
	}
	return diffRef, nil
}

func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, dhs DescHandlers, pg progress.Controller, opts ...RefOption) (ir *immutableRef, rerr error) {
	dps := parents.diffParents
	if err := dps.lower.Finalize(ctx); err != nil {
		return nil, errors.Wrapf(err, "failed to finalize lower parent during diff")
	}
	if dps.upper != nil {
		if err := dps.upper.Finalize(ctx); err != nil {
			return nil, errors.Wrapf(err, "failed to finalize upper parent during diff")
		}
	}

	id := identity.NewID()

	snapshotID := id

	l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
		l.ID = id
		l.Labels = map[string]string{
			"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create lease")
	}
	defer func() {
		if rerr != nil {
			if err := cm.LeaseManager.Delete(context.TODO(), leases.Lease{
				ID: l.ID,
			}); err != nil {
				bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
			}
		}
	}()

	if err := cm.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
		ID:   snapshotID,
		Type: "snapshots/" + cm.Snapshotter.Name(),
	}); err != nil {
		return nil, err
	}

	cm.mu.Lock()
	defer cm.mu.Unlock()

	// Build the new ref
	md, _ := cm.getMetadata(id)

	rec := &cacheRecord{
		mu:            &sync.Mutex{},
		mutable:       false,
		cm:            cm,
		cacheMetadata: md,
		parentRefs:    parents,
		refs:          make(map[ref]struct{}),
	}

	if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
		return nil, err
	}

	rec.queueSnapshotID(snapshotID)
	rec.queueCommitted(true)
	if err := rec.commitMetadata(); err != nil {
		return nil, err
	}

	cm.records[id] = rec

	return rec.ref(true, dhs, pg), nil
}

func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
	cm.muPrune.Lock()

	for _, opt := range opts {
		if err := cm.pruneOnce(ctx, ch, opt); err != nil {
			cm.muPrune.Unlock()
			return err
		}
	}

	cm.muPrune.Unlock()

	if cm.GarbageCollect != nil {
		if _, err := cm.GarbageCollect(ctx); err != nil {
			return err
		}
	}

	return nil
}

func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error {
	filter, err := filters.ParseAll(opt.Filter...)
	if err != nil {
		return errors.Wrapf(err, "failed to parse prune filters %v", opt.Filter)
	}

	var check ExternalRefChecker
	if f := cm.PruneRefChecker; f != nil && (!opt.All || len(opt.Filter) > 0) {
		c, err := f()
		if err != nil {
			return errors.WithStack(err)
		}
		check = c
	}

	totalSize := int64(0)
	if opt.KeepBytes != 0 {
		du, err := cm.DiskUsage(ctx, client.DiskUsageInfo{})
		if err != nil {
			return err
		}
		for _, ui := range du {
			if ui.Shared {
				continue
			}
			totalSize += ui.Size
		}
	}

	return cm.prune(ctx, ch, pruneOpt{
		filter:       filter,
		all:          opt.All,
		checkShared:  check,
		keepDuration: opt.KeepDuration,
		keepBytes:    opt.KeepBytes,
		totalSize:    totalSize,
	})
}

func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt pruneOpt) error {
	var toDelete []*deleteRecord

	if opt.keepBytes != 0 && opt.totalSize < opt.keepBytes {
		return nil
	}

	cm.mu.Lock()

	gcMode := opt.keepBytes != 0
	cutOff := time.Now().Add(-opt.keepDuration)

	locked := map[*sync.Mutex]struct{}{}

	for _, cr := range cm.records {
		if _, ok := locked[cr.mu]; ok {
			continue
		}
		cr.mu.Lock()

		// ignore duplicates that share data
		if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
			cr.mu.Unlock()
			continue
		}

		if cr.isDead() {
			cr.mu.Unlock()
			continue
		}

		if len(cr.refs) == 0 {
			recordType := cr.GetRecordType()
			if recordType == "" {
				recordType = client.UsageRecordTypeRegular
			}

			shared := false
			if opt.checkShared != nil {
				shared = opt.checkShared.Exists(cr.ID(), cr.layerDigestChain())
			}

			if !opt.all {
				if recordType == client.UsageRecordTypeInternal || recordType == client.UsageRecordTypeFrontend || shared {
					cr.mu.Unlock()
					continue
				}
			}

			c := &client.UsageInfo{
				ID:         cr.ID(),
				Mutable:    cr.mutable,
				RecordType: recordType,
				Shared:     shared,
			}

			usageCount, lastUsedAt := cr.getLastUsed()
			c.LastUsedAt = lastUsedAt
			c.UsageCount = usageCount

			if opt.keepDuration != 0 {
				if lastUsedAt != nil && lastUsedAt.After(cutOff) {
					cr.mu.Unlock()
					continue
				}
			}

			if opt.filter.Match(adaptUsageInfo(c)) {
				toDelete = append(toDelete, &deleteRecord{
					cacheRecord: cr,
					lastUsedAt:  c.LastUsedAt,
					usageCount:  c.UsageCount,
				})
				if !gcMode {
					cr.dead = true

					// mark metadata as deleted in case we crash before cleanup finished
					if err := cr.queueDeleted(); err != nil {
						cr.mu.Unlock()
						cm.mu.Unlock()
						return err
					}
					if err := cr.commitMetadata(); err != nil {
						cr.mu.Unlock()
						cm.mu.Unlock()
						return err
					}
				} else {
					locked[cr.mu] = struct{}{}
					continue // leave the record locked
				}
			}
		}
		cr.mu.Unlock()
	}

	if gcMode && len(toDelete) > 0 {
		sortDeleteRecords(toDelete)
		var err error
		for i, cr := range toDelete {
			// only remove single record at a time
			if i == 0 {
				cr.dead = true
				err = cr.queueDeleted()
				if err == nil {
					err = cr.commitMetadata()
				}
			}
			cr.mu.Unlock()
		}
		if err != nil {
			return err
		}
		toDelete = toDelete[:1]
	}

	cm.mu.Unlock()

	if len(toDelete) == 0 {
		return nil
	}

	// calculate sizes here so that lock does not need to be held for slow process
	for _, cr := range toDelete {
		size := cr.getSize()

		if size == sizeUnknown && cr.equalImmutable != nil {
			size = cr.equalImmutable.getSize() // benefit from DiskUsage calc
		}
		if size == sizeUnknown {
			// calling size will warm cache for next call
			if _, err := cr.size(ctx); err != nil {
				return err
			}
		}
	}

	cm.mu.Lock()
	var err error
	for _, cr := range toDelete {
		cr.mu.Lock()

		usageCount, lastUsedAt := cr.getLastUsed()

		c := client.UsageInfo{
			ID:          cr.ID(),
			Mutable:     cr.mutable,
			InUse:       len(cr.refs) > 0,
			Size:        cr.getSize(),
			CreatedAt:   cr.GetCreatedAt(),
			Description: cr.GetDescription(),
			LastUsedAt:  lastUsedAt,
			UsageCount:  usageCount,
		}

		switch cr.kind() {
		case Layer:
			c.Parents = []string{cr.layerParent.ID()}
		case Merge:
			c.Parents = make([]string, len(cr.mergeParents))
			for i, p := range cr.mergeParents {
				c.Parents[i] = p.ID()
			}
		case Diff:
			c.Parents = make([]string, 0, 2)
			if cr.diffParents.lower != nil {
				c.Parents = append(c.Parents, cr.diffParents.lower.ID())
			}
			if cr.diffParents.upper != nil {
				c.Parents = append(c.Parents, cr.diffParents.upper.ID())
			}
		}
		if c.Size == sizeUnknown && cr.equalImmutable != nil {
			c.Size = cr.equalImmutable.getSize() // benefit from DiskUsage calc
		}

		opt.totalSize -= c.Size

		if cr.equalImmutable != nil {
			if err1 := cr.equalImmutable.remove(ctx, false); err == nil {
				err = err1
			}
		}
		if err1 := cr.remove(ctx, true); err == nil {
			err = err1
		}

		if err == nil && ch != nil {
			ch <- c
		}
		cr.mu.Unlock()
	}
	cm.mu.Unlock()
	if err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return cm.prune(ctx, ch, opt)
	}
}

func (cm *cacheManager) markShared(m map[string]*cacheUsageInfo) error {
	if cm.PruneRefChecker == nil {
		return nil
	}
	c, err := cm.PruneRefChecker()
	if err != nil {
		return errors.WithStack(err)
	}

	var markAllParentsShared func(...string)
	markAllParentsShared = func(ids ...string) {
		for _, id := range ids {
			if id == "" {
				continue
			}
			if v, ok := m[id]; ok {
				v.shared = true
				markAllParentsShared(v.parents...)
			}
		}
	}

	for id := range m {
		if m[id].shared {
			continue
		}
		if b := c.Exists(id, m[id].parentChain); b {
			markAllParentsShared(id)
		}
	}
	return nil
}

type cacheUsageInfo struct {
	refs        int
	parents     []string
	size        int64
	mutable     bool
	createdAt   time.Time
	usageCount  int
	lastUsedAt  *time.Time
	description string
	doubleRef   bool
	recordType  client.UsageRecordType
	shared      bool
	parentChain []digest.Digest
}

func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
	filter, err := filters.ParseAll(opt.Filter...)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse diskusage filters %v", opt.Filter)
	}

	cm.mu.Lock()

	m := make(map[string]*cacheUsageInfo, len(cm.records))
	rescan := make(map[string]struct{}, len(cm.records))

	for id, cr := range cm.records {
		cr.mu.Lock()
		// ignore duplicates that share data
		if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
			cr.mu.Unlock()
			continue
		}

		usageCount, lastUsedAt := cr.getLastUsed()
		c := &cacheUsageInfo{
			refs:        len(cr.refs),
			mutable:     cr.mutable,
			size:        cr.getSize(),
			createdAt:   cr.GetCreatedAt(),
			usageCount:  usageCount,
			lastUsedAt:  lastUsedAt,
			description: cr.GetDescription(),
			doubleRef:   cr.equalImmutable != nil,
			recordType:  cr.GetRecordType(),
			parentChain: cr.layerDigestChain(),
		}
		if c.recordType == "" {
			c.recordType = client.UsageRecordTypeRegular
		}

		switch cr.kind() {
		case Layer:
			c.parents = []string{cr.layerParent.ID()}
		case Merge:
			c.parents = make([]string, len(cr.mergeParents))
			for i, p := range cr.mergeParents {
				c.parents[i] = p.ID()
			}
		case Diff:
			if cr.diffParents.lower != nil {
				c.parents = append(c.parents, cr.diffParents.lower.ID())
			}
			if cr.diffParents.upper != nil {
				c.parents = append(c.parents, cr.diffParents.upper.ID())
			}
		}
		if cr.mutable && c.refs > 0 {
			c.size = 0 // size can not be determined because it is changing
		}
		m[id] = c
		rescan[id] = struct{}{}
		cr.mu.Unlock()
	}
	cm.mu.Unlock()

	for {
		if len(rescan) == 0 {
			break
		}
		for id := range rescan {
			v := m[id]
			if v.refs == 0 {
				for _, p := range v.parents {
					m[p].refs--
					if v.doubleRef {
						m[p].refs--
					}
					rescan[p] = struct{}{}
				}
			}
			delete(rescan, id)
		}
	}

	if err := cm.markShared(m); err != nil {
		return nil, err
	}

	var du []*client.UsageInfo
	for id, cr := range m {
		c := &client.UsageInfo{
			ID:          id,
			Mutable:     cr.mutable,
			InUse:       cr.refs > 0,
			Size:        cr.size,
			Parents:     cr.parents,
			CreatedAt:   cr.createdAt,
			Description: cr.description,
			LastUsedAt:  cr.lastUsedAt,
			UsageCount:  cr.usageCount,
			RecordType:  cr.recordType,
			Shared:      cr.shared,
		}
		if filter.Match(adaptUsageInfo(c)) {
			du = append(du, c)
		}
	}

	eg, ctx := errgroup.WithContext(ctx)

	for _, d := range du {
		if d.Size == sizeUnknown {
			func(d *client.UsageInfo) {
				eg.Go(func() error {
					cm.mu.Lock()
					ref, err := cm.get(ctx, d.ID, nil, NoUpdateLastUsed)
					cm.mu.Unlock()
					if err != nil {
						d.Size = 0
						return nil
					}
					defer ref.Release(context.TODO())
					s, err := ref.size(ctx)
					if err != nil {
						return err
					}
					d.Size = s
					return nil
				})
			}(d)
		}
	}

	if err := eg.Wait(); err != nil {
		return du, err
	}

	return du, nil
}

func IsNotFound(err error) bool {
	return errors.Is(err, errNotFound)
}

type RefOption interface{}

type cachePolicy int

const (
	cachePolicyDefault cachePolicy = iota
	cachePolicyRetain
)

type noUpdateLastUsed struct{}

var NoUpdateLastUsed noUpdateLastUsed

func CachePolicyRetain(m *cacheMetadata) error {
	return m.SetCachePolicyRetain()
}

func CachePolicyDefault(m *cacheMetadata) error {
	return m.SetCachePolicyDefault()
}

func WithDescription(descr string) RefOption {
	return func(m *cacheMetadata) error {
		return m.queueDescription(descr)
	}
}

func WithRecordType(t client.UsageRecordType) RefOption {
	return func(m *cacheMetadata) error {
		return m.queueRecordType(t)
	}
}

func WithCreationTime(tm time.Time) RefOption {
	return func(m *cacheMetadata) error {
		return m.queueCreatedAt(tm)
	}
}

// Need a separate type for imageRef because it needs to be called outside
// initializeMetadata while still being a RefOption, so wrapping it in a
// different type ensures initializeMetadata won't catch it too and duplicate
// setting the metadata.
type imageRefOption func(m *cacheMetadata) error

// WithImageRef appends the given imageRef to the cache ref's metadata
func WithImageRef(imageRef string) RefOption {
	return imageRefOption(func(m *cacheMetadata) error {
		return m.appendImageRef(imageRef)
	})
}

func setImageRefMetadata(m *cacheMetadata, opts ...RefOption) error {
	for _, opt := range opts {
		if fn, ok := opt.(imageRefOption); ok {
			if err := fn(m); err != nil {
				return err
			}
		}
	}
	return m.commitMetadata()
}

func withSnapshotID(id string) RefOption {
	return imageRefOption(func(m *cacheMetadata) error {
		return m.queueSnapshotID(id)
	})
}

func initializeMetadata(m *cacheMetadata, parents parentRefs, opts ...RefOption) error {
	if tm := m.GetCreatedAt(); !tm.IsZero() {
		return nil
	}

	switch {
	case parents.layerParent != nil:
		if err := m.queueParent(parents.layerParent.ID()); err != nil {
			return err
		}
	case len(parents.mergeParents) > 0:
		var ids []string
		for _, p := range parents.mergeParents {
			ids = append(ids, p.ID())
		}
		if err := m.queueMergeParents(ids); err != nil {
			return err
		}
	case parents.diffParents != nil:
		if parents.diffParents.lower != nil {
			if err := m.queueLowerDiffParent(parents.diffParents.lower.ID()); err != nil {
				return err
			}
		}
		if parents.diffParents.upper != nil {
			if err := m.queueUpperDiffParent(parents.diffParents.upper.ID()); err != nil {
				return err
			}
		}
	}

	if err := m.queueCreatedAt(time.Now()); err != nil {
		return err
	}

	for _, opt := range opts {
		if fn, ok := opt.(func(*cacheMetadata) error); ok {
			if err := fn(m); err != nil {
				return err
			}
		}
	}

	return m.commitMetadata()
}

func adaptUsageInfo(info *client.UsageInfo) filters.Adaptor {
	return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
		if len(fieldpath) == 0 {
			return "", false
		}

		switch fieldpath[0] {
		case "id":
			return info.ID, info.ID != ""
		case "parents":
			return strings.Join(info.Parents, ";"), len(info.Parents) > 0
		case "description":
			return info.Description, info.Description != ""
		case "inuse":
			return "", info.InUse
		case "mutable":
			return "", info.Mutable
		case "immutable":
			return "", !info.Mutable
		case "type":
			return string(info.RecordType), info.RecordType != ""
		case "shared":
			return "", info.Shared
		case "private":
			return "", !info.Shared
		}

		// TODO: add int/datetime/bytes support for more fields

		return "", false
	})
}

type pruneOpt struct {
	filter       filters.Filter
	all          bool
	checkShared  ExternalRefChecker
	keepDuration time.Duration
	keepBytes    int64
	totalSize    int64
}

type deleteRecord struct {
	*cacheRecord
	lastUsedAt      *time.Time
	usageCount      int
	lastUsedAtIndex int
	usageCountIndex int
}

func sortDeleteRecords(toDelete []*deleteRecord) {
	sort.Slice(toDelete, func(i, j int) bool {
		if toDelete[i].lastUsedAt == nil {
			return true
		}
		if toDelete[j].lastUsedAt == nil {
			return false
		}
		return toDelete[i].lastUsedAt.Before(*toDelete[j].lastUsedAt)
	})

	maxLastUsedIndex := 0
	var val time.Time
	for _, v := range toDelete {
		if v.lastUsedAt != nil && v.lastUsedAt.After(val) {
			val = *v.lastUsedAt
			maxLastUsedIndex++
		}
		v.lastUsedAtIndex = maxLastUsedIndex
	}

	sort.Slice(toDelete, func(i, j int) bool {
		return toDelete[i].usageCount < toDelete[j].usageCount
	})

	maxUsageCountIndex := 0
	var count int
	for _, v := range toDelete {
		if v.usageCount != count {
			count = v.usageCount
			maxUsageCountIndex++
		}
		v.usageCountIndex = maxUsageCountIndex
	}

	sort.Slice(toDelete, func(i, j int) bool {
		return float64(toDelete[i].lastUsedAtIndex)/float64(maxLastUsedIndex)+
			float64(toDelete[i].usageCountIndex)/float64(maxUsageCountIndex) <
			float64(toDelete[j].lastUsedAtIndex)/float64(maxLastUsedIndex)+
				float64(toDelete[j].usageCountIndex)/float64(maxUsageCountIndex)
	})
}

func diffIDFromDescriptor(desc ocispecs.Descriptor) (digest.Digest, error) {
	diffIDStr, ok := desc.Annotations["containerd.io/uncompressed"]
	if !ok {
		return "", errors.Errorf("missing uncompressed annotation for %s", desc.Digest)
	}
	diffID, err := digest.Parse(diffIDStr)
	if err != nil {
		return "", errors.Wrapf(err, "failed to parse diffID %q for %s", diffIDStr, desc.Digest)
	}
	return diffID, nil
}
