package shortify import ( "crypto/rand" "errors" "log" "math/big" "sync" "time" "go.etcd.io/bbolt" ) type IDPool struct { db *bbolt.DB mu sync.Mutex idLen int poolCap int regenThresh int inMemoryPool []string cond *sync.Cond stopCh chan struct{} usedChan chan string } func NewIDPool(db *bbolt.DB, idLen int, poolCap int, regenThresh int) (*IDPool, error) { p := &IDPool{ db: db, idLen: idLen, poolCap: poolCap, regenThresh: regenThresh, stopCh: make(chan struct{}), usedChan: make(chan string, 1000), } p.cond = sync.NewCond(&p.mu) err := db.Update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(idpoolBucket) return err }) if err != nil { return nil, err } if err := p.loadFromDB(); err != nil { return nil, err } if len(p.inMemoryPool) == 0 { // idpool empty at startup, generating initial batch... if err := p.GenerateBatch(); err != nil { return nil, err } if err := p.loadFromDB(); err != nil { return nil, err } } go p.backgroundGenerator() go p.flushUsedIDs() return p, nil } func (p *IDPool) loadFromDB() error { p.mu.Lock() defer p.mu.Unlock() var ids []string err := p.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(idpoolBucket) if b == nil { return nil } c := b.Cursor() for k, _ := c.First(); k != nil; k, _ = c.Next() { ids = append(ids, string(k)) } return nil }) if err != nil { return err } p.inMemoryPool = ids return nil } func (p *IDPool) GenerateBatch() error { p.mu.Lock() defer p.mu.Unlock() return p.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(idpoolBucket) count := b.Stats().KeyN for count < p.poolCap { id, err := generateID(p.idLen) if err != nil { return err } if b.Get([]byte(id)) != nil { continue } if err := b.Put([]byte(id), []byte{}); err != nil { return err } count++ } return nil }) } // PopID returns an ID from memory and queues it for async DB removal func (p *IDPool) PopID() (string, error) { p.mu.Lock() defer p.mu.Unlock() if len(p.inMemoryPool) == 0 { return "", errors.New("id pool empty") } // Fast O(1) pop id := p.inMemoryPool[0] p.inMemoryPool = p.inMemoryPool[1:] // Queue for async delete select { case p.usedChan <- id: default: // If the channel is full, we drop the delete. Risky only if shutdown happens log.Println("Warning: used ID queue full; delete may be delayed") } // Signal for batch regen if low if len(p.inMemoryPool) < p.regenThresh { p.cond.Signal() } return id, nil } func (p *IDPool) backgroundGenerator() { for { p.mu.Lock() for len(p.inMemoryPool) >= p.regenThresh { p.cond.Wait() } p.mu.Unlock() select { case <-p.stopCh: return default: } // generating batch err := p.GenerateBatch() if err != nil { // error generating batch:( time.Sleep(time.Second * 5) continue } if err := p.loadFromDB(); err != nil { // error laoding from db:( } } } func (p *IDPool) flushUsedIDs() { for { select { case <-p.stopCh: return case id := <-p.usedChan: err := p.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(idpoolBucket) return b.Delete([]byte(id)) }) if err != nil { log.Printf("Failed to delete used ID %s: %v\n", id, err) } } } } func (p *IDPool) Stop() { close(p.stopCh) // Drain and flush remaining used IDs for { select { case id := <-p.usedChan: p.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(idpoolBucket) return b.Delete([]byte(id)) }) default: return } } } var idpoolBucket = []byte("idpool") var base62 = []rune("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz") func generateID(n int) (string, error) { id := make([]rune, n) for i := range id { num, err := rand.Int(rand.Reader, big.NewInt(int64(len(base62)))) if err != nil { return "", err } id[i] = base62[num.Int64()] } return string(id), nil }