diff --git a/agents/libdmt.go b/agents/libdmt.go index 39766b378..778de367c 100644 --- a/agents/libdmt.go +++ b/agents/libdmt.go @@ -204,13 +204,19 @@ func metaHandler(m *diam.Message, tag, arg string, debitInterval time.Duration) return "", nil } -func avpsWithPath(m *diam.Message, rsrFld *utils.RSRField) ([]*diam.AVP, error) { - hierarchyPath := strings.Split(rsrFld.Id, utils.HIERARCHY_SEP) - hpIf := make([]interface{}, len(hierarchyPath)) - for i, val := range hierarchyPath { - hpIf[i] = val +// splitIntoInterface is used to split a string into []interface{} instead of []string +func splitIntoInterface(content, sep string) []interface{} { + spltStr := strings.Split(content, sep) + spltIf := make([]interface{}, len(spltStr)) + for i, val := range spltStr { + spltIf[i] = val } - return m.FindAVPsWithPath(hpIf, dict.UndefinedVendorID) + return spltIf +} + +// avpsWithPath is used to find AVPs by specifying RSRField as filter +func avpsWithPath(m *diam.Message, rsrFld *utils.RSRField) ([]*diam.AVP, error) { + return m.FindAVPsWithPath(splitIntoInterface(rsrFld.Id, utils.HIERARCHY_SEP), dict.UndefinedVendorID) } // Follows the implementation in the StorCdr @@ -297,6 +303,44 @@ func fieldOutVal(m *diam.Message, cfgFld *config.CfgCdrField, debitInterval time return fmtValOut, nil } +// messageAddAVPsWithPath will dynamically add AVPs into the message +func messageAddAVPsWithPath(m *diam.Message, path []interface{}, avpValByte []byte) error { + if len(path) == 0 { + return errors.New("Empty path as AVP filter") + } + dictAVPs := make([]*dict.AVP, len(path)) // for each subpath, one dictionary AVP + for i, subpath := range path { + if dictAVP, err := m.Dictionary().FindAVP(m.Header.ApplicationID, subpath); err != nil { + return err + } else if dictAVP == nil { + return fmt.Errorf("Cannot find AVP with id: %s", path[len(path)-1]) + } else { + dictAVPs[i] = dictAVP + } + } + if dictAVPs[len(path)-1].Data.Type == diam.GroupedAVPType { + return errors.New("Last AVP in path needs not to be GroupedAVP") + } + var msgAVP *diam.AVP // Keep a reference here towards last AVP + for i := len(path) - 1; i >= 0; i-- { + var typeVal datatype.Type + var err error + if msgAVP == nil { + typeVal, err = datatype.Decode(dictAVPs[i].Data.Type, avpValByte) + if err != nil { + return err + } + } else { + typeVal = &diam.GroupedAVP{ + AVP: []*diam.AVP{msgAVP}} + } + msgAVP = diam.NewAVP(dictAVPs[i].Code, avp.Mbit, dictAVPs[i].VendorID, typeVal) // FixMe: maybe Mbit with dictionary one + } + m.AVP = append(m.AVP, msgAVP) + m.Header.MessageLength += uint32(msgAVP.Len()) + return nil +} + // debitInterval is the configured debitInterval, in sync with the diameter client one func NewCCRFromDiameterMessage(m *diam.Message, debitInterval time.Duration) (*CCR, error) { var ccr CCR @@ -455,7 +499,7 @@ func NewCCAFromCCR(ccr *CCR) *CCA { } } -// Call Control Answer +// Call Control Answer, bare structure so we can dynamically manage adding it's fields type CCA struct { SessionId string `avp:"Session-Id"` OriginHost string `avp:"Origin-Host"` diff --git a/agents/libdmt_test.go b/agents/libdmt_test.go index 49f28b5ed..68fdb608e 100644 --- a/agents/libdmt_test.go +++ b/agents/libdmt_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package agents import ( + "reflect" "testing" "time" @@ -135,3 +136,17 @@ func TestFieldOutVal(t *testing.T) { t.Errorf("Expecting: %s, received: %s", eOut, fldOut) } } + +func TestMessageAddAVPsWithPath(t *testing.T) { + eMessage := diam.NewRequest(diam.CreditControl, 4, nil) + eMessage.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data + }}) + m := diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4, eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil) + if err := messageAddAVPsWithPath(m, []interface{}{"Subscription-Id", "Subscription-Id-Data"}, []byte("33708000003")); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eMessage, m) { + t.Errorf("Expecting: %+v, received: %+v", eMessage, m) + } +} diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index e77aeed4a..a3a5b77a7 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -19,8 +19,11 @@ along with this program. If not, see package engine import ( + "bytes" + "compress/zlib" "errors" "fmt" + "io/ioutil" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" @@ -57,6 +60,7 @@ const ( type MongoStorage struct { session *mgo.Session db *mgo.Database + ms Marshaler } func NewMongoStorage(host, port, db, user, pass string) (*MongoStorage, error) { @@ -77,7 +81,7 @@ func NewMongoStorage(host, port, db, user, pass string) (*MongoStorage, error) { Background: false, // Build index in background and return immediately Sparse: false, // Only index documents containing the Key fields } - collections := []string{colAct, colApl, colAtr, colDcs, colAls, colUsr, colLcr, colLht} + collections := []string{colAct, colApl, colAtr, colDcs, colAls, colUsr, colLcr, colLht, colRpl, colDst} for _, col := range collections { if err = ndb.C(col).EnsureIndex(index); err != nil { return nil, err @@ -90,7 +94,7 @@ func NewMongoStorage(host, port, db, user, pass string) (*MongoStorage, error) { Background: false, Sparse: false, } - collections = []string{colDst, colRpf, colRpl, colDst, colShg, colAcc, colCrs} + collections = []string{colRpf, colShg, colAcc, colCrs} for _, col := range collections { if err = ndb.C(col).EnsureIndex(index); err != nil { return nil, err @@ -200,7 +204,8 @@ func NewMongoStorage(host, port, db, user, pass string) (*MongoStorage, error) { return nil, err } } - return &MongoStorage{db: ndb, session: session}, err + + return &MongoStorage{db: ndb, session: session, ms: NewCodecMsgpackMarshaler()}, err } func (ms *MongoStorage) Close() { @@ -275,10 +280,10 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac if dKeys == nil || (float64(cache2go.CountEntries(utils.DESTINATION_PREFIX))*utils.DESTINATIONS_LOAD_THRESHOLD < float64(len(dKeys))) { // if need to load more than a half of exiting keys load them all utils.Logger.Info("Caching all destinations") - iter := ms.db.C(colDst).Find(nil).Select(bson.M{"id": 1}).Iter() + iter := ms.db.C(colDst).Find(nil).Select(bson.M{"key": 1}).Iter() dKeys = make([]string, 0) - for iter.Next(&idResult) { - dKeys = append(dKeys, utils.DESTINATION_PREFIX+idResult.Id) + for iter.Next(&keyResult) { + dKeys = append(dKeys, utils.DESTINATION_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() @@ -304,10 +309,10 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if rpKeys == nil { utils.Logger.Info("Caching all rating plans") - iter := ms.db.C(colRpl).Find(nil).Select(bson.M{"id": 1}).Iter() + iter := ms.db.C(colRpl).Find(nil).Select(bson.M{"key": 1}).Iter() rpKeys = make([]string, 0) - for iter.Next(&idResult) { - rpKeys = append(rpKeys, utils.RATING_PLAN_PREFIX+idResult.Id) + for iter.Next(&keyResult) { + rpKeys = append(rpKeys, utils.RATING_PLAN_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() @@ -572,10 +577,10 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { func (ms *MongoStorage) HasData(category, subject string) (bool, error) { switch category { case utils.DESTINATION_PREFIX: - count, err := ms.db.C(colDst).Find(bson.M{"id": subject}).Count() + count, err := ms.db.C(colDst).Find(bson.M{"key": subject}).Count() return count > 0, err case utils.RATING_PLAN_PREFIX: - count, err := ms.db.C(colRpl).Find(bson.M{"id": subject}).Count() + count, err := ms.db.C(colRpl).Find(bson.M{"key": subject}).Count() return count > 0, err case utils.RATING_PROFILE_PREFIX: count, err := ms.db.C(colRpf).Find(bson.M{"id": subject}).Count() @@ -602,15 +607,44 @@ func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPla } } rp = new(RatingPlan) - err = ms.db.C(colRpl).Find(bson.M{"id": key}).One(rp) + var kv struct { + Key string + Value []byte + } + err = ms.db.C(colRpl).Find(bson.M{"key": key}).One(&kv) if err == nil { + b := bytes.NewBuffer(kv.Value) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + err = ms.ms.Unmarshal(out, &rp) + if err != nil { + return nil, err + } cache2go.Cache(utils.RATING_PLAN_PREFIX+key, rp) } return } func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { - _, err := ms.db.C(colRpl).Upsert(bson.M{"id": rp.Id}, rp) + result, err := ms.ms.Marshal(rp) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + _, err = ms.db.C(colRpl).Upsert(bson.M{"key": rp.Id}, &struct { + Key string + Value []byte + }{Key: rp.Id, Value: b.Bytes()}) if err == nil && historyScribe != nil { var response int historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response) @@ -690,20 +724,50 @@ func (ms *MongoStorage) SetLCR(lcr *LCR) error { func (ms *MongoStorage) GetDestination(key string) (result *Destination, err error) { result = new(Destination) - err = ms.db.C(colDst).Find(bson.M{"id": key}).One(result) + var kv struct { + Key string + Value []byte + } + err = ms.db.C(colDst).Find(bson.M{"key": key}).One(&kv) + if err == nil { + b := bytes.NewBuffer(kv.Value) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + err = ms.ms.Unmarshal(out, &result) + if err != nil { + return nil, err + } + // create optimized structure + for _, p := range result.Prefixes { + cache2go.Push(utils.DESTINATION_PREFIX+p, result.Id) + } + } if err != nil { result = nil - return - } - // create optimized structure - for _, p := range result.Prefixes { - cache2go.Push(utils.DESTINATION_PREFIX+p, result.Id) } return } func (ms *MongoStorage) SetDestination(dest *Destination) (err error) { - _, err = ms.db.C(colDst).Upsert(bson.M{"id": dest.Id}, dest) + result, err := ms.ms.Marshal(dest) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + _, err = ms.db.C(colDst).Upsert(bson.M{"key": dest.Id}, &struct { + Key string + Value []byte + }{Key: dest.Id, Value: b.Bytes()}) if err == nil && historyScribe != nil { var response int historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response) @@ -1037,20 +1101,31 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl } var kv struct { Key string - Value *ActionPlan + Value []byte } err = ms.db.C(colApl).Find(bson.M{"key": key}).One(&kv) if err == nil { - ats = kv.Value + b := bytes.NewBuffer(kv.Value) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + err = ms.ms.Unmarshal(out, &ats) + if err != nil { + return nil, err + } cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats) } - ats.AccountIDs = utils.YesDots(ats.AccountIDs) return } func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { // clean dots from account ids map - ats.AccountIDs = utils.NoDots(ats.AccountIDs) if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) err := ms.db.C(colApl).Remove(bson.M{"key": key}) @@ -1059,10 +1134,18 @@ func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { } return nil } - _, err := ms.db.C(colApl).Upsert(bson.M{"key": key}, &struct { + result, err := ms.ms.Marshal(ats) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + _, err = ms.db.C(colApl).Upsert(bson.M{"key": key}, &struct { Key string - Value *ActionPlan - }{Key: key, Value: ats}) + Value []byte + }{Key: key, Value: b.Bytes()}) return err } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 819eaec38..80568003e 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -927,7 +927,7 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) return err } - result, err := rs.ms.Marshal(&ats) + result, err := rs.ms.Marshal(ats) if err != nil { return err } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 8012e7ad0..8c1467e0c 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1341,6 +1341,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { AccountID: accID, ActionsID: at.ActionsID, } + if verbose { + log.Println("\tTask: ", t) + } + if err = tpr.ratingStorage.PushTask(t); err != nil { + return err + } + } + if len(ap.AccountIDs) == 0 { + t := &Task{ + Uuid: utils.GenUUID(), + ActionsID: at.ActionsID, + } + if verbose { + log.Println("\tTask: ", t) + } if err = tpr.ratingStorage.PushTask(t); err != nil { return err } diff --git a/glide.lock b/glide.lock index 032aebf5d..97ec30d38 100644 --- a/glide.lock +++ b/glide.lock @@ -1,9 +1,8 @@ hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 -updated: 2015-12-16T09:32:54.975724404+02:00 +updated: 2015-12-18T18:02:12.669823411+02:00 imports: - name: github.com/cenkalti/hub version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d - repo: https://github.com/cenkalti/hub - name: github.com/cenkalti/rpc2 version: 2d1be381ce47537e9e076b2b76dc70933162e4e9 - name: github.com/cgrates/fsock @@ -29,10 +28,9 @@ imports: - name: github.com/gorhill/cronexpr version: a557574d6c024ed6e36acc8b610f5f211c91568a - name: github.com/jinzhu/gorm - version: c6a22c50962028255a718f22fe7e8959e8c67884 + version: d209be3138acbe304daffee637bc495499c1e70e - name: github.com/jinzhu/inflection version: 3272df6c21d04180007eb3349844c89a3856bc25 - repo: https://github.com/jinzhu/inflection - name: github.com/kr/pty version: f7ee69f31298ecbe5d2b349c711e2547a617d398 - name: github.com/lib/pq @@ -43,7 +41,7 @@ imports: - /pool - redis - name: github.com/peterh/liner - version: 4d47685ab2fd2dbb46c66b831344d558bc4be5b9 + version: 3f1c20449d1836aa4cbe38731b96f95cdf89634d - name: github.com/ugorji/go version: cd43bdd6be4b5675a0d1e75c4af55ee1dc0d9c5e subpackages: @@ -51,18 +49,17 @@ imports: - name: golang.org/x/crypto version: f18420efc3b4f8e9f3d51f6bd2476e92c46260e9 - name: golang.org/x/net - version: 548f7bf20c8aae87fecd9aa09cc89065451e6271 + version: 28273ec927bee3bea305f112fc28ceee575ea893 subpackages: - /websocket - name: golang.org/x/text - version: 92ca7bbb695e2e9675f1d731fa85760f95d2c0df + version: cf4986612c83df6c55578ba198316d1684a9a287 - name: gopkg.in/fsnotify.v1 - version: 2cdd39bd6129c6a49c74fb07fb9d77ba1271c572 + version: 508915b7500b6e42a87132e4afeb4729cebc7cbb - name: gopkg.in/mgo.v2 version: e30de8ac9ae3b30df7065f766c71f88bba7d4e49 subpackages: - bson - name: gopkg.in/tomb.v2 version: 14b3d72120e8d10ea6e6b7f87f7175734b1faab8 - repo: https://gopkg.in/tomb.v2 devImports: []