diff --git a/engine/calldesc.go b/engine/calldesc.go index 7ca9c5bfa..5b5b070aa 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -37,10 +37,10 @@ func init() { Logger.Err(fmt.Sprintf("Could not connect to syslog: %v", err)) } //db_server := "127.0.0.1" - //db_server := "192.168.0.17" m, _ := NewMapStorage() //m, _ := NewMongoStorage(db_server, "27017", "cgrates_test", "", "") //m, _ := NewRedisStorage(db_server+":6379", 11, "", utils.MSGPACK) + //m, _ := NewSRedisStorage(db_server+":6379", 11, "", utils.MSGPACK) //fm, _ := NewRedigoStorage(db_server+":6379", 11, "") //m, _ := NewRadixStorage(db_server+":6379", 11, "") storageGetter, _ = m.(DataStorage) @@ -401,13 +401,15 @@ func (cd *CallDescriptor) GetCost() (*CallCost, error) { cost = utils.Round(cost, roundingDecimals, roundingMethod) //startIndex := len(fmt.Sprintf("%s:%s:%s:", cd.Direction, cd.Tenant, cd.TOR)) cc := &CallCost{ - Direction: cd.Direction, - TOR: cd.TOR, - Tenant: cd.Tenant, - Account: cd.Account, - Cost: cost, - ConnectFee: connectionFee, - Timespans: timespans} + Direction: cd.Direction, + TOR: cd.TOR, + Tenant: cd.Tenant, + Account: cd.Account, + Destination: cd.Destination, + Subject: cd.Subject, + Cost: cost, + ConnectFee: connectionFee, + Timespans: timespans} //Logger.Info(fmt.Sprintf(" Get Cost: %s => %v", cd.GetKey(), cc)) return cc, err } diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index 4e582ea9f..cfee8632a 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -271,7 +271,7 @@ func TestMatchLoadCsvWithStor(t *testing.T) { t.Fatal("Failed querying redis keys for csv data") } for _, key := range keysCsv { - refVal := "" + var refVal []byte for idx, rs := range []*RedisStorage{rsCsv, rsStor, rsApier} { qVal, err := rs.db.Get(key) if err != nil { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index ceed1941f..528134821 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -26,10 +26,8 @@ import ( "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" + "github.com/hoisie/redis" "io/ioutil" - "menteslibres.net/gosexy/redis" - "strconv" - "strings" "time" ) @@ -40,27 +38,13 @@ type RedisStorage struct { } func NewRedisStorage(address string, db int, pass, mrshlerStr string) (DataStorage, error) { - addrSplit := strings.Split(address, ":") - host := addrSplit[0] - port := 6379 - if len(addrSplit) == 2 { - port, _ = strconv.Atoi(addrSplit[1]) - } - ndb := redis.New() - err := ndb.Connect(host, uint(port)) - if err != nil { - return nil, err - } + ndb := &redis.Client{Addr: address, Db: db} if pass != "" { - if _, err = ndb.Auth(pass); err != nil { - return nil, err - } - } - if db > 0 { - if _, err = ndb.Select(int64(db)); err != nil { + if err := ndb.Auth(pass); err != nil { return nil, err } } + var mrshler Marshaler if mrshlerStr == utils.MSGPACK { mrshler = NewCodecMsgpackMarshaler() @@ -73,11 +57,12 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string) (DataStora } func (rs *RedisStorage) Close() { - rs.db.Quit() + // no close for me + //rs.db.Quit() } func (rs *RedisStorage) Flush() (err error) { - _, err = rs.db.FlushDB() + err = rs.db.Flush(false) return } @@ -120,9 +105,9 @@ func (rs *RedisStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { if x, err := cache2go.GetCached(key); err == nil { return x.(*RatingPlan), nil } - var values string + var values []byte if values, err = rs.db.Get(RATING_PLAN_PREFIX + key); err == nil { - b := bytes.NewBufferString(values) + b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { return nil, err @@ -145,7 +130,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { w := zlib.NewWriter(&b) w.Write(result) w.Close() - _, err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) + err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 historyScribe.Record(&history.Record{RATING_PLAN_PREFIX + rp.Id, rp}, &response) @@ -154,17 +139,17 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { } func (rs *RedisStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) { - var values string + var values []byte if values, err = rs.db.Get(RATING_PROFILE_PREFIX + key); err == nil { rp = new(RatingProfile) - err = rs.ms.Unmarshal([]byte(values), rp) + err = rs.ms.Unmarshal(values, rp) } return } func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) { result, err := rs.ms.Marshal(rp) - _, err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result) + err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result) if err == nil && historyScribe != nil { response := 0 historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) @@ -176,9 +161,9 @@ func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error if x, err := cache2go.GetCached(key); err == nil { return x.(*Destination), nil } - var values string + var values []byte if values, err = rs.db.Get(DESTINATION_PREFIX + key); len(values) > 0 && err == nil { - b := bytes.NewBufferString(values) + b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { return nil, err @@ -195,24 +180,6 @@ func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error return } -/*func (rs *RedisStorage) DestinationContainsPrefix(key string, prefix string) (precision int, err error) { - if _, err := rs.db.SAdd(TEMP_DESTINATION_PREFIX+prefix, utils.SplitPrefixInterface(prefix)...); err != nil { - return 0, err - } - var values []string - if values, err = rs.db.SInter(DESTINATION_PREFIX+key, TEMP_DESTINATION_PREFIX+prefix); err == nil { - for _, p := range values { - if len(p) > precision { - precision = len(p) - } - } - } - if _, err := rs.db.Del(TEMP_DESTINATION_PREFIX + prefix); err != nil { - Logger.Err("Error removing temp ") - } - return -}*/ - func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { result, err := rs.ms.Marshal(dest) if err != nil { @@ -222,7 +189,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { w := zlib.NewWriter(&b) w.Write(result) w.Close() - _, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) + err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) @@ -231,24 +198,24 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { } func (rs *RedisStorage) GetActions(key string) (as Actions, err error) { - var values string + var values []byte if values, err = rs.db.Get(ACTION_PREFIX + key); err == nil { - err = rs.ms.Unmarshal([]byte(values), &as) + err = rs.ms.Unmarshal(values, &as) } return } func (rs *RedisStorage) SetActions(key string, as Actions) (err error) { result, err := rs.ms.Marshal(&as) - _, err = rs.db.Set(ACTION_PREFIX+key, result) + err = rs.db.Set(ACTION_PREFIX+key, result) return } func (rs *RedisStorage) GetUserBalance(key string) (ub *UserBalance, err error) { - var values string + var values []byte if values, err = rs.db.Get(USER_BALANCE_PREFIX + key); err == nil { ub = &UserBalance{Id: key} - err = rs.ms.Unmarshal([]byte(values), ub) + err = rs.ms.Unmarshal(values, ub) } return @@ -256,14 +223,14 @@ func (rs *RedisStorage) GetUserBalance(key string) (ub *UserBalance, err error) func (rs *RedisStorage) SetUserBalance(ub *UserBalance) (err error) { result, err := rs.ms.Marshal(ub) - _, err = rs.db.Set(USER_BALANCE_PREFIX+ub.Id, result) + err = rs.db.Set(USER_BALANCE_PREFIX+ub.Id, result) return } func (rs *RedisStorage) GetActionTimings(key string) (ats ActionTimings, err error) { - var values string + var values []byte if values, err = rs.db.Get(ACTION_TIMING_PREFIX + key); err == nil { - err = rs.ms.Unmarshal([]byte(values), &ats) + err = rs.ms.Unmarshal(values, &ats) } return } @@ -275,7 +242,7 @@ func (rs *RedisStorage) SetActionTimings(key string, ats ActionTimings) (err err return err } result, err := rs.ms.Marshal(&ats) - _, err = rs.db.Set(ACTION_TIMING_PREFIX+key, result) + err = rs.db.Set(ACTION_TIMING_PREFIX+key, result) return } @@ -291,7 +258,7 @@ func (rs *RedisStorage) GetAllActionTimings() (ats map[string]ActionTimings, err continue } var tempAts ActionTimings - err = rs.ms.Unmarshal([]byte(values), &tempAts) + err = rs.ms.Unmarshal(values, &tempAts) ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts } @@ -304,14 +271,14 @@ func (rs *RedisStorage) LogCallCost(uuid, source string, cc *CallCost) (err erro if err != nil { return } - _, err = rs.db.Set(LOG_CALL_COST_PREFIX+source+"_"+uuid, result) + err = rs.db.Set(LOG_CALL_COST_PREFIX+source+"_"+uuid, result) return } func (rs *RedisStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - var values string + var values []byte if values, err = rs.db.Get(LOG_CALL_COST_PREFIX + source + "_" + uuid); err == nil { - err = rs.ms.Unmarshal([]byte(values), cc) + err = rs.ms.Unmarshal(values, cc) } return } @@ -338,11 +305,11 @@ func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Acti if err != nil { return } - _, err = rs.db.Set(LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))) + err = rs.db.Set(LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))) return } func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) { - _, err = rs.db.Set(LOG_ERR+source+"_"+uuid, errstr) + err = rs.db.Set(LOG_ERR+source+"_"+uuid, []byte(errstr)) return } diff --git a/engine/storage_sredis.go b/engine/storage_sredis.go new file mode 100644 index 000000000..3a9229387 --- /dev/null +++ b/engine/storage_sredis.go @@ -0,0 +1,348 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "bytes" + "compress/zlib" + "errors" + "fmt" + "github.com/cgrates/cgrates/cache2go" + "github.com/cgrates/cgrates/history" + "github.com/cgrates/cgrates/utils" + "io/ioutil" + "menteslibres.net/gosexy/redis" + "strconv" + "strings" + "time" +) + +type SRedisStorage struct { + dbNb int + db *redis.Client + ms Marshaler +} + +func NewSRedisStorage(address string, db int, pass, mrshlerStr string) (DataStorage, error) { + addrSplit := strings.Split(address, ":") + host := addrSplit[0] + port := 6379 + if len(addrSplit) == 2 { + port, _ = strconv.Atoi(addrSplit[1]) + } + ndb := redis.New() + err := ndb.Connect(host, uint(port)) + if err != nil { + return nil, err + } + if pass != "" { + if _, err = ndb.Auth(pass); err != nil { + return nil, err + } + } + if db > 0 { + if _, err = ndb.Select(int64(db)); err != nil { + return nil, err + } + } + var mrshler Marshaler + if mrshlerStr == utils.MSGPACK { + mrshler = NewCodecMsgpackMarshaler() + } else if mrshlerStr == utils.JSON { + mrshler = new(JSONMarshaler) + } else { + return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) + } + return &SRedisStorage{db: ndb, dbNb: db, ms: mrshler}, nil +} + +func (rs *SRedisStorage) Close() { + rs.db.Quit() +} + +func (rs *SRedisStorage) Flush() (err error) { + _, err = rs.db.FlushDB() + return +} + +func (rs *SRedisStorage) PreCache(dKeys, rpKeys []string) (err error) { + if dKeys == nil { + if dKeys, err = rs.db.Keys(DESTINATION_PREFIX + "*"); err != nil { + return + } + } + for _, key := range dKeys { + cache2go.RemKey(key) + if _, err = rs.GetDestination(key[len(DESTINATION_PREFIX):]); err != nil { + return err + } + } + if rpKeys == nil { + if rpKeys, err = rs.db.Keys(RATING_PLAN_PREFIX + "*"); err != nil { + return + } + } + for _, key := range rpKeys { + cache2go.RemKey(key) + if _, err = rs.GetRatingPlan(key[len(RATING_PLAN_PREFIX):]); err != nil { + return err + } + } + return +} + +// Used to check if specific subject is stored using prefix key attached to entity +func (rs *SRedisStorage) ExistsData(category, subject string) (bool, error) { + switch category { + case DESTINATION_PREFIX, RATING_PLAN_PREFIX, RATING_PROFILE_PREFIX, ACTION_PREFIX, ACTION_TIMING_PREFIX, USER_BALANCE_PREFIX: + return rs.db.Exists(category + subject) + } + return false, errors.New("Unsupported category in ExistsData") +} + +func (rs *SRedisStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { + if x, err := cache2go.GetCached(key); err == nil { + return x.(*RatingPlan), nil + } + var values string + if values, err = rs.db.Get(RATING_PLAN_PREFIX + key); err == nil { + b := bytes.NewBufferString(values) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + rp = new(RatingPlan) + err = rs.ms.Unmarshal(out, rp) + cache2go.Cache(key, rp) + } + return +} + +func (rs *SRedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { + result, err := rs.ms.Marshal(rp) + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + _, err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) + if err == nil && historyScribe != nil { + response := 0 + historyScribe.Record(&history.Record{RATING_PLAN_PREFIX + rp.Id, rp}, &response) + } + return +} + +func (rs *SRedisStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) { + var values string + if values, err = rs.db.Get(RATING_PROFILE_PREFIX + key); err == nil { + rp = new(RatingProfile) + err = rs.ms.Unmarshal([]byte(values), rp) + } + return +} + +func (rs *SRedisStorage) SetRatingProfile(rp *RatingProfile) (err error) { + result, err := rs.ms.Marshal(rp) + _, err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result) + if err == nil && historyScribe != nil { + response := 0 + historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) + } + return +} + +func (rs *SRedisStorage) GetDestination(key string) (dest *Destination, err error) { + if x, err := cache2go.GetCached(key); err == nil { + return x.(*Destination), nil + } + var values string + if values, err = rs.db.Get(DESTINATION_PREFIX + key); len(values) > 0 && err == nil { + b := bytes.NewBufferString(values) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + dest = new(Destination) + err = rs.ms.Unmarshal(out, dest) + cache2go.Cache(key, dest) + } + return +} + +/*func (rs *SRedisStorage) DestinationContainsPrefix(key string, prefix string) (precision int, err error) { + if _, err := rs.db.SAdd(TEMP_DESTINATION_PREFIX+prefix, utils.SplitPrefixInterface(prefix)...); err != nil { + return 0, err + } + var values []string + if values, err = rs.db.SInter(DESTINATION_PREFIX+key, TEMP_DESTINATION_PREFIX+prefix); err == nil { + for _, p := range values { + if len(p) > precision { + precision = len(p) + } + } + } + if _, err := rs.db.Del(TEMP_DESTINATION_PREFIX + prefix); err != nil { + Logger.Err("Error removing temp ") + } + return +}*/ + +func (rs *SRedisStorage) SetDestination(dest *Destination) (err error) { + result, err := rs.ms.Marshal(dest) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + _, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) + if err == nil && historyScribe != nil { + response := 0 + historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) + } + return +} + +func (rs *SRedisStorage) GetActions(key string) (as Actions, err error) { + var values string + if values, err = rs.db.Get(ACTION_PREFIX + key); err == nil { + err = rs.ms.Unmarshal([]byte(values), &as) + } + return +} + +func (rs *SRedisStorage) SetActions(key string, as Actions) (err error) { + result, err := rs.ms.Marshal(&as) + _, err = rs.db.Set(ACTION_PREFIX+key, result) + return +} + +func (rs *SRedisStorage) GetUserBalance(key string) (ub *UserBalance, err error) { + var values string + if values, err = rs.db.Get(USER_BALANCE_PREFIX + key); err == nil { + ub = &UserBalance{Id: key} + err = rs.ms.Unmarshal([]byte(values), ub) + } + + return +} + +func (rs *SRedisStorage) SetUserBalance(ub *UserBalance) (err error) { + result, err := rs.ms.Marshal(ub) + _, err = rs.db.Set(USER_BALANCE_PREFIX+ub.Id, result) + return +} + +func (rs *SRedisStorage) GetActionTimings(key string) (ats ActionTimings, err error) { + var values string + if values, err = rs.db.Get(ACTION_TIMING_PREFIX + key); err == nil { + err = rs.ms.Unmarshal([]byte(values), &ats) + } + return +} + +func (rs *SRedisStorage) SetActionTimings(key string, ats ActionTimings) (err error) { + if len(ats) == 0 { + // delete the key + _, err = rs.db.Del(ACTION_TIMING_PREFIX + key) + return err + } + result, err := rs.ms.Marshal(&ats) + _, err = rs.db.Set(ACTION_TIMING_PREFIX+key, result) + return +} + +func (rs *SRedisStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { + keys, err := rs.db.Keys(ACTION_TIMING_PREFIX + "*") + if err != nil { + return nil, err + } + ats = make(map[string]ActionTimings, len(keys)) + for _, key := range keys { + values, err := rs.db.Get(key) + if err != nil { + continue + } + var tempAts ActionTimings + err = rs.ms.Unmarshal([]byte(values), &tempAts) + ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts + } + + return +} + +func (rs *SRedisStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { + var result []byte + result, err = rs.ms.Marshal(cc) + if err != nil { + return + } + _, err = rs.db.Set(LOG_CALL_COST_PREFIX+source+"_"+uuid, result) + return +} + +func (rs *SRedisStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { + var values string + if values, err = rs.db.Get(LOG_CALL_COST_PREFIX + source + "_" + uuid); err == nil { + err = rs.ms.Unmarshal([]byte(values), cc) + } + return +} + +func (rs *SRedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { + mat, err := rs.ms.Marshal(at) + if err != nil { + return + } + mas, err := rs.ms.Marshal(as) + if err != nil { + return + } + rs.db.Set(LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))) + return +} + +func (rs *SRedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { + mat, err := rs.ms.Marshal(at) + if err != nil { + return + } + mas, err := rs.ms.Marshal(as) + if err != nil { + return + } + _, err = rs.db.Set(LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))) + return +} + +func (rs *SRedisStorage) LogError(uuid, source, errstr string) (err error) { + _, err = rs.db.Set(LOG_ERR+source+"_"+uuid, errstr) + return +} diff --git a/hard_update_external_libs.py b/hard_update_external_libs.py index 3b55969b8..8e5d75915 100755 --- a/hard_update_external_libs.py +++ b/hard_update_external_libs.py @@ -13,6 +13,7 @@ libs = ('github.com/fzzy/radix/redis', 'github.com/go-sql-driver/mysql', 'github.com/garyburd/redigo/redis', 'menteslibres.net/gosexy/redis', + 'github.com/hoisie/redis' 'github.com/howeyc/fsnotify', ) diff --git a/update_external_libs.sh b/update_external_libs.sh index 975dd31c1..dc6687464 100755 --- a/update_external_libs.sh +++ b/update_external_libs.sh @@ -6,5 +6,6 @@ go get -v -u labix.org/v2/mgo go get -v -u github.com/cgrates/fsock go get -u -v github.com/go-sql-driver/mysql go get -u -v menteslibres.net/gosexy/redis +go get -u -v github.com/hoisie/redis go get -u -v github.com/howeyc/fsnotify