Compare commits

..

No commits in common. "main" and "v0.3.0" have entirely different histories.
main ... v0.3.0

5 changed files with 52 additions and 140 deletions

View File

@ -1,13 +1,15 @@
package client package client
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"encoding/json"
"fmt" "fmt"
"log"
"net/http" "net/http"
"time" "time"
"git.tijl.dev/tijl/shortify/pkg/generation" "git.tijl.dev/tijl/shortify/pkg/generation"
lru "github.com/hashicorp/golang-lru"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
) )
@ -16,25 +18,21 @@ type Client struct {
httpClient *http.Client httpClient *http.Client
prefix uint16 prefix uint16
gen *generation.Generator gen *generation.Generator
domain string // e.g. https://sho.rt
db *bolt.DB db *bolt.DB
retryQueue chan shortenJob retryQueue chan shortenJob
stopRetry chan struct{} stopRetry chan struct{}
// In-memory cache
cacheMap *lru.Cache
maxCacheSize int
maxCacheInitialLoad int
} }
// NewClient with persistence and retry queue // NewClient with persistence and retry queue
func NewClient(serverURL string, folder string) (*Client, error) { func NewClient(serverURL, domain string) (*Client, error) {
httpClient, baseURL, err := createHTTPClient(serverURL) httpClient, baseURL, err := createHTTPClient(serverURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
db, err := bolt.Open(folder+"/"+dbFileName, 0600, &bolt.Options{Timeout: 1 * time.Second}) db, err := bolt.Open(dbFileName, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -42,18 +40,12 @@ func NewClient(serverURL string, folder string) (*Client, error) {
cli := &Client{ cli := &Client{
serverURL: baseURL, serverURL: baseURL,
httpClient: httpClient, httpClient: httpClient,
domain: domain,
db: db, db: db,
retryQueue: make(chan shortenJob, 100000), retryQueue: make(chan shortenJob, 1000),
stopRetry: make(chan struct{}), stopRetry: make(chan struct{}),
} }
cli.maxCacheSize = 100000 // or make this configurable
cli.maxCacheInitialLoad = 10000
cli.cacheMap, err = lru.New(cli.maxCacheSize)
if err != nil {
return nil, err
}
// Create buckets if not exist // Create buckets if not exist
err = db.Update(func(tx *bolt.Tx) error { err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix)) _, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix))
@ -61,16 +53,7 @@ func NewClient(serverURL string, folder string) (*Client, error) {
return err return err
} }
_, err = tx.CreateBucketIfNotExists([]byte(bucketRetryJobs)) _, err = tx.CreateBucketIfNotExists([]byte(bucketRetryJobs))
if err != nil {
return err return err
}
_, err = tx.CreateBucketIfNotExists([]byte(bucketURLCache))
if err != nil {
return err
}
return nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -113,21 +96,6 @@ func NewClient(serverURL string, folder string) (*Client, error) {
cli.prefix = prefix cli.prefix = prefix
cli.gen = generation.NewGenerator(prefix) cli.gen = generation.NewGenerator(prefix)
// load cache
_ = cli.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("url_cache"))
if b == nil {
return nil
}
c := b.Cursor()
initalCounter := 0
for k, v := c.First(); k != nil && initalCounter < cli.maxCacheInitialLoad; k, v = c.Next() {
cli.cacheMap.Add(string(k), string(v))
initalCounter++
}
return nil
})
// Load retry jobs from DB into channel // Load retry jobs from DB into channel
go cli.loadRetryJobs() go cli.loadRetryJobs()
@ -137,72 +105,31 @@ func NewClient(serverURL string, folder string) (*Client, error) {
return cli, nil return cli, nil
} }
/* // Shorten creates a short URL and sends it async to the central server
Shorten func (c *Client) Shorten(longURL string) string {
*/
type ShortenOpt func(*shortenOptions)
type shortenOptions struct {
useCache bool
}
func UseCache() ShortenOpt {
return func(opts *shortenOptions) {
opts.useCache = true
}
}
func (c *Client) Shorten(longURL string, opts ...ShortenOpt) string {
options := shortenOptions{}
for _, opt := range opts {
opt(&options)
}
// Check memory cache
if options.useCache {
if shortID, ok := c.cacheMap.Get(longURL); ok {
return shortID.(string)
}
}
// Generate new ID
shortID := c.gen.NextID() shortID := c.gen.NextID()
// Queue job go func() {
go c.enqueueJob(shortenJob{ payload := map[string]string{
ID: shortID, "id": shortID,
URL: longURL, "url": longURL,
})
// Async store in cache
if options.useCache {
go c.addToCache(longURL, shortID)
} }
data, _ := json.Marshal(payload)
return shortID req, err := http.NewRequest("POST", fmt.Sprintf("%s/shorten", c.serverURL), bytes.NewReader(data))
} if err != nil {
log.Println("shorten request build error:", err)
return
}
req.Header.Set("Content-Type", "application/json")
func (c *Client) addToCache(longURL, shortID string) { resp, err := c.httpClient.Do(req)
c.cacheMap.Add(longURL, shortID) if err != nil {
log.Println("shorten request failed:", err)
// Async write to BoltDB return
go func() { }
_ = c.db.Update(func(tx *bolt.Tx) error { defer resp.Body.Close()
b := tx.Bucket([]byte("url_cache"))
return b.Put([]byte(longURL), []byte(shortID))
})
}() }()
}
func (c *Client) remFromCache(longURL string) { return fmt.Sprintf("%s/%s", c.domain, shortID)
c.cacheMap.Remove(longURL)
// Async write to BoltDB
go func() {
_ = c.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("url_cache"))
return b.Delete([]byte(longURL))
})
}()
} }

