From a7fb55792f4ac09f3584c571fab9727063c29e5c Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 12 May 2013 12:32:37 +0300 Subject: [PATCH] more redis corrections --- rater/action_trigger.go | 1 + rater/calldesc.go | 12 ++- rater/calldesc_test.go | 1 + rater/storage_interface.go | 21 +++- rater/storage_redigo.go | 216 +++++++++++++++++++++++++++++++++++++ rater/storage_redis.go | 5 +- rater/units_counter.go | 2 +- rater/userbalance_test.go | 2 +- 8 files changed, 246 insertions(+), 14 deletions(-) create mode 100644 rater/storage_redigo.go diff --git a/rater/action_trigger.go b/rater/action_trigger.go index 27fd6b5c1..1728005cb 100644 --- a/rater/action_trigger.go +++ b/rater/action_trigger.go @@ -20,6 +20,7 @@ package rater import ( "fmt" + //"log" "sort" "strconv" "strings" diff --git a/rater/calldesc.go b/rater/calldesc.go index ded260d40..b2c457c69 100644 --- a/rater/calldesc.go +++ b/rater/calldesc.go @@ -42,11 +42,13 @@ const ( ) var ( - Logger LoggerInterface - storageGetter, _ = NewMapStorage() - //storageGetter, _ = NewMongoStorage("192.168.0.17", "27017", "cgrates", "", "") - //storageGetter, _ = NewRedisStorage("192.168.0.17:6379", 10, "") - //storageGetter, _ = NewRedigoStorage("192.168.0.17:6379", 10, "") + Logger LoggerInterface + db_server = "127.0.0.1" + //db_server = "192.168.0.17" + //storageGetter, _ = NewMapStorage() + //storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "") + storageGetter, _ = NewRedisStorage(db_server+":6379", 11, "") + //storageGetter, _ = NewRedigoStorage(db_server+":6379", 11, "") storageLogger = storageGetter debitPeriod = 10 * time.Second ) diff --git a/rater/calldesc_test.go b/rater/calldesc_test.go index a9f3faf95..d5afdf05b 100644 --- a/rater/calldesc_test.go +++ b/rater/calldesc_test.go @@ -48,6 +48,7 @@ func populateDB() { &MinuteBucket{Seconds: 100, DestinationId: "RET", Weight: 20}, }, } + storageGetter.Flush() storageGetter.SetUserBalance(broker) storageGetter.SetUserBalance(minu) } diff --git a/rater/storage_interface.go b/rater/storage_interface.go index abcf3e327..838ac4b1b 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -22,6 +22,7 @@ import ( "bytes" "encoding/gob" "encoding/json" + gmsgpack "github.com/ugorji/go-msgpack" "github.com/vmihailenco/msgpack" "strings" ) @@ -111,6 +112,16 @@ func (jm *MsgpackMarshaler) Unmarshal(data []byte, v interface{}) error { return msgpack.Unmarshal(data, v) } +type GoMsgpackMarshaler struct{} + +func (jm *GoMsgpackMarshaler) Marshal(v interface{}) ([]byte, error) { + return gmsgpack.Marshal(v) +} + +func (jm *GoMsgpackMarshaler) Unmarshal(data []byte, v interface{}) error { + return gmsgpack.Unmarshal(data, v, nil) +} + type GOBMarshaler struct { buf bytes.Buffer } @@ -143,14 +154,16 @@ func (mm *MyMarshaler) Marshal(v interface{}) (data []byte, err error) { case []*Action: result := "" for _, a := range v.([]*Action) { - result += a.store() + "+" + result += a.store() + "~" } + result = strings.TrimRight(result, "~") return []byte(result), nil case []*ActionTiming: result := "" for _, at := range v.([]*ActionTiming) { - result += at.store() + "+" + result += at.store() + "~" } + result = strings.TrimRight(result, "~") return []byte(result), nil case storer: s := v.(storer) @@ -167,7 +180,7 @@ func (mm *MyMarshaler) Unmarshal(data []byte, v interface{}) (err error) { switch v.(type) { case *[]*Action: as := v.(*[]*Action) - for _, a_string := range strings.Split(string(data), "+") { + for _, a_string := range strings.Split(string(data), "~") { if len(a_string) > 0 { a := &Action{} a.restore(a_string) @@ -177,7 +190,7 @@ func (mm *MyMarshaler) Unmarshal(data []byte, v interface{}) (err error) { return nil case *[]*ActionTiming: ats := v.(*[]*ActionTiming) - for _, at_string := range strings.Split(string(data), "+") { + for _, at_string := range strings.Split(string(data), "~") { if len(at_string) > 0 { at := &ActionTiming{} at.restore(at_string) diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go new file mode 100644 index 000000000..046163a85 --- /dev/null +++ b/rater/storage_redigo.go @@ -0,0 +1,216 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package rater + +import ( + "fmt" + "github.com/garyburd/redigo/redis" + //"log" + "time" +) + +type RedigoStorage struct { + dbNb int + db redis.Conn + ms Marshaler +} + +func NewRedigoStorage(address string, db int, pass string) (DataStorage, error) { + ndb, err := redis.Dial("tcp", address) + if err != nil { + return nil, err + } + if pass != "" { + if _, err = ndb.Do("auth", pass); err != nil { + return nil, err + } + } + if db > 0 { + if _, err = ndb.Do("select", db); err != nil { + return nil, err + } + } + ms := new(MyMarshaler) + return &RedigoStorage{db: ndb, dbNb: db, ms: ms}, nil +} + +func (rs *RedigoStorage) Close() { + rs.db.Close() +} + +func (rs *RedigoStorage) Flush() (err error) { + _, err = rs.db.Do("flushdb") + return +} + +func (rs *RedigoStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) { + var values []byte + if values, err = redis.Bytes(rs.db.Do("get", RATING_PROFILE_PREFIX+key)); err == nil { + rp = new(RatingProfile) + err = rs.ms.Unmarshal(values, rp) + } + return +} + +func (rs *RedigoStorage) SetRatingProfile(rp *RatingProfile) (err error) { + result, err := rs.ms.Marshal(rp) + _, err = rs.db.Do("set", RATING_PROFILE_PREFIX+rp.Id, result) + return +} + +func (rs *RedigoStorage) GetDestination(key string) (dest *Destination, err error) { + var values []byte + if values, err = redis.Bytes(rs.db.Do("get", DESTINATION_PREFIX+key)); err == nil { + dest = &Destination{Id: key} + err = rs.ms.Unmarshal(values, dest) + } + return +} + +func (rs *RedigoStorage) SetDestination(dest *Destination) (err error) { + var result []byte + if result, err = rs.ms.Marshal(dest); err != nil { + return + } + _, err = rs.db.Do("set", DESTINATION_PREFIX+dest.Id, result) + return +} + +func (rs *RedigoStorage) GetActions(key string) (as []*Action, err error) { + var values []byte + if values, err = redis.Bytes(rs.db.Do("get", ACTION_PREFIX+key)); err == nil { + err = rs.ms.Unmarshal(values, &as) + } + return +} + +func (rs *RedigoStorage) SetActions(key string, as []*Action) (err error) { + result, err := rs.ms.Marshal(as) + _, err = rs.db.Do("set", ACTION_PREFIX+key, result) + return +} + +func (rs *RedigoStorage) GetUserBalance(key string) (ub *UserBalance, err error) { + var values []byte + if values, err = redis.Bytes(rs.db.Do("get", USER_BALANCE_PREFIX+key)); err == nil { + ub = &UserBalance{Id: key} + err = rs.ms.Unmarshal(values, ub) + } + + return +} + +func (rs *RedigoStorage) SetUserBalance(ub *UserBalance) (err error) { + result, err := rs.ms.Marshal(ub) + _, err = rs.db.Do("set", USER_BALANCE_PREFIX+ub.Id, result) + return +} + +func (rs *RedigoStorage) GetActionTimings(key string) (ats []*ActionTiming, err error) { + var values []byte + if values, err = redis.Bytes(rs.db.Do("get", ACTION_TIMING_PREFIX+key)); err == nil { + err = rs.ms.Unmarshal(values, &ats) + } + return +} + +func (rs *RedigoStorage) SetActionTimings(key string, ats []*ActionTiming) (err error) { + if len(ats) == 0 { + // delete the key + _, err = rs.db.Do("del", ACTION_TIMING_PREFIX+key) + return err + } + result, err := rs.ms.Marshal(ats) + _, err = rs.db.Do("set", ACTION_TIMING_PREFIX+key, result) + return +} + +func (rs *RedigoStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err error) { + reply, err := redis.Values(rs.db.Do("keys", ACTION_TIMING_PREFIX+"*")) + if err != nil { + return nil, err + } + var keys []string + for _, x := range reply { + if v, ok := x.([]byte); ok { + keys = append(keys, string(v)) + } + } + ats = make(map[string][]*ActionTiming, len(keys)) + for _, key := range keys { + values, err := redis.Bytes(rs.db.Do("get", key)) + if err != nil { + continue + } + var tempAts []*ActionTiming + err = rs.ms.Unmarshal(values, &tempAts) + ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts + } + + return +} + +func (rs *RedigoStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { + var result []byte + result, err = rs.ms.Marshal(cc) + if err != nil { + return + } + _, err = rs.db.Do("set", LOG_CALL_COST_PREFIX+source+"_"+uuid, result) + return +} + +func (rs *RedigoStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { + var values []byte + if values, err = redis.Bytes(rs.db.Do("get", LOG_CALL_COST_PREFIX+source+"_"+uuid)); err == nil { + err = rs.ms.Unmarshal(values, cc) + } + return +} + +func (rs *RedigoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) { + mat, err := rs.ms.Marshal(at) + if err != nil { + return + } + mas, err := rs.ms.Marshal(as) + if err != nil { + return + } + rs.db.Do("set", LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas)))) + return +} + +func (rs *RedigoStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) { + mat, err := rs.ms.Marshal(at) + if err != nil { + return + } + mas, err := rs.ms.Marshal(as) + if err != nil { + return + } + _, err = rs.db.Do("set", LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))) + return +} + +func (rs *RedigoStorage) LogError(uuid, source, errstr string) (err error) { + _, err = rs.db.Do("set", LOG_ERR+source+"_"+uuid, errstr) + return +} diff --git a/rater/storage_redis.go b/rater/storage_redis.go index a4c7baf0b..800a6f529 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -107,10 +107,9 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { } func (rs *RedisStorage) GetActions(key string) (as []*Action, err error) { - if values, err := rs.db.Cmd("get", ACTION_PREFIX+key).Bytes(); err == nil { + var values []byte + if values, err = rs.db.Cmd("get", ACTION_PREFIX+key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &as) - } else { - return nil, err } return } diff --git a/rater/units_counter.go b/rater/units_counter.go index 97180b237..db05d504b 100644 --- a/rater/units_counter.go +++ b/rater/units_counter.go @@ -65,7 +65,7 @@ func (uc *UnitsCounter) addMinutes(amount float64, prefix string) { } func (uc *UnitsCounter) String() string { - return uc.BalanceId + " " + uc.Direction + return fmt.Sprintf("%s %s %v", uc.BalanceId, uc.Direction, uc.Units) } /* diff --git a/rater/userbalance_test.go b/rater/userbalance_test.go index 710800a62..d2b86aca4 100644 --- a/rater/userbalance_test.go +++ b/rater/userbalance_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package rater import ( + //"log" "reflect" "testing" ) @@ -384,7 +385,6 @@ func TestUserBalanceExecuteTriggeredActions(t *testing.T) { if ub.BalanceMap[CREDIT+OUTBOUND] != 110 || ub.MinuteBuckets[0].Seconds != 20 { t.Error("Error executing triggered actions", ub.BalanceMap[CREDIT+OUTBOUND], ub.MinuteBuckets[0].Seconds) } - // we can reset them ub.resetActionTriggers() ub.countUnits(&Action{BalanceId: CREDIT, Direction: OUTBOUND, Units: 1})