mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
added mongo as a storage
This commit is contained in:
@@ -1,20 +1,23 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"flag"
|
||||
"github.com/simonz05/godis"
|
||||
"github.com/fsouza/gokabinet/kc"
|
||||
"github.com/rif/cgrates/timespans"
|
||||
"launchpad.net/mgo"
|
||||
"github.com/rif/cgrates/timespans"
|
||||
"log"
|
||||
"os"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
var (
|
||||
storage = flag.String("storage", "kyoto", "kyoto | redis")
|
||||
var (
|
||||
storage = flag.String("storage", "kyoto", "kyoto|redis|mongo")
|
||||
kyotofile = flag.String("kyotofile", "storage.kch", "kyoto storage file (storage.kch)")
|
||||
redisserver = flag.String("server", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)")
|
||||
redisdb = flag.Int("db", 10, "redis database number (10)")
|
||||
redisserver = flag.String("redisserver", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)")
|
||||
mongoserver = flag.String("mongoserver", "127.0.0.1:27017", "mongo server address (127.0.0.1:27017)")
|
||||
redisdb = flag.Int("rdb", 10, "redis database number (10)")
|
||||
mongodb = flag.String("mdb", "test", "mongo database name (test)")
|
||||
redispass = flag.String("pass", "", "redis database password")
|
||||
inputfile = flag.String("inputfile", "input.json", "redis database password")
|
||||
)
|
||||
@@ -22,39 +25,55 @@ var (
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
log.Printf("Reading from %q", *inputfile)
|
||||
log.Printf("Reading from %q", *inputfile)
|
||||
|
||||
fin, err := os.Open(*inputfile)
|
||||
defer fin.Close()
|
||||
defer fin.Close()
|
||||
|
||||
if err != nil {
|
||||
log.Print("Cannot open input file", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(fin)
|
||||
|
||||
var callDescriptors []*timespans.CallDescriptor
|
||||
var callDescriptors []*timespans.CallDescriptor
|
||||
if err := dec.Decode(&callDescriptors); err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if *storage == "kyoto" {
|
||||
switch *storage {
|
||||
case "kyoto":
|
||||
db, _ := kc.Open(*kyotofile, kc.WRITE)
|
||||
defer db.Close()
|
||||
for _, cd := range callDescriptors{
|
||||
key := cd.GetKey()
|
||||
db.Set(key, cd.EncodeValues())
|
||||
log.Printf("Storing %q", key)
|
||||
}
|
||||
} else {
|
||||
}
|
||||
case "mongo":
|
||||
session, err := mgo.Dial(*mongoserver)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer session.Close()
|
||||
session.SetMode(mgo.Strong, true)
|
||||
|
||||
c := session.DB(*mongodb).C("ap")
|
||||
for _, cd := range callDescriptors{
|
||||
key := cd.GetKey()
|
||||
c.Insert(×pans.KeyValue{key, cd.EncodeValues()})
|
||||
log.Printf("Storing %q", key)
|
||||
}
|
||||
|
||||
default:
|
||||
db := godis.New(*redisserver, *redisdb, *redispass)
|
||||
defer db.Quit()
|
||||
for _, cd := range callDescriptors{
|
||||
key := cd.GetKey()
|
||||
db.Set(key, cd.EncodeValues())
|
||||
log.Printf("Storing %q", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
package tariffplans
|
||||
|
||||
import (
|
||||
"math"
|
||||
"log"
|
||||
)
|
||||
|
||||
type UserBudget struct {
|
||||
id string
|
||||
minuteBuckets []*MinuteBucket
|
||||
credit float64
|
||||
tariffPlan *TariffPlan
|
||||
resetDayOfTheMonth int
|
||||
}
|
||||
|
||||
/*
|
||||
Returns user's avaliable minutes for the specified destination
|
||||
*/
|
||||
func (ub *UserBudget) GetSecondsForPrefix(prefix string) (seconds int) {
|
||||
if len(ub.minuteBuckets) == 0{
|
||||
log.Print("There are no minute buckets to check for user", ub.id)
|
||||
return
|
||||
}
|
||||
bestBucket := ub.minuteBuckets[0]
|
||||
|
||||
for _, mb := range ub.minuteBuckets {
|
||||
if mb.containsPrefix(prefix) && mb.priority > bestBucket.priority {
|
||||
bestBucket = mb
|
||||
}
|
||||
}
|
||||
seconds = bestBucket.seconds
|
||||
if bestBucket.price > 0 {
|
||||
seconds = int(math.Min(ub.credit / bestBucket.price, float64(seconds)))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type Destination struct {
|
||||
id string
|
||||
prefixes []string
|
||||
}
|
||||
|
||||
type MinuteBucket struct {
|
||||
seconds int
|
||||
priority int
|
||||
price float64
|
||||
destination *Destination
|
||||
}
|
||||
|
||||
func (mb *MinuteBucket) containsPrefix(prefix string) bool {
|
||||
for _, p := range mb.destination.prefixes{
|
||||
if prefix == p {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type TariffPlan struct {
|
||||
minuteBuckets []*MinuteBucket
|
||||
}
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
package tariffplans
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
var (
|
||||
nationale = &Destination{id: "nationale", prefixes: []string{"0257", "0256", "0723"}}
|
||||
retea = &Destination{id: "retea", prefixes: []string{"0723", "0724"}}
|
||||
)
|
||||
|
||||
func TestGetSeconds(t *testing.T) {
|
||||
b1 := &MinuteBucket{seconds: 10, priority: 10, destination: nationale}
|
||||
b2 := &MinuteBucket{seconds: 100, priority: 20, destination: retea}
|
||||
tf1 := &TariffPlan{minuteBuckets: []*MinuteBucket{b1,b2}}
|
||||
|
||||
ub1 := &UserBudget{id: "rif", minuteBuckets: []*MinuteBucket{b1,b2}, credit: 200, tariffPlan: tf1, resetDayOfTheMonth: 10}
|
||||
seconds := ub1.GetSecondsForPrefix("0723")
|
||||
expected := 100
|
||||
if seconds != expected {
|
||||
t.Errorf("Expected %v was %v", expected, seconds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPricedSeconds(t *testing.T) {
|
||||
b1 := &MinuteBucket{seconds: 10, price:10, priority: 10, destination: nationale}
|
||||
b2 := &MinuteBucket{seconds: 100, price:1, priority: 20, destination: retea}
|
||||
tf1 := &TariffPlan{minuteBuckets: []*MinuteBucket{b1,b2}}
|
||||
|
||||
ub1 := &UserBudget{id: "rif", minuteBuckets: []*MinuteBucket{b1,b2}, credit: 21, tariffPlan: tf1, resetDayOfTheMonth: 10}
|
||||
seconds := ub1.GetSecondsForPrefix("0723")
|
||||
expected := 21
|
||||
if seconds != expected {
|
||||
t.Errorf("Expected %v was %v", expected, seconds)
|
||||
}
|
||||
}
|
||||
|
||||
/*********************************** Benchmarks *******************************/
|
||||
|
||||
func BenchmarkActivationPeriodRestore(b *testing.B) {
|
||||
b.StopTimer()
|
||||
b1 := &MinuteBucket{seconds: 10, price:10, priority: 10, destination: nationale}
|
||||
b2 := &MinuteBucket{seconds: 100, price:1, priority: 20, destination: retea}
|
||||
tf1 := &TariffPlan{minuteBuckets: []*MinuteBucket{b1,b2}}
|
||||
|
||||
ub1 := &UserBudget{id: "rif", minuteBuckets: []*MinuteBucket{b1,b2}, credit: 21, tariffPlan: tf1, resetDayOfTheMonth: 10}
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ub1.GetSecondsForPrefix("0723")
|
||||
}
|
||||
}
|
||||
@@ -3,10 +3,28 @@ package timespans
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
Utility function for rounding a float to a certain number of decimals (not present in math).
|
||||
*/
|
||||
func round(val float64, prec int) float64 {
|
||||
|
||||
var rounder float64
|
||||
intermed := val * math.Pow(10, float64(prec))
|
||||
|
||||
if val >= 0.5 {
|
||||
rounder = math.Ceil(intermed)
|
||||
} else {
|
||||
rounder = math.Floor(intermed)
|
||||
}
|
||||
|
||||
return rounder / math.Pow(10, float64(prec))
|
||||
}
|
||||
|
||||
/*
|
||||
The input stucture that contains call information.
|
||||
*/
|
||||
@@ -37,6 +55,34 @@ func (cd *CallDescriptor) EncodeValues() (result string) {
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
Restores the activation periods from storage.
|
||||
*/
|
||||
func (cd *CallDescriptor) RestoreFromStorage(sg StorageGetter) (destPrefix string, err error) {
|
||||
cd.ActivationPeriods = make([]*ActivationPeriod, 0)
|
||||
base := fmt.Sprintf("%s:%s:", cd.CstmId, cd.Subject)
|
||||
destPrefix = cd.DestinationPrefix
|
||||
key := base + destPrefix
|
||||
values, err := sg.Get(key)
|
||||
//get for a smaller prefix if the orignal one was not found
|
||||
for i := len(cd.DestinationPrefix); err != nil && i > 1; values, err = sg.Get(key) {
|
||||
i--
|
||||
destPrefix = cd.DestinationPrefix[:i]
|
||||
key = base + destPrefix
|
||||
}
|
||||
//load the activation preriods
|
||||
if err == nil {
|
||||
for _, aps := range strings.Split(values, "\n") {
|
||||
if len(aps) > 0 {
|
||||
ap := &ActivationPeriod{}
|
||||
ap.restore(aps)
|
||||
cd.ActivationPeriods = append(cd.ActivationPeriods, ap)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
Constructs the key for the storage lookup.
|
||||
The prefixLen is limiting the length of the destination prefix.
|
||||
@@ -96,29 +142,6 @@ func (cd *CallDescriptor) splitTimeSpan(firstSpan *TimeSpan) (timespans []*TimeS
|
||||
return
|
||||
}
|
||||
|
||||
func (cd *CallDescriptor) RestoreFromStorage(sg StorageGetter) (destPrefix string, err error) {
|
||||
cd.ActivationPeriods = make([]*ActivationPeriod, 0)
|
||||
base := fmt.Sprintf("%s:%s:", cd.CstmId, cd.Subject)
|
||||
destPrefix = cd.DestinationPrefix
|
||||
key := base + destPrefix
|
||||
values, err := sg.Get(key)
|
||||
for i := len(cd.DestinationPrefix); err != nil && i > 1; values, err = sg.Get(key) {
|
||||
i--
|
||||
destPrefix = cd.DestinationPrefix[:i]
|
||||
key = base + destPrefix
|
||||
}
|
||||
if err == nil {
|
||||
for _, aps := range strings.Split(values, "\n") {
|
||||
if len(aps) > 0 {
|
||||
ap := &ActivationPeriod{}
|
||||
ap.restore(aps)
|
||||
cd.ActivationPeriods = append(cd.ActivationPeriods, ap)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
Creates a CallCost structure with the cost nformation calculated for the received CallDescriptor.
|
||||
*/
|
||||
@@ -128,15 +151,14 @@ func (cd *CallDescriptor) GetCost(sg StorageGetter) (*CallCost, error) {
|
||||
timespans := cd.splitInTimeSpans()
|
||||
|
||||
cost := 0.0
|
||||
for _, ts := range timespans {
|
||||
connectionFee := 0.0
|
||||
for i, ts := range timespans {
|
||||
if i == 0 {
|
||||
connectionFee = ts.Interval.ConnectFee
|
||||
}
|
||||
cost += ts.GetCost()
|
||||
}
|
||||
|
||||
connectionFee := 0.0
|
||||
if len(timespans) > 0 {
|
||||
connectionFee = timespans[0].Interval.ConnectFee
|
||||
}
|
||||
|
||||
cc := &CallCost{TOR: cd.TOR,
|
||||
CstmId: cd.CstmId,
|
||||
Subject: cd.Subject,
|
||||
@@ -148,17 +170,38 @@ func (cd *CallDescriptor) GetCost(sg StorageGetter) (*CallCost, error) {
|
||||
}
|
||||
|
||||
/*
|
||||
Returns
|
||||
Returns the cost of a second in the present time conditions.
|
||||
*/
|
||||
func (cd *CallDescriptor) getPresentSecondCost(sg StorageGetter) (cost float64, err error) {
|
||||
_, err = cd.RestoreFromStorage(sg)
|
||||
now := time.Now()
|
||||
oneSecond,_ := time.ParseDuration("1s")
|
||||
oneSecond, _ := time.ParseDuration("1s")
|
||||
ts := &TimeSpan{TimeStart: now, TimeEnd: now.Add(oneSecond)}
|
||||
timespans := cd.splitTimeSpan(ts)
|
||||
|
||||
cost = timespans[0].GetCost()
|
||||
return
|
||||
cost = round(timespans[0].GetCost(), 3)
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
Returns the cost of a second in the present time conditions.
|
||||
*/
|
||||
func (cd *CallDescriptor) GetMaxSessionTime(sg StorageGetter, maxSessionSeconds int) (seconds int, err error) {
|
||||
_, err = cd.RestoreFromStorage(sg)
|
||||
now := time.Now()
|
||||
maxDuration, _ := time.ParseDuration(fmt.Sprintf("%ds", maxSessionSeconds))
|
||||
ts := &TimeSpan{TimeStart: now, TimeEnd: now.Add(maxDuration)}
|
||||
timespans := cd.splitTimeSpan(ts)
|
||||
|
||||
cost := 0.0
|
||||
for i, ts := range timespans {
|
||||
if i == 0 {
|
||||
cost += ts.Interval.ConnectFee
|
||||
}
|
||||
cost += ts.GetCost()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -251,3 +251,32 @@ func BenchmarkKyotoGetCost(b *testing.B) {
|
||||
cd.GetCost(getter)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMongoGetting(b *testing.B) {
|
||||
b.StopTimer()
|
||||
getter, _ := NewMongoStorage("127.0.0.1","test")
|
||||
defer getter.Close()
|
||||
|
||||
t1 := time.Date(2012, time.February, 2, 17, 30, 0, 0, time.UTC)
|
||||
t2 := time.Date(2012, time.February, 2, 18, 30, 0, 0, time.UTC)
|
||||
cd := &CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2}
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
key := cd.GetKey()
|
||||
getter.Get(key)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMongoGetCost(b *testing.B) {
|
||||
b.StopTimer()
|
||||
getter, _ := NewMongoStorage("127.0.0.1","test")
|
||||
defer getter.Close()
|
||||
|
||||
t1 := time.Date(2012, time.February, 2, 17, 30, 0, 0, time.UTC)
|
||||
t2 := time.Date(2012, time.February, 2, 18, 30, 0, 0, time.UTC)
|
||||
cd := &CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2}
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cd.GetCost(getter)
|
||||
}
|
||||
}
|
||||
|
||||
40
timespans/mongo_storage.go
Normal file
40
timespans/mongo_storage.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package timespans
|
||||
|
||||
import (
|
||||
"launchpad.net/mgo/bson"
|
||||
"launchpad.net/mgo"
|
||||
)
|
||||
|
||||
type KeyValue struct {
|
||||
Key string
|
||||
Value string
|
||||
}
|
||||
|
||||
type MongoStorage struct {
|
||||
db *mgo.Collection
|
||||
session *mgo.Session
|
||||
}
|
||||
|
||||
func NewMongoStorage(address, db string) (*MongoStorage, error) {
|
||||
session, err := mgo.Dial(address)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
session.SetMode(mgo.Monotonic, true)
|
||||
|
||||
ndb := session.DB(db).C("ap")
|
||||
//log.Print("Starting redis storage")
|
||||
return &MongoStorage{db: ndb, session: session}, nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) Close() {
|
||||
//log.Print("Closing redis storage")
|
||||
ms.session.Close()
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) Get(key string) (string, error) {
|
||||
result := KeyValue{}
|
||||
err := ms.db.Find(bson.M{"key": key}).One(&result)
|
||||
|
||||
return result.Value, err
|
||||
}
|
||||
Reference in New Issue
Block a user