From b15f6bbc1f9336b9d2379adea96f909d8541ac59 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 16 Feb 2012 14:35:06 +0200 Subject: [PATCH] added mongo as a storage --- cmd/loader/loader.go | 57 +++++++++++------ tariffplans/userbudget.go | 62 ------------------- tariffplans/userbudget_test.go | 51 --------------- timespans/calldesc.go | 109 +++++++++++++++++++++++---------- timespans/calldesc_test.go | 29 +++++++++ timespans/mongo_storage.go | 40 ++++++++++++ 6 files changed, 183 insertions(+), 165 deletions(-) delete mode 100644 tariffplans/userbudget.go delete mode 100644 tariffplans/userbudget_test.go create mode 100644 timespans/mongo_storage.go diff --git a/cmd/loader/loader.go b/cmd/loader/loader.go index 24de42d9d..a5c3b42e1 100644 --- a/cmd/loader/loader.go +++ b/cmd/loader/loader.go @@ -1,20 +1,23 @@ package main import ( - "flag" + "flag" "github.com/simonz05/godis" "github.com/fsouza/gokabinet/kc" - "github.com/rif/cgrates/timespans" + "launchpad.net/mgo" + "github.com/rif/cgrates/timespans" "log" - "os" - "encoding/json" + "os" + "encoding/json" ) -var ( - storage = flag.String("storage", "kyoto", "kyoto | redis") +var ( + storage = flag.String("storage", "kyoto", "kyoto|redis|mongo") kyotofile = flag.String("kyotofile", "storage.kch", "kyoto storage file (storage.kch)") - redisserver = flag.String("server", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)") - redisdb = flag.Int("db", 10, "redis database number (10)") + redisserver = flag.String("redisserver", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)") + mongoserver = flag.String("mongoserver", "127.0.0.1:27017", "mongo server address (127.0.0.1:27017)") + redisdb = flag.Int("rdb", 10, "redis database number (10)") + mongodb = flag.String("mdb", "test", "mongo database name (test)") redispass = flag.String("pass", "", "redis database password") inputfile = flag.String("inputfile", "input.json", "redis database password") ) @@ -22,39 +25,55 @@ var ( func main() { flag.Parse() - log.Printf("Reading from %q", *inputfile) + log.Printf("Reading from %q", *inputfile) fin, err := os.Open(*inputfile) - defer fin.Close() + defer fin.Close() if err != nil { log.Print("Cannot open input file", err) return - } - + } + dec := json.NewDecoder(fin) - var callDescriptors []*timespans.CallDescriptor + var callDescriptors []*timespans.CallDescriptor if err := dec.Decode(&callDescriptors); err != nil { log.Println(err) return - } + } - if *storage == "kyoto" { + switch *storage { + case "kyoto": db, _ := kc.Open(*kyotofile, kc.WRITE) defer db.Close() for _, cd := range callDescriptors{ key := cd.GetKey() db.Set(key, cd.EncodeValues()) log.Printf("Storing %q", key) - } - } else { + } + case "mongo": + session, err := mgo.Dial(*mongoserver) + if err != nil { + panic(err) + } + defer session.Close() + session.SetMode(mgo.Strong, true) + + c := session.DB(*mongodb).C("ap") + for _, cd := range callDescriptors{ + key := cd.GetKey() + c.Insert(×pans.KeyValue{key, cd.EncodeValues()}) + log.Printf("Storing %q", key) + } + + default: db := godis.New(*redisserver, *redisdb, *redispass) defer db.Quit() for _, cd := range callDescriptors{ key := cd.GetKey() db.Set(key, cd.EncodeValues()) log.Printf("Storing %q", key) - } - } + } + } } diff --git a/tariffplans/userbudget.go b/tariffplans/userbudget.go deleted file mode 100644 index fac0b4c3b..000000000 --- a/tariffplans/userbudget.go +++ /dev/null @@ -1,62 +0,0 @@ -package tariffplans - -import ( - "math" - "log" -) - -type UserBudget struct { - id string - minuteBuckets []*MinuteBucket - credit float64 - tariffPlan *TariffPlan - resetDayOfTheMonth int -} - -/* -Returns user's avaliable minutes for the specified destination -*/ -func (ub *UserBudget) GetSecondsForPrefix(prefix string) (seconds int) { - if len(ub.minuteBuckets) == 0{ - log.Print("There are no minute buckets to check for user", ub.id) - return - } - bestBucket := ub.minuteBuckets[0] - - for _, mb := range ub.minuteBuckets { - if mb.containsPrefix(prefix) && mb.priority > bestBucket.priority { - bestBucket = mb - } - } - seconds = bestBucket.seconds - if bestBucket.price > 0 { - seconds = int(math.Min(ub.credit / bestBucket.price, float64(seconds))) - } - return -} - -type Destination struct { - id string - prefixes []string -} - -type MinuteBucket struct { - seconds int - priority int - price float64 - destination *Destination -} - -func (mb *MinuteBucket) containsPrefix(prefix string) bool { - for _, p := range mb.destination.prefixes{ - if prefix == p { - return true - } - } - return false -} - -type TariffPlan struct { - minuteBuckets []*MinuteBucket -} - diff --git a/tariffplans/userbudget_test.go b/tariffplans/userbudget_test.go deleted file mode 100644 index dffd0ba8d..000000000 --- a/tariffplans/userbudget_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package tariffplans - -import ( - "testing" -) - -var ( - nationale = &Destination{id: "nationale", prefixes: []string{"0257", "0256", "0723"}} - retea = &Destination{id: "retea", prefixes: []string{"0723", "0724"}} -) - -func TestGetSeconds(t *testing.T) { - b1 := &MinuteBucket{seconds: 10, priority: 10, destination: nationale} - b2 := &MinuteBucket{seconds: 100, priority: 20, destination: retea} - tf1 := &TariffPlan{minuteBuckets: []*MinuteBucket{b1,b2}} - - ub1 := &UserBudget{id: "rif", minuteBuckets: []*MinuteBucket{b1,b2}, credit: 200, tariffPlan: tf1, resetDayOfTheMonth: 10} - seconds := ub1.GetSecondsForPrefix("0723") - expected := 100 - if seconds != expected { - t.Errorf("Expected %v was %v", expected, seconds) - } -} - -func TestGetPricedSeconds(t *testing.T) { - b1 := &MinuteBucket{seconds: 10, price:10, priority: 10, destination: nationale} - b2 := &MinuteBucket{seconds: 100, price:1, priority: 20, destination: retea} - tf1 := &TariffPlan{minuteBuckets: []*MinuteBucket{b1,b2}} - - ub1 := &UserBudget{id: "rif", minuteBuckets: []*MinuteBucket{b1,b2}, credit: 21, tariffPlan: tf1, resetDayOfTheMonth: 10} - seconds := ub1.GetSecondsForPrefix("0723") - expected := 21 - if seconds != expected { - t.Errorf("Expected %v was %v", expected, seconds) - } -} - -/*********************************** Benchmarks *******************************/ - -func BenchmarkActivationPeriodRestore(b *testing.B) { - b.StopTimer() - b1 := &MinuteBucket{seconds: 10, price:10, priority: 10, destination: nationale} - b2 := &MinuteBucket{seconds: 100, price:1, priority: 20, destination: retea} - tf1 := &TariffPlan{minuteBuckets: []*MinuteBucket{b1,b2}} - - ub1 := &UserBudget{id: "rif", minuteBuckets: []*MinuteBucket{b1,b2}, credit: 21, tariffPlan: tf1, resetDayOfTheMonth: 10} - b.StartTimer() - for i := 0; i < b.N; i++ { - ub1.GetSecondsForPrefix("0723") - } -} diff --git a/timespans/calldesc.go b/timespans/calldesc.go index 670d57077..057a24d46 100644 --- a/timespans/calldesc.go +++ b/timespans/calldesc.go @@ -3,10 +3,28 @@ package timespans import ( "fmt" "log" + "math" "strings" "time" ) +/* +Utility function for rounding a float to a certain number of decimals (not present in math). +*/ +func round(val float64, prec int) float64 { + + var rounder float64 + intermed := val * math.Pow(10, float64(prec)) + + if val >= 0.5 { + rounder = math.Ceil(intermed) + } else { + rounder = math.Floor(intermed) + } + + return rounder / math.Pow(10, float64(prec)) +} + /* The input stucture that contains call information. */ @@ -37,6 +55,34 @@ func (cd *CallDescriptor) EncodeValues() (result string) { return } +/* +Restores the activation periods from storage. +*/ +func (cd *CallDescriptor) RestoreFromStorage(sg StorageGetter) (destPrefix string, err error) { + cd.ActivationPeriods = make([]*ActivationPeriod, 0) + base := fmt.Sprintf("%s:%s:", cd.CstmId, cd.Subject) + destPrefix = cd.DestinationPrefix + key := base + destPrefix + values, err := sg.Get(key) + //get for a smaller prefix if the orignal one was not found + for i := len(cd.DestinationPrefix); err != nil && i > 1; values, err = sg.Get(key) { + i-- + destPrefix = cd.DestinationPrefix[:i] + key = base + destPrefix + } + //load the activation preriods + if err == nil { + for _, aps := range strings.Split(values, "\n") { + if len(aps) > 0 { + ap := &ActivationPeriod{} + ap.restore(aps) + cd.ActivationPeriods = append(cd.ActivationPeriods, ap) + } + } + } + return +} + /* Constructs the key for the storage lookup. The prefixLen is limiting the length of the destination prefix. @@ -96,29 +142,6 @@ func (cd *CallDescriptor) splitTimeSpan(firstSpan *TimeSpan) (timespans []*TimeS return } -func (cd *CallDescriptor) RestoreFromStorage(sg StorageGetter) (destPrefix string, err error) { - cd.ActivationPeriods = make([]*ActivationPeriod, 0) - base := fmt.Sprintf("%s:%s:", cd.CstmId, cd.Subject) - destPrefix = cd.DestinationPrefix - key := base + destPrefix - values, err := sg.Get(key) - for i := len(cd.DestinationPrefix); err != nil && i > 1; values, err = sg.Get(key) { - i-- - destPrefix = cd.DestinationPrefix[:i] - key = base + destPrefix - } - if err == nil { - for _, aps := range strings.Split(values, "\n") { - if len(aps) > 0 { - ap := &ActivationPeriod{} - ap.restore(aps) - cd.ActivationPeriods = append(cd.ActivationPeriods, ap) - } - } - } - return -} - /* Creates a CallCost structure with the cost nformation calculated for the received CallDescriptor. */ @@ -128,15 +151,14 @@ func (cd *CallDescriptor) GetCost(sg StorageGetter) (*CallCost, error) { timespans := cd.splitInTimeSpans() cost := 0.0 - for _, ts := range timespans { + connectionFee := 0.0 + for i, ts := range timespans { + if i == 0 { + connectionFee = ts.Interval.ConnectFee + } cost += ts.GetCost() } - connectionFee := 0.0 - if len(timespans) > 0 { - connectionFee = timespans[0].Interval.ConnectFee - } - cc := &CallCost{TOR: cd.TOR, CstmId: cd.CstmId, Subject: cd.Subject, @@ -148,17 +170,38 @@ func (cd *CallDescriptor) GetCost(sg StorageGetter) (*CallCost, error) { } /* -Returns +Returns the cost of a second in the present time conditions. */ func (cd *CallDescriptor) getPresentSecondCost(sg StorageGetter) (cost float64, err error) { _, err = cd.RestoreFromStorage(sg) now := time.Now() - oneSecond,_ := time.ParseDuration("1s") + oneSecond, _ := time.ParseDuration("1s") ts := &TimeSpan{TimeStart: now, TimeEnd: now.Add(oneSecond)} timespans := cd.splitTimeSpan(ts) - cost = timespans[0].GetCost() - return + cost = round(timespans[0].GetCost(), 3) + return +} + +/* +Returns the cost of a second in the present time conditions. +*/ +func (cd *CallDescriptor) GetMaxSessionTime(sg StorageGetter, maxSessionSeconds int) (seconds int, err error) { + _, err = cd.RestoreFromStorage(sg) + now := time.Now() + maxDuration, _ := time.ParseDuration(fmt.Sprintf("%ds", maxSessionSeconds)) + ts := &TimeSpan{TimeStart: now, TimeEnd: now.Add(maxDuration)} + timespans := cd.splitTimeSpan(ts) + + cost := 0.0 + for i, ts := range timespans { + if i == 0 { + cost += ts.Interval.ConnectFee + } + cost += ts.GetCost() + } + + return } /* diff --git a/timespans/calldesc_test.go b/timespans/calldesc_test.go index e8625be1e..c569b3b48 100644 --- a/timespans/calldesc_test.go +++ b/timespans/calldesc_test.go @@ -251,3 +251,32 @@ func BenchmarkKyotoGetCost(b *testing.B) { cd.GetCost(getter) } } + +func BenchmarkMongoGetting(b *testing.B) { + b.StopTimer() + getter, _ := NewMongoStorage("127.0.0.1","test") + defer getter.Close() + + t1 := time.Date(2012, time.February, 2, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2012, time.February, 2, 18, 30, 0, 0, time.UTC) + cd := &CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2} + b.StartTimer() + for i := 0; i < b.N; i++ { + key := cd.GetKey() + getter.Get(key) + } +} + +func BenchmarkMongoGetCost(b *testing.B) { + b.StopTimer() + getter, _ := NewMongoStorage("127.0.0.1","test") + defer getter.Close() + + t1 := time.Date(2012, time.February, 2, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2012, time.February, 2, 18, 30, 0, 0, time.UTC) + cd := &CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2} + b.StartTimer() + for i := 0; i < b.N; i++ { + cd.GetCost(getter) + } +} diff --git a/timespans/mongo_storage.go b/timespans/mongo_storage.go new file mode 100644 index 000000000..de42c0354 --- /dev/null +++ b/timespans/mongo_storage.go @@ -0,0 +1,40 @@ +package timespans + +import ( + "launchpad.net/mgo/bson" + "launchpad.net/mgo" +) + +type KeyValue struct { + Key string + Value string +} + +type MongoStorage struct { + db *mgo.Collection + session *mgo.Session +} + +func NewMongoStorage(address, db string) (*MongoStorage, error) { + session, err := mgo.Dial(address) + if err != nil { + panic(err) + } + session.SetMode(mgo.Monotonic, true) + + ndb := session.DB(db).C("ap") + //log.Print("Starting redis storage") + return &MongoStorage{db: ndb, session: session}, nil +} + +func (ms *MongoStorage) Close() { + //log.Print("Closing redis storage") + ms.session.Close() +} + +func (ms *MongoStorage) Get(key string) (string, error) { + result := KeyValue{} + err := ms.db.Find(bson.M{"key": key}).One(&result) + + return result.Value, err +}