View File

@ -1,16 +0,0 @@
package client
import "sync"
var (
Global *Client
once sync.Once
)
func Init(serverURL string, folder string) error {
var err error
once.Do(func() {
Global, err = NewClient(serverURL, folder)
})
return err
}

View File

@ -1,13 +1,11 @@
package client package client
import ( import (
"encoding/binary" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"net/http" "net/http"
"strings"
"time" "time"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
@ -16,7 +14,6 @@ import (
const ( const (
bucketPrefix = "prefix" bucketPrefix = "prefix"
bucketRetryJobs = "retry_queue" bucketRetryJobs = "retry_queue"
bucketURLCache = "url_cache"
dbFileName = "shorty_client.db" dbFileName = "shorty_client.db"
) )
@ -26,18 +23,19 @@ type shortenJob struct {
} }
func (c *Client) registerPrefix() (uint16, error) { func (c *Client) registerPrefix() (uint16, error) {
resp, err := c.httpClient.Get(fmt.Sprintf("%s/register", c.serverURL)) resp, err := c.httpClient.Post(fmt.Sprintf("%s/register", c.serverURL), "application/json", nil)
if err != nil { if err != nil {
return 0, err return 0, err
} }
defer resp.Body.Close() defer resp.Body.Close()
bytes, err := io.ReadAll(resp.Body) var result struct {
if err != nil { Prefix uint16 `json:"prefix"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return 0, err return 0, err
} }
return result.Prefix, nil
return binary.LittleEndian.Uint16(bytes), nil
} }
func (c *Client) loadRetryJobs() { func (c *Client) loadRetryJobs() {
@ -65,7 +63,6 @@ func (c *Client) retryWorker() {
case job := <-c.retryQueue: case job := <-c.retryQueue:
err := c.sendShortenJob(job) err := c.sendShortenJob(job)
if err != nil { if err != nil {
log.Panicln("got error sending shorten job to server", err)
// Re-enqueue with delay // Re-enqueue with delay
go func(j shortenJob) { go func(j shortenJob) {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
@ -96,8 +93,7 @@ func (c *Client) enqueueJob(job shortenJob) {
select { select {
case c.retryQueue <- job: case c.retryQueue <- job:
default: default:
log.Println("Retry queue full, dropping job and removing from caches:", job.ID, job.URL) log.Println("Retry queue full, dropping job:", job.ID)
go c.remFromCache(job.URL)
} }
} }
@ -112,10 +108,18 @@ func (c *Client) deleteJobFromDB(job shortenJob) {
} }
func (c *Client) sendShortenJob(job shortenJob) error { func (c *Client) sendShortenJob(job shortenJob) error {
req, err := http.NewRequest("POST", fmt.Sprintf("%s/shorten?s=%s", c.serverURL, job.ID), strings.NewReader(job.URL)) payload := map[string]string{
"id": job.ID,
"url": job.URL,
}
data, _ := json.Marshal(payload)
req, err := http.NewRequest("POST", fmt.Sprintf("%s/shorten", c.serverURL), bytes.NewReader(data))
if err != nil { if err != nil {
return err return err
} }
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req) resp, err := c.httpClient.Do(req)
if err != nil { if err != nil {
return err return err

View File

@ -16,22 +16,19 @@ func (s *Server) Admin() *fiber.App {
if err != nil { if err != nil {
return err return err
} }
response := make([]byte, 2) var response []byte
binary.LittleEndian.PutUint16(response, prefix) binary.LittleEndian.PutUint16(response, prefix)
return c.Send(response) return c.Send(response)
}) })
a.Post("/shorten", func(c *fiber.Ctx) error { a.Post("/shorten", func(c *fiber.Ctx) error {
longUrl := string(c.Body())
shortUrl := c.Query("s") shortUrl := c.Query("s")
if shortUrl == "" { if shortUrl == "" {
shortUrl = s.serverGen.NextID() shortUrl = s.serverGen.NextID()
} }
err := s.storage.Put(shortUrl, longUrl) longUrl := string(c.Body())
if err != nil {
return err return s.storage.Put(shortUrl, longUrl)
}
return c.SendString(shortUrl)
}) })
return a return a

View File

@ -38,7 +38,7 @@ func (s *Server) HandleGetURL() func(*fiber.Ctx) error {
url, err := s.GetURL(shortID) url, err := s.GetURL(shortID)
if err != nil { if err != nil {
return c.Next() return err
} }
s.LogVisit(VisitLog{ s.LogVisit(VisitLog{