diff --git a/agents/dmtagent.go b/agents/dmtagent.go index 21bf137b0..3b928faf4 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -93,12 +93,13 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro } smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields) if err != nil { + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err)) *cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed), false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil { + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v messageSetAVPsWithPath, error: %s", cca.diamMessage, err.Error())) return false, err } - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err)) return false, ErrDiameterRatingFailed } if len(reqProcessor.Flags) != 0 { diff --git a/apier/v2/cdrs_it_test.go b/apier/v2/cdrs_it_test.go index 25644c26e..882909f85 100644 --- a/apier/v2/cdrs_it_test.go +++ b/apier/v2/cdrs_it_test.go @@ -110,7 +110,7 @@ func testV2CDRsInjectUnratedCdr(t *testing.T) { cdrsCfg.StorDBMaxOpenConns, cdrsCfg.StorDBMaxIdleConns) case "cdrsv2mongo": db, err = engine.NewMongoStorage(cdrsCfg.StorDBHost, cdrsCfg.StorDBPort, cdrsCfg.StorDBName, - cdrsCfg.StorDBUser, cdrsCfg.StorDBPass, cdrsCfg.StorDBCDRSIndexes, nil, 10) + cdrsCfg.StorDBUser, cdrsCfg.StorDBPass, utils.StorDB, cdrsCfg.StorDBCDRSIndexes, nil, 10) } if err != nil { t.Error("Error on opening database connection: ", err) diff --git a/apier/v2/tp_it_test.go b/apier/v2/tp_it_test.go index b55592646..517b2dab8 100644 --- a/apier/v2/tp_it_test.go +++ b/apier/v2/tp_it_test.go @@ -227,15 +227,12 @@ func testTPitDestinations(t *testing.T) { } } // Test get - /* - FixMe for mongodb - var rplyDst *utils.TPDestination - if err := tpRPC.Call("ApierV2.GetTPDestination", v1.AttrGetTPDestination{testTPid, dstDEMobile.DestinationId}, &rplyDst); err != nil { - t.Error("Calling ApierV2.GetTPDestination, got error: ", err.Error()) - } else if len(dstDEMobile.Prefixes) != len(rplyDst.Prefixes) { - t.Errorf("Calling ApierV2.GetTPDestination expected: %v, received: %v", dstDEMobile, rplyDst) - } - */ + var rplyDst *utils.TPDestination + if err := tpRPC.Call("ApierV2.GetTPDestination", v1.AttrGetTPDestination{testTPid, dstDEMobile.DestinationId}, &rplyDst); err != nil { + t.Error("Calling ApierV2.GetTPDestination, got error: ", err.Error()) + } else if len(dstDEMobile.Prefixes) != len(rplyDst.Prefixes) { + t.Errorf("Calling ApierV2.GetTPDestination expected: %v, received: %v", dstDEMobile, rplyDst) + } // Test remove if err := tpRPC.Call("ApierV2.RemTPDestination", v1.AttrGetTPDestination{testTPid, dstDUMMY.DestinationId}, &reply); err != nil { t.Error("Calling ApierV1.RemTPTiming, got error: ", err.Error()) @@ -250,5 +247,4 @@ func testTPitDestinations(t *testing.T) { } else if len(expectedDstIds) != len(rplyDstIds) { t.Errorf("Calling ApierV2.GetTPDestinationIds expected: %v, received: %v", expectedDstIds, rplyDstIds) } - } diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 406f01cf5..a6cc6e53f 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -13,20 +13,17 @@ "tariffplan_db": { // database used to store active tariff plan configuration "db_type": "mongo", // stor database type to use: "db_port": 27017, // the port to reach the stordb - "db_name": "tpdb", }, "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "mongo", // stor database type to use: "db_port": 27017, // the port to reach the stordb - "db_name": "datadb", }, "stor_db": { "db_type": "mongo", // stor database type to use: "db_port": 27017, // the port to reach the stordb - "db_name": "stordb", }, diff --git a/engine/calldesc.go b/engine/calldesc.go index c37c1e730..41e732988 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -47,11 +47,11 @@ func init() { ratingStorage, _ = NewMapStorage() accountingStorage, _ = NewMapStorage() case "mongo": - ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) + ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", utils.TariffPlanDB, nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) if err != nil { log.Fatal(err) } - accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) + accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", utils.DataDB, nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) if err != nil { log.Fatal(err) } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 385e80524..d56eb75be 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -85,20 +85,7 @@ var ( CostLow = strings.ToLower(utils.COST) ) -type MongoStorage struct { - session *mgo.Session - db string - ms Marshaler - cacheCfg *config.CacheConfig - loadHistorySize int -} - -func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { - sessionCopy := ms.session.Copy() - return sessionCopy, sessionCopy.DB(ms.db).C(col) -} - -func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (*MongoStorage, error) { +func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) { address := fmt.Sprintf("%s:%s", host, port) if user != "" && pass != "" { address = fmt.Sprintf("%s:%s@%s", user, pass, address) @@ -107,169 +94,183 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, ca if err != nil { return nil, err } - - ndb := session.DB(db) session.SetMode(mgo.Strong, true) - index := mgo.Index{ + return &MongoStorage{db: db, session: session, storageType: storageType, ms: NewCodecMsgpackMarshaler(), cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, cdrsIndexes: cdrsIndexes}, nil +} + +type MongoStorage struct { + session *mgo.Session + db string + storageType string // tariffplandb, datadb, stordb + ms Marshaler + cacheCfg *config.CacheConfig + loadHistorySize int + cdrsIndexes []string +} + +func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { + sessionCopy := ms.session.Copy() + return sessionCopy, sessionCopy.DB(ms.db).C(col) +} + +func (ms *MongoStorage) EnsureIndexes() (err error) { + dbSession, _ := ms.conn("") + defer dbSession.Close() + db := dbSession.DB(ms.db) + idx := mgo.Index{ Key: []string{"key"}, Unique: true, // Prevent two documents from having the same index key DropDups: false, // Drop documents with the same index key as a previously indexed one Background: false, // Build index in background and return immediately Sparse: false, // Only index documents containing the Key fields } - collections := []string{colAct, colApl, colAtr, colDcs, colAls, colRls, colUsr, colLcr, colLht, colRpl, colDst, colRds} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + var colectNames []string // collection names containing this index + if ms.storageType == utils.TariffPlanDB { + colectNames = []string{colAct, colApl, colAtr, colDcs, colRls, colRpl, colLcr, colDst, colRds} + } else if ms.storageType == utils.DataDB { + colectNames = []string{colAls, colUsr, colLht} + } + for _, col := range colectNames { + if err = db.C(col).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"id"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{colRpf, colShg, colAcc, colCrs} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + if ms.storageType == utils.TariffPlanDB { + colectNames = []string{colRpf, colShg, colCrs} + } else if ms.storageType == utils.DataDB { + colectNames = []string{colAcc} + } + for _, col := range colectNames { + if err = db.C(col).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ - Key: []string{"tpid", "tag"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + if ms.storageType == utils.StorDB { + idx = mgo.Index{ + Key: []string{"tpid", "tag"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, } - } - index = mgo.Index{ - Key: []string{"tpid", "direction", "tenant", "category", "subject", "loadid"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_RATE_PROFILES} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + for _, col := range []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, + utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS} { + if err = db.C(col).EnsureIndex(idx); err != nil { + return + } } - } - index = mgo.Index{ - Key: []string{"tpid", "direction", "tenant", "category", "account", "subject"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_LCRS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + + idx = mgo.Index{ + Key: []string{"tpid", "direction", "tenant", "category", "subject", "loadid"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, } - } - index = mgo.Index{ - Key: []string{"tpid", "tenant", "username"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_USERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBL_TP_RATE_PROFILES).EnsureIndex(idx); err != nil { + return } - } - index = mgo.Index{ - Key: []string{"tpid", "direction", "tenant", "category", "account", "subject", "context"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_LCRS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + idx = mgo.Index{ + Key: []string{"tpid", "direction", "tenant", "category", "account", "subject"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, } - } - index = mgo.Index{ - Key: []string{"tpid", "direction", "tenant", "category", "subject", "account", "loadid"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_DERIVED_CHARGERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBL_TP_LCRS).EnsureIndex(idx); err != nil { + return } - } - index = mgo.Index{ - Key: []string{"tpid", "direction", "tenant", "account", "loadid"}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - collections = []string{utils.TBL_TP_DERIVED_CHARGERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + idx = mgo.Index{ + Key: []string{"tpid", "tenant", "username"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, } - } - index = mgo.Index{ - Key: []string{CGRIDLow, RunIDLow, OriginIDLow}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - if err = ndb.C(utils.TBL_CDRS).EnsureIndex(index); err != nil { - return nil, err - } - for _, idxKey := range cdrsIndexes { - index = mgo.Index{ - Key: []string{idxKey}, + if err = db.C(utils.TBL_TP_USERS).EnsureIndex(idx); err != nil { + return + } + idx = mgo.Index{ + Key: []string{"tpid", "direction", "tenant", "category", "account", "subject", "context"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, + } + if err = db.C(utils.TBL_TP_LCRS).EnsureIndex(idx); err != nil { + return + } + idx = mgo.Index{ + Key: []string{"tpid", "direction", "tenant", "category", "subject", "account", "loadid"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, + } + if err = db.C(utils.TBL_TP_DERIVED_CHARGERS).EnsureIndex(idx); err != nil { + return + } + idx = mgo.Index{ + Key: []string{"tpid", "direction", "tenant", "account", "loadid"}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, + } + if err = db.C(utils.TBL_TP_DERIVED_CHARGERS).EnsureIndex(idx); err != nil { + return + } + idx = mgo.Index{ + Key: []string{CGRIDLow, RunIDLow, OriginIDLow}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, + } + if err = db.C(utils.TBL_CDRS).EnsureIndex(idx); err != nil { + return + } + for _, idxKey := range ms.cdrsIndexes { + idx = mgo.Index{ + Key: []string{idxKey}, + Unique: false, + DropDups: false, + Background: false, + Sparse: false, + } + if err = db.C(utils.TBL_CDRS).EnsureIndex(idx); err != nil { + return + } + } + idx = mgo.Index{ + Key: []string{CGRIDLow, RunIDLow}, + Unique: true, + DropDups: false, + Background: false, + Sparse: false, + } + if err = db.C(utils.TBLSMCosts).EnsureIndex(idx); err != nil { + return + } + idx = mgo.Index{ + Key: []string{OriginHostLow, OriginIDLow}, Unique: false, DropDups: false, Background: false, Sparse: false, } - if err = ndb.C(utils.TBL_CDRS).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBLSMCosts).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ - Key: []string{CGRIDLow, RunIDLow}, - Unique: true, - DropDups: false, - Background: false, - Sparse: false, - } - if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { - return nil, err - } - index = mgo.Index{ - Key: []string{OriginHostLow, OriginIDLow}, - Unique: false, - DropDups: false, - Background: false, - Sparse: false, - } - if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { - return nil, err - } - return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, err + return } func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool) { @@ -327,7 +328,6 @@ func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) error { if !ok { return utils.ErrInvalidKey } - session, col := ms.conn(colName) defer session.Close() diff --git a/engine/storage_mongo_it_test.go b/engine/storage_mongo_it_test.go index 02b4a6442..a28372c2e 100644 --- a/engine/storage_mongo_it_test.go +++ b/engine/storage_mongo_it_test.go @@ -38,7 +38,8 @@ func TestMGOitConnect(t *testing.T) { if err != nil { t.Fatal(err) } - if mgoITDB, err = NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil { + if mgoITDB, err = NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, + utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil { t.Fatal(err) } } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 285fcd842..99b1d0f60 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -43,7 +43,7 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler str } d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, loadHistorySize) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheCfg, loadHistorySize) + d, err = NewMongoStorage(host, port, name, user, pass, utils.TariffPlanDB, nil, cacheCfg, loadHistorySize) db = d.(RatingStorage) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", @@ -70,7 +70,7 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler } d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, loadHistorySize) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheCfg, loadHistorySize) + d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg, loadHistorySize) db = d.(AccountingStorage) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", @@ -99,7 +99,7 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin d, err = NewRedisStorage(host, db_nb, pass, marshaler) */ case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1) + d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) case utils.POSTGRES: d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MYSQL: @@ -122,7 +122,7 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1) + d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES)) @@ -141,7 +141,7 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn, case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1) + d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES)) diff --git a/integration_tests.sh b/integration_test.sh similarity index 100% rename from integration_tests.sh rename to integration_test.sh diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index b912a7ec5..0c0eb07bb 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -242,7 +242,8 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) { if _, hasIt := ssRIdx[s.CGRID]; !hasIt { ssRIdx[s.CGRID] = make([]*riFieldNameVal, 0) } - ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal}) + riFlds := ssRIdx[s.CGRID] // attempt to avoid map concurrency write panic + ssRIdx[s.CGRID] = append(riFlds, &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal}) } return } diff --git a/utils/consts.go b/utils/consts.go index 1985c6aaa..1f0a28663 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -334,4 +334,7 @@ const ( EVT_ACTION_TRIGGER_FIRED = "ACTION_TRIGGER_FIRED" EVT_ACTION_TIMING_FIRED = "ACTION_TRIGGER_FIRED" SMAsterisk = "sm_asterisk" + TariffPlanDB = "tariffplan_db" + DataDB = "data_db" + StorDB = "stor_db" )