Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
3ca245dadb | |||
6f0e883bff | |||
5fc1b55d52 | |||
9718a027e0 | |||
2377fb191a | |||
6f7c7c5ea7 | |||
474de0734f | |||
181e33db92 | |||
1e382f1552 | |||
4150ebe27c | |||
68ef0fbf6b |
@ -1,15 +1,13 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.tijl.dev/tijl/shortify/pkg/generation"
|
"git.tijl.dev/tijl/shortify/pkg/generation"
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -18,21 +16,25 @@ type Client struct {
|
|||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
prefix uint16
|
prefix uint16
|
||||||
gen *generation.Generator
|
gen *generation.Generator
|
||||||
domain string // e.g. https://sho.rt
|
|
||||||
|
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
retryQueue chan shortenJob
|
retryQueue chan shortenJob
|
||||||
stopRetry chan struct{}
|
stopRetry chan struct{}
|
||||||
|
|
||||||
|
// In-memory cache
|
||||||
|
cacheMap *lru.Cache
|
||||||
|
maxCacheSize int
|
||||||
|
maxCacheInitialLoad int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient with persistence and retry queue
|
// NewClient with persistence and retry queue
|
||||||
func NewClient(serverURL, domain string) (*Client, error) {
|
func NewClient(serverURL string, folder string) (*Client, error) {
|
||||||
httpClient, baseURL, err := createHTTPClient(serverURL)
|
httpClient, baseURL, err := createHTTPClient(serverURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := bolt.Open(dbFileName, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
db, err := bolt.Open(folder+"/"+dbFileName, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -40,12 +42,18 @@ func NewClient(serverURL, domain string) (*Client, error) {
|
|||||||
cli := &Client{
|
cli := &Client{
|
||||||
serverURL: baseURL,
|
serverURL: baseURL,
|
||||||
httpClient: httpClient,
|
httpClient: httpClient,
|
||||||
domain: domain,
|
|
||||||
db: db,
|
db: db,
|
||||||
retryQueue: make(chan shortenJob, 1000),
|
retryQueue: make(chan shortenJob, 100000),
|
||||||
stopRetry: make(chan struct{}),
|
stopRetry: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cli.maxCacheSize = 100000 // or make this configurable
|
||||||
|
cli.maxCacheInitialLoad = 10000
|
||||||
|
cli.cacheMap, err = lru.New(cli.maxCacheSize)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Create buckets if not exist
|
// Create buckets if not exist
|
||||||
err = db.Update(func(tx *bolt.Tx) error {
|
err = db.Update(func(tx *bolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix))
|
_, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix))
|
||||||
@ -53,7 +61,16 @@ func NewClient(serverURL, domain string) (*Client, error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.CreateBucketIfNotExists([]byte(bucketRetryJobs))
|
_, err = tx.CreateBucketIfNotExists([]byte(bucketRetryJobs))
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.CreateBucketIfNotExists([]byte(bucketURLCache))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -96,6 +113,21 @@ func NewClient(serverURL, domain string) (*Client, error) {
|
|||||||
cli.prefix = prefix
|
cli.prefix = prefix
|
||||||
cli.gen = generation.NewGenerator(prefix)
|
cli.gen = generation.NewGenerator(prefix)
|
||||||
|
|
||||||
|
// load cache
|
||||||
|
_ = cli.db.View(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("url_cache"))
|
||||||
|
if b == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c := b.Cursor()
|
||||||
|
initalCounter := 0
|
||||||
|
for k, v := c.First(); k != nil && initalCounter < cli.maxCacheInitialLoad; k, v = c.Next() {
|
||||||
|
cli.cacheMap.Add(string(k), string(v))
|
||||||
|
initalCounter++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
// Load retry jobs from DB into channel
|
// Load retry jobs from DB into channel
|
||||||
go cli.loadRetryJobs()
|
go cli.loadRetryJobs()
|
||||||
|
|
||||||
@ -105,31 +137,72 @@ func NewClient(serverURL, domain string) (*Client, error) {
|
|||||||
return cli, nil
|
return cli, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shorten creates a short URL and sends it async to the central server
|
/*
|
||||||
func (c *Client) Shorten(longURL string) string {
|
Shorten
|
||||||
|
*/
|
||||||
|
|
||||||
|
type ShortenOpt func(*shortenOptions)
|
||||||
|
|
||||||
|
type shortenOptions struct {
|
||||||
|
useCache bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func UseCache() ShortenOpt {
|
||||||
|
return func(opts *shortenOptions) {
|
||||||
|
opts.useCache = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Shorten(longURL string, opts ...ShortenOpt) string {
|
||||||
|
options := shortenOptions{}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check memory cache
|
||||||
|
if options.useCache {
|
||||||
|
if shortID, ok := c.cacheMap.Get(longURL); ok {
|
||||||
|
return shortID.(string)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate new ID
|
||||||
shortID := c.gen.NextID()
|
shortID := c.gen.NextID()
|
||||||
|
|
||||||
|
// Queue job
|
||||||
|
go c.enqueueJob(shortenJob{
|
||||||
|
ID: shortID,
|
||||||
|
URL: longURL,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Async store in cache
|
||||||
|
if options.useCache {
|
||||||
|
go c.addToCache(longURL, shortID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return shortID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) addToCache(longURL, shortID string) {
|
||||||
|
c.cacheMap.Add(longURL, shortID)
|
||||||
|
|
||||||
|
// Async write to BoltDB
|
||||||
go func() {
|
go func() {
|
||||||
payload := map[string]string{
|
_ = c.db.Update(func(tx *bolt.Tx) error {
|
||||||
"id": shortID,
|
b := tx.Bucket([]byte("url_cache"))
|
||||||
"url": longURL,
|
return b.Put([]byte(longURL), []byte(shortID))
|
||||||
}
|
})
|
||||||
data, _ := json.Marshal(payload)
|
}()
|
||||||
|
}
|
||||||
req, err := http.NewRequest("POST", fmt.Sprintf("%s/shorten", c.serverURL), bytes.NewReader(data))
|
|
||||||
if err != nil {
|
func (c *Client) remFromCache(longURL string) {
|
||||||
log.Println("shorten request build error:", err)
|
c.cacheMap.Remove(longURL)
|
||||||
return
|
|
||||||
}
|
// Async write to BoltDB
|
||||||
req.Header.Set("Content-Type", "application/json")
|
go func() {
|
||||||
|
_ = c.db.Update(func(tx *bolt.Tx) error {
|
||||||
resp, err := c.httpClient.Do(req)
|
b := tx.Bucket([]byte("url_cache"))
|
||||||
if err != nil {
|
return b.Delete([]byte(longURL))
|
||||||
log.Println("shorten request failed:", err)
|
})
|
||||||
return
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return fmt.Sprintf("%s/%s", c.domain, shortID)
|
|
||||||
}
|
}
|
||||||
|
16
pkg/client/global.go
Normal file
16
pkg/client/global.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
var (
|
||||||
|
Global *Client
|
||||||
|
once sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
func Init(serverURL string, folder string) error {
|
||||||
|
var err error
|
||||||
|
once.Do(func() {
|
||||||
|
Global, err = NewClient(serverURL, folder)
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
@ -1,11 +1,13 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
@ -14,6 +16,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
bucketPrefix = "prefix"
|
bucketPrefix = "prefix"
|
||||||
bucketRetryJobs = "retry_queue"
|
bucketRetryJobs = "retry_queue"
|
||||||
|
bucketURLCache = "url_cache"
|
||||||
dbFileName = "shorty_client.db"
|
dbFileName = "shorty_client.db"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -23,19 +26,18 @@ type shortenJob struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) registerPrefix() (uint16, error) {
|
func (c *Client) registerPrefix() (uint16, error) {
|
||||||
resp, err := c.httpClient.Post(fmt.Sprintf("%s/register", c.serverURL), "application/json", nil)
|
resp, err := c.httpClient.Get(fmt.Sprintf("%s/register", c.serverURL))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
var result struct {
|
bytes, err := io.ReadAll(resp.Body)
|
||||||
Prefix uint16 `json:"prefix"`
|
if err != nil {
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return result.Prefix, nil
|
|
||||||
|
return binary.LittleEndian.Uint16(bytes), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) loadRetryJobs() {
|
func (c *Client) loadRetryJobs() {
|
||||||
@ -63,6 +65,7 @@ func (c *Client) retryWorker() {
|
|||||||
case job := <-c.retryQueue:
|
case job := <-c.retryQueue:
|
||||||
err := c.sendShortenJob(job)
|
err := c.sendShortenJob(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Panicln("got error sending shorten job to server", err)
|
||||||
// Re-enqueue with delay
|
// Re-enqueue with delay
|
||||||
go func(j shortenJob) {
|
go func(j shortenJob) {
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
@ -93,7 +96,8 @@ func (c *Client) enqueueJob(job shortenJob) {
|
|||||||
select {
|
select {
|
||||||
case c.retryQueue <- job:
|
case c.retryQueue <- job:
|
||||||
default:
|
default:
|
||||||
log.Println("Retry queue full, dropping job:", job.ID)
|
log.Println("Retry queue full, dropping job and removing from caches:", job.ID, job.URL)
|
||||||
|
go c.remFromCache(job.URL)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,18 +112,10 @@ func (c *Client) deleteJobFromDB(job shortenJob) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) sendShortenJob(job shortenJob) error {
|
func (c *Client) sendShortenJob(job shortenJob) error {
|
||||||
payload := map[string]string{
|
req, err := http.NewRequest("POST", fmt.Sprintf("%s/shorten?s=%s", c.serverURL, job.ID), strings.NewReader(job.URL))
|
||||||
"id": job.ID,
|
|
||||||
"url": job.URL,
|
|
||||||
}
|
|
||||||
data, _ := json.Marshal(payload)
|
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", fmt.Sprintf("%s/shorten", c.serverURL), bytes.NewReader(data))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
|
||||||
|
|
||||||
resp, err := c.httpClient.Do(req)
|
resp, err := c.httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
11
serve.go
11
serve.go
@ -16,19 +16,22 @@ func (s *Server) Admin() *fiber.App {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var response []byte
|
response := make([]byte, 2)
|
||||||
binary.LittleEndian.PutUint16(response, prefix)
|
binary.LittleEndian.PutUint16(response, prefix)
|
||||||
return c.Send(response)
|
return c.Send(response)
|
||||||
})
|
})
|
||||||
|
|
||||||
a.Post("/shorten", func(c *fiber.Ctx) error {
|
a.Post("/shorten", func(c *fiber.Ctx) error {
|
||||||
|
longUrl := string(c.Body())
|
||||||
shortUrl := c.Query("s")
|
shortUrl := c.Query("s")
|
||||||
if shortUrl == "" {
|
if shortUrl == "" {
|
||||||
shortUrl = s.serverGen.NextID()
|
shortUrl = s.serverGen.NextID()
|
||||||
}
|
}
|
||||||
longUrl := string(c.Body())
|
err := s.storage.Put(shortUrl, longUrl)
|
||||||
|
if err != nil {
|
||||||
return s.storage.Put(shortUrl, longUrl)
|
return err
|
||||||
|
}
|
||||||
|
return c.SendString(shortUrl)
|
||||||
})
|
})
|
||||||
|
|
||||||
return a
|
return a
|
||||||
|
Loading…
x
Reference in New Issue
Block a user