Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
3ca245dadb | |||
6f0e883bff | |||
5fc1b55d52 | |||
9718a027e0 | |||
2377fb191a | |||
6f7c7c5ea7 | |||
474de0734f | |||
181e33db92 | |||
1e382f1552 |
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"git.tijl.dev/tijl/shortify/pkg/generation"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
@ -19,6 +20,11 @@ type Client struct {
|
||||
db *bolt.DB
|
||||
retryQueue chan shortenJob
|
||||
stopRetry chan struct{}
|
||||
|
||||
// In-memory cache
|
||||
cacheMap *lru.Cache
|
||||
maxCacheSize int
|
||||
maxCacheInitialLoad int
|
||||
}
|
||||
|
||||
// NewClient with persistence and retry queue
|
||||
@ -37,10 +43,17 @@ func NewClient(serverURL string, folder string) (*Client, error) {
|
||||
serverURL: baseURL,
|
||||
httpClient: httpClient,
|
||||
db: db,
|
||||
retryQueue: make(chan shortenJob, 1000),
|
||||
retryQueue: make(chan shortenJob, 100000),
|
||||
stopRetry: make(chan struct{}),
|
||||
}
|
||||
|
||||
cli.maxCacheSize = 100000 // or make this configurable
|
||||
cli.maxCacheInitialLoad = 10000
|
||||
cli.cacheMap, err = lru.New(cli.maxCacheSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create buckets if not exist
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix))
|
||||
@ -48,7 +61,16 @@ func NewClient(serverURL string, folder string) (*Client, error) {
|
||||
return err
|
||||
}
|
||||
_, err = tx.CreateBucketIfNotExists([]byte(bucketRetryJobs))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.CreateBucketIfNotExists([]byte(bucketURLCache))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -91,6 +113,21 @@ func NewClient(serverURL string, folder string) (*Client, error) {
|
||||
cli.prefix = prefix
|
||||
cli.gen = generation.NewGenerator(prefix)
|
||||
|
||||
// load cache
|
||||
_ = cli.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("url_cache"))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
c := b.Cursor()
|
||||
initalCounter := 0
|
||||
for k, v := c.First(); k != nil && initalCounter < cli.maxCacheInitialLoad; k, v = c.Next() {
|
||||
cli.cacheMap.Add(string(k), string(v))
|
||||
initalCounter++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Load retry jobs from DB into channel
|
||||
go cli.loadRetryJobs()
|
||||
|
||||
@ -100,14 +137,72 @@ func NewClient(serverURL string, folder string) (*Client, error) {
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
// Shorten creates a short URL and sends it async to the central server
|
||||
func (c *Client) Shorten(longURL string) string {
|
||||
/*
|
||||
Shorten
|
||||
*/
|
||||
|
||||
type ShortenOpt func(*shortenOptions)
|
||||
|
||||
type shortenOptions struct {
|
||||
useCache bool
|
||||
}
|
||||
|
||||
func UseCache() ShortenOpt {
|
||||
return func(opts *shortenOptions) {
|
||||
opts.useCache = true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Shorten(longURL string, opts ...ShortenOpt) string {
|
||||
options := shortenOptions{}
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
}
|
||||
|
||||
// Check memory cache
|
||||
if options.useCache {
|
||||
if shortID, ok := c.cacheMap.Get(longURL); ok {
|
||||
return shortID.(string)
|
||||
}
|
||||
}
|
||||
|
||||
// Generate new ID
|
||||
shortID := c.gen.NextID()
|
||||
|
||||
// Queue job
|
||||
go c.enqueueJob(shortenJob{
|
||||
ID: shortID,
|
||||
URL: longURL,
|
||||
})
|
||||
|
||||
// Async store in cache
|
||||
if options.useCache {
|
||||
go c.addToCache(longURL, shortID)
|
||||
}
|
||||
|
||||
return shortID
|
||||
}
|
||||
|
||||
func (c *Client) addToCache(longURL, shortID string) {
|
||||
c.cacheMap.Add(longURL, shortID)
|
||||
|
||||
// Async write to BoltDB
|
||||
go func() {
|
||||
_ = c.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("url_cache"))
|
||||
return b.Put([]byte(longURL), []byte(shortID))
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *Client) remFromCache(longURL string) {
|
||||
c.cacheMap.Remove(longURL)
|
||||
|
||||
// Async write to BoltDB
|
||||
go func() {
|
||||
_ = c.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("url_cache"))
|
||||
return b.Delete([]byte(longURL))
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
16
pkg/client/global.go
Normal file
16
pkg/client/global.go
Normal file
@ -0,0 +1,16 @@
|
||||
package client
|
||||
|
||||
import "sync"
|
||||
|
||||
var (
|
||||
Global *Client
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
func Init(serverURL string, folder string) error {
|
||||
var err error
|
||||
once.Do(func() {
|
||||
Global, err = NewClient(serverURL, folder)
|
||||
})
|
||||
return err
|
||||
}
|
@ -16,6 +16,7 @@ import (
|
||||
const (
|
||||
bucketPrefix = "prefix"
|
||||
bucketRetryJobs = "retry_queue"
|
||||
bucketURLCache = "url_cache"
|
||||
dbFileName = "shorty_client.db"
|
||||
)
|
||||
|
||||
@ -64,6 +65,7 @@ func (c *Client) retryWorker() {
|
||||
case job := <-c.retryQueue:
|
||||
err := c.sendShortenJob(job)
|
||||
if err != nil {
|
||||
log.Panicln("got error sending shorten job to server", err)
|
||||
// Re-enqueue with delay
|
||||
go func(j shortenJob) {
|
||||
time.Sleep(2 * time.Second)
|
||||
@ -94,7 +96,8 @@ func (c *Client) enqueueJob(job shortenJob) {
|
||||
select {
|
||||
case c.retryQueue <- job:
|
||||
default:
|
||||
log.Println("Retry queue full, dropping job:", job.ID)
|
||||
log.Println("Retry queue full, dropping job and removing from caches:", job.ID, job.URL)
|
||||
go c.remFromCache(job.URL)
|
||||
}
|
||||
}
|
||||
|
||||
|
2
serve.go
2
serve.go
@ -16,7 +16,7 @@ func (s *Server) Admin() *fiber.App {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var response []byte
|
||||
response := make([]byte, 2)
|
||||
binary.LittleEndian.PutUint16(response, prefix)
|
||||
return c.Send(response)
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user