diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index 066a19a86..1bd400487 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -38,7 +38,7 @@ import ( func TestAgReqAsNavigableMap(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -134,7 +134,7 @@ func TestAgReqAsNavigableMap(t *testing.T) { func TestAgReqMaxCost(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -178,7 +178,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) { //create diameterDataProvider dP := newDADataProvider(nil, m) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -228,7 +228,7 @@ func TestAgReqParseFieldRadius(t *testing.T) { //create radiusDataProvider dP := newRADataProvider(pkt) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -268,7 +268,7 @@ Host: api.cgrates.org //create radiusDataProvider dP, _ := newHTTPUrlDP(req) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -339,7 +339,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { //create radiusDataProvider dP, _ := newHTTPXmlDP(req) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -368,7 +368,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { func TestAgReqEmptyFilter(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -411,7 +411,7 @@ func TestAgReqEmptyFilter(t *testing.T) { func TestAgReqMetaExponent(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -437,7 +437,7 @@ func TestAgReqMetaExponent(t *testing.T) { func TestAgReqCGRActiveRequest(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -480,7 +480,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) { func TestAgReqFieldAsNone(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -517,7 +517,7 @@ func TestAgReqFieldAsNone(t *testing.T) { func TestAgReqFieldAsNone2(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -557,7 +557,7 @@ func TestAgReqFieldAsNone2(t *testing.T) { func TestAgReqAsNavigableMap2(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -614,7 +614,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) { func TestAgReqFieldAsInterface(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -661,7 +661,7 @@ func TestAgReqFieldAsInterface(t *testing.T) { func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) @@ -711,7 +711,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) { func TestAgReqSetCGRReplyWithError(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) @@ -752,7 +752,7 @@ func (ev myEv) AsNavigableMap(tpl []*config.FCTemplate) (*config.NavigableMap, e func TestAgReqSetCGRReplyWithoutError(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) @@ -814,7 +814,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) { //create diameterDataProvider dP := newDADataProvider(nil, m) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -892,7 +892,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) { //create diameterDataProvider dP := newDADataProvider(nil, m) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -958,7 +958,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) { //create diameterDataProvider dP := newDADataProvider(nil, m) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -1002,7 +1002,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) { //create diameterDataProvider dP := newDADataProvider(nil, m) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request @@ -1046,7 +1046,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) { //create diameterDataProvider dP := newDADataProvider(nil, m) data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index e4d169c69..d84f06ecf 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -50,7 +50,7 @@ func (s *testMockSessionConn) Call(method string, arg interface{}, rply interfac func TestProcessRequest(t *testing.T) { data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) filters := engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dm) // no need for filterS but stiil try to configure the dm :D cgrRplyNM := config.NewNavigableMap(nil) diff --git a/apier/v1/accounts_test.go b/apier/v1/accounts_test.go index 75155d3a1..4425e52ac 100644 --- a/apier/v1/accounts_test.go +++ b/apier/v1/accounts_test.go @@ -34,7 +34,7 @@ var ( func init() { apierAcntsAcntStorage, _ = engine.NewMapStorage() cfg, _ := config.NewDefaultCGRConfig() - apierAcnts = &ApierV1{DataManager: engine.NewDataManager(apierAcntsAcntStorage, config.CgrConfig().CacheCfg()), Config: cfg} + apierAcnts = &ApierV1{DataManager: engine.NewDataManager(apierAcntsAcntStorage, config.CgrConfig().CacheCfg(), nil, nil), Config: cfg} } func TestSetAccounts(t *testing.T) { diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 4835f878e..fdba09e5a 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -200,9 +200,12 @@ func (self *ApierV1) LoadDestination(attrs AttrLoadDestination, reply *string) e if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if loaded, err := dbReader.LoadDestinationsFiltered(attrs.ID); err != nil { return utils.NewErrServerError(err) } else if !loaded { @@ -225,9 +228,12 @@ func (self *ApierV1) LoadRatingPlan(attrs AttrLoadRatingPlan, reply *string) err if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if loaded, err := dbReader.LoadRatingPlansFiltered(attrs.RatingPlanId); err != nil { return utils.NewErrServerError(err) } else if !loaded { @@ -242,9 +248,12 @@ func (self *ApierV1) LoadRatingProfile(attrs utils.TPRatingProfile, reply *strin if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := dbReader.LoadRatingProfilesFiltered(&attrs); err != nil { return utils.NewErrServerError(err) } @@ -262,9 +271,12 @@ func (self *ApierV1) LoadSharedGroup(attrs AttrLoadSharedGroup, reply *string) e if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := dbReader.LoadSharedGroupsFiltered(attrs.SharedGroupId, true); err != nil { return utils.NewErrServerError(err) } @@ -285,9 +297,12 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := dbReader.LoadAll(); err != nil { return utils.NewErrServerError(err) } @@ -766,9 +781,12 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if _, err := guardian.Guardian.Guard(func() (interface{}, error) { return 0, dbReader.LoadAccountActionsFiltered(&attrs) }, config.CgrConfig().GeneralCfg().LockingTimeout, attrs.LoadId); err != nil { @@ -800,9 +818,12 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, } // create the TpReader - loader := engine.NewTpReader(self.DataManager.DataDB(), + loader, err := engine.NewTpReader(self.DataManager.DataDB(), engine.NewFileCSVStorage(utils.CSV_SEP, attrs.FolderPath, attrs.Recursive), "", self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } //Load the data if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) @@ -854,9 +875,12 @@ func (self *ApierV1) RemoveTPFromFolder(attrs utils.AttrLoadTpFromFolder, reply } // create the TpReader - loader := engine.NewTpReader(self.DataManager.DataDB(), + loader, err := engine.NewTpReader(self.DataManager.DataDB(), engine.NewFileCSVStorage(utils.CSV_SEP, attrs.FolderPath, attrs.Recursive), "", self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } //Load the data if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) @@ -896,9 +920,12 @@ func (self *ApierV1) RemoveTPFromStorDB(attrs AttrLoadTpFromStorDb, reply *strin if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := dbReader.LoadAll(); err != nil { return utils.NewErrServerError(err) } diff --git a/apier/v1/debit_test.go b/apier/v1/debit_test.go index d7be1366b..4016112b3 100644 --- a/apier/v1/debit_test.go +++ b/apier/v1/debit_test.go @@ -38,7 +38,7 @@ func init() { apierDebitStorage, _ = engine.NewMapStorage() cfg, _ := config.NewDefaultCGRConfig() responder := &engine.Responder{MaxComputedUsage: cfg.RalsCfg().MaxComputedUsage} - dm = engine.NewDataManager(apierDebitStorage, config.CgrConfig().CacheCfg()) + dm = engine.NewDataManager(apierDebitStorage, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dm) apierDebit = &ApierV1{ DataManager: dm, diff --git a/apier/v2/apier.go b/apier/v2/apier.go index a1dd96fef..eb6e6cc82 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -55,9 +55,12 @@ func (self *ApierV2) LoadRatingProfile(attrs AttrLoadRatingProfile, reply *strin return utils.NewErrMandatoryIeMissing("TPid") } tpRpf := &utils.TPRatingProfile{TPid: attrs.TPid} - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := dbReader.LoadRatingProfilesFiltered(tpRpf); err != nil { return utils.NewErrServerError(err) } @@ -75,9 +78,12 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, + dbReader, err := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb, attrs.TPid, self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } tpAa := &utils.TPAccountActions{TPid: attrs.TPid} tpAa.SetAccountActionsId(attrs.AccountActionsId) if _, err := guardian.Guardian.Guard(func() (interface{}, error) { @@ -105,9 +111,12 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, } else if !fi.IsDir() { return utils.ErrInvalidPath } - loader := engine.NewTpReader(self.DataManager.DataDB(), + loader, err := engine.NewTpReader(self.DataManager.DataDB(), engine.NewFileCSVStorage(utils.CSV_SEP, attrs.FolderPath, attrs.Recursive), "", self.Config.GeneralCfg().DefaultTimezone, self.CacheS, self.SchedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) } diff --git a/apier/v2/apierv2_it_test.go b/apier/v2/apierv2_it_test.go index 01d2d7ff8..e9dd2d878 100644 --- a/apier/v2/apierv2_it_test.go +++ b/apier/v2/apierv2_it_test.go @@ -75,7 +75,7 @@ func TestApierV2itConnectDataDB(t *testing.T) { utils.REDIS_MAX_CONNS, ""); err != nil { t.Fatal("Could not connect to Redis", err.Error()) } else { - dm = engine.NewDataManager(rdsITdb, config.CgrConfig().CacheCfg()) + dm = engine.NewDataManager(rdsITdb, config.CgrConfig().CacheCfg(), nil, nil) } } diff --git a/cdrc/csv_test.go b/cdrc/csv_test.go index b1adf7b20..9a34a93da 100644 --- a/cdrc/csv_test.go +++ b/cdrc/csv_test.go @@ -75,7 +75,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() cdrcConfig := cgrConfig.CdrcProfiles["/var/spool/cgrates/cdrc/in"][0] data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, cgrConfig.CacheCfg()) + dm := engine.NewDataManager(data, cgrConfig.CacheCfg(), nil, nil) filterS := engine.NewFilterS(cgrConfig, nil, nil, nil, dm) cdrcConfig.CdrSourceId = "TEST_CDRC" cdrcConfig.ContentFields = []*config.FCTemplate{ @@ -160,7 +160,7 @@ func TestCsvSecondUsage(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() cdrcConfig := cgrConfig.CdrcProfiles["/var/spool/cgrates/cdrc/in"][0] data, _ := engine.NewMapStorage() - dm := engine.NewDataManager(data, cgrConfig.CacheCfg()) + dm := engine.NewDataManager(data, cgrConfig.CacheCfg(), nil, nil) filterS := engine.NewFilterS(cgrConfig, nil, nil, nil, dm) cdrcConfig.CdrSourceId = "TEST_CDRC" cdrcConfig.ContentFields = []*config.FCTemplate{ diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index a7dc06fb6..7d3dd78b1 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -321,7 +321,7 @@ func TestXMLRPProcessWithNewFilters(t *testing.T) { } xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, - cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data, defaultCfg.CacheCfg()))) + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data, defaultCfg.CacheCfg(), nil, nil))) if err != nil { t.Error(err) } @@ -588,7 +588,7 @@ func TestXMLRPNestingSeparator(t *testing.T) { } xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(xmlContent), utils.HierarchyPath([]string{"File", "CDRs", "Call"}), "UTC", true, - cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data, defaultCfg.CacheCfg()))) + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data, defaultCfg.CacheCfg(), nil, nil))) if err != nil { t.Error(err) } diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 075626546..87015e78a 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -239,7 +239,34 @@ func main() { if err != nil { log.Fatalf("Coud not open dataDB connection: %s", err.Error()) } - dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg()) + var rmtDBConns, rplDBConns []engine.DataDB + if len(ldrCfg.DataDbCfg().RmtDataDBCfgs) != 0 { + rmtDBConns = make([]engine.DataDB, len(ldrCfg.DataDbCfg().RmtDataDBCfgs)) + for i, dbCfg := range ldrCfg.DataDbCfg().RmtDataDBCfgs { + rmtDBConns[i], err = engine.NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + log.Fatalf("Coud not open dataDB connection: %s", err.Error()) + } + } + } + if len(ldrCfg.DataDbCfg().RplDataDBCfgs) != 0 { + rplDBConns = make([]engine.DataDB, len(ldrCfg.DataDbCfg().RplDataDBCfgs)) + for i, dbCfg := range ldrCfg.DataDbCfg().RplDataDBCfgs { + rplDBConns[i], err = engine.NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + log.Fatalf("Coud not open dataDB connection: %s", err.Error()) + } + } + } + dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg(), rmtDBConns, rplDBConns) defer dm.DataDB().Close() } @@ -321,9 +348,11 @@ func main() { } } - tpReader := engine.NewTpReader(dm.DataDB(), loader, ldrCfg.LoaderCgrCfg().TpID, + tpReader, err := engine.NewTpReader(dm.DataDB(), loader, ldrCfg.LoaderCgrCfg().TpID, ldrCfg.GeneralCfg().DefaultTimezone, cacheS, schedulerS) - + if err != nil { + log.Fatal(err) + } if err = tpReader.LoadAll(); err != nil { log.Fatal(err) } diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 3d19d288c..d345b42b1 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -79,7 +79,7 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { if err != nil { return nilDuration, fmt.Errorf("Could not connect to data database: %s", err.Error()) } - dm := engine.NewDataManager(dbConn, cgrConfig.CacheCfg()) // for the momentn we use here "" for sentinelName + dm := engine.NewDataManager(dbConn, cgrConfig.CacheCfg(), nil, nil) // for the momentn we use here "" for sentinelName defer dm.DataDB().Close() engine.SetDataStorage(dm) if err := dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, diff --git a/engine/attributes_test.go b/engine/attributes_test.go index 420daad85..f9733dc00 100644 --- a/engine/attributes_test.go +++ b/engine/attributes_test.go @@ -154,7 +154,7 @@ func TestAttributePopulateAttrService(t *testing.T) { t.Error(err) } data := NewInternalDB(nil, nil) - dmAtr = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmAtr = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() defaultCfg.AttributeSCfg().ProcessRuns = 1 defaultCfg.AttributeSCfg().StringIndexedFields = nil @@ -1701,7 +1701,7 @@ func TestAttributeProcessEventValueExponent(t *testing.T) { func BenchmarkAttributeProcessEventConstant(b *testing.B) { data := NewInternalDB(nil, nil) - dmAtr = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmAtr = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() defaultCfg.AttributeSCfg().ProcessRuns = 1 if err != nil { @@ -1765,7 +1765,7 @@ func BenchmarkAttributeProcessEventConstant(b *testing.B) { func BenchmarkAttributeProcessEventVariable(b *testing.B) { data := NewInternalDB(nil, nil) - dmAtr = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmAtr = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() defaultCfg.AttributeSCfg().ProcessRuns = 1 if err != nil { diff --git a/engine/calldesc.go b/engine/calldesc.go index 61c677bac..25a7ed92f 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -49,7 +49,7 @@ func init() { } data, _ = NewMapStorage() } - dm = NewDataManager(data, config.CgrConfig().CacheCfg()) + dm = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) } var ( diff --git a/engine/cdr_test.go b/engine/cdr_test.go index 9c030b924..27413ccc9 100644 --- a/engine/cdr_test.go +++ b/engine/cdr_test.go @@ -663,7 +663,7 @@ func TestCDRAsExportRecord(t *testing.T) { t.Errorf("Expecting:\n%s\nReceived:\n%s", "1", expRecord[0]) } data, _ := NewMapStorage() - dmForCDR := NewDataManager(data, config.CgrConfig().CacheCfg()) + dmForCDR := NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/chargers_test.go b/engine/chargers_test.go index 306f5cc32..d9975b327 100755 --- a/engine/chargers_test.go +++ b/engine/chargers_test.go @@ -102,7 +102,7 @@ var ( func TestChargerPopulateChargerService(t *testing.T) { data := NewInternalDB(nil, nil) - dmCharger = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmCharger = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/datamanager.go b/engine/datamanager.go index d8d42d4ef..975aee9f1 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -90,10 +90,12 @@ var ( ) // NewDataManager returns a new DataManager -func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg) *DataManager { +func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs, rplDataDBs []DataDB) *DataManager { return &DataManager{ - dataDB: dataDB, - cacheCfg: cacheCfg, + dataDB: dataDB, + cacheCfg: cacheCfg, + rmtDataDBs: rmtDataDBs, + rplDataDBs: rplDataDBs, } } diff --git a/engine/datamanager_it_test.go b/engine/datamanager_it_test.go index bbb46f7ea..c8a1bab45 100644 --- a/engine/datamanager_it_test.go +++ b/engine/datamanager_it_test.go @@ -48,7 +48,7 @@ func TestDMitRedis(t *testing.T) { if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } - dm2 = NewDataManager(dataDB, config.CgrConfig().CacheCfg()) + dm2 = NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTestsDMit { t.Run("TestDMitRedis", stest) } @@ -67,7 +67,7 @@ func TestDMitMongo(t *testing.T) { if err != nil { t.Fatal("Could not connect to Mongo", err.Error()) } - dm2 = NewDataManager(dataDB, config.CgrConfig().CacheCfg()) + dm2 = NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTestsDMit { t.Run("TestDMitMongo", stest) } diff --git a/engine/filterhelpers_test.go b/engine/filterhelpers_test.go index 40e8f48b8..bee0f7a89 100644 --- a/engine/filterhelpers_test.go +++ b/engine/filterhelpers_test.go @@ -37,7 +37,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) { stringFilterID := "stringFilterID" prefixFilterID := "prefixFilterID" data := NewInternalDB(nil, nil) - dmMatch = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmMatch = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) context := utils.MetaRating x, err := NewFilterRule(utils.MetaString, "~Field", []string{"profile"}) if err != nil { diff --git a/engine/filterindexer_it_test.go b/engine/filterindexer_it_test.go index c87dcae1d..3cc0d8227 100644 --- a/engine/filterindexer_it_test.go +++ b/engine/filterindexer_it_test.go @@ -73,7 +73,7 @@ func TestFilterIndexerITRedis(t *testing.T) { t.Fatal("Could not connect to Redis", err.Error()) } cfgDBName = cfg.DataDbCfg().DataDbName - dataManager = NewDataManager(redisDB, config.CgrConfig().CacheCfg()) + dataManager = NewDataManager(redisDB, config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTests { t.Run("TestITRedis", stest) } @@ -93,14 +93,14 @@ func TestFilterIndexerITMongo(t *testing.T) { t.Fatal(err) } cfgDBName = mgoITCfg.StorDbCfg().Name - dataManager = NewDataManager(mongoDB, config.CgrConfig().CacheCfg()) + dataManager = NewDataManager(mongoDB, config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTests { t.Run("TestITMongo", stest) } } func TestFilterIndexerITInternal(t *testing.T) { - dataManager = NewDataManager(NewInternalDB(nil, nil), config.CgrConfig().CacheCfg()) + dataManager = NewDataManager(NewInternalDB(nil, nil), config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTests { t.Run("TestITInternal", stest) } diff --git a/engine/filters_test.go b/engine/filters_test.go index 4522f5216..08dcb25a0 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -605,7 +605,7 @@ func TestFilterNewRequestFilter(t *testing.T) { func TestInlineFilterPassFiltersForEvent(t *testing.T) { data := NewInternalDB(nil, nil) - dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg()) + dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := FilterS{ cfg: cfg, @@ -793,7 +793,7 @@ func TestInlineFilterPassFiltersForEvent(t *testing.T) { func TestPassFiltersForEventWithEmptyFilter(t *testing.T) { data := NewInternalDB(nil, nil) - dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg()) + dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := FilterS{ cfg: cfg, @@ -883,7 +883,7 @@ func TestPassFiltersForEventWithEmptyFilter(t *testing.T) { func TestPassFilterMaxCost(t *testing.T) { data := NewInternalDB(nil, nil) - dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg()) + dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := FilterS{ cfg: cfg, @@ -929,7 +929,7 @@ func TestPassFilterMaxCost(t *testing.T) { func TestPassFilterMissingField(t *testing.T) { data := NewInternalDB(nil, nil) - dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg()) + dmFilterPass := NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) cfg, _ := config.NewDefaultCGRConfig() filterS := FilterS{ cfg: cfg, diff --git a/engine/libtest.go b/engine/libtest.go index cf537597f..30b24ae57 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -292,7 +292,34 @@ func InitDataDb(cfg *config.CGRConfig) error { if err != nil { return err } - dm := NewDataManager(d, cfg.CacheCfg()) + var rmtDBConns, rplDBConns []DataDB + if len(cfg.DataDbCfg().RmtDataDBCfgs) != 0 { + rmtDBConns = make([]DataDB, len(cfg.DataDbCfg().RmtDataDBCfgs)) + for i, dbCfg := range cfg.DataDbCfg().RmtDataDBCfgs { + rmtDBConns[i], err = NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, cfg.GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + return err + } + } + } + if len(cfg.DataDbCfg().RplDataDBCfgs) != 0 { + rplDBConns = make([]DataDB, len(cfg.DataDbCfg().RplDataDBCfgs)) + for i, dbCfg := range cfg.DataDbCfg().RplDataDBCfgs { + rplDBConns[i], err = NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, cfg.GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + return err + } + } + } + dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplDBConns) if err := dm.DataDB().Flush(""); err != nil { return err @@ -377,7 +404,10 @@ func StopStartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) { func LoadTariffPlanFromFolder(tpPath, timezone string, dm *DataManager, disable_reverse bool, cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) error { - loader := NewTpReader(dm.dataDB, NewFileCSVStorage(utils.CSV_SEP, tpPath, false), "", timezone, cacheS, schedulerS) + loader, err := NewTpReader(dm.dataDB, NewFileCSVStorage(utils.CSV_SEP, tpPath, false), "", timezone, cacheS, schedulerS) + if err != nil { + return utils.NewErrServerError(err) + } if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index d3d382c85..6f5854776 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -35,11 +35,14 @@ var ( var csvr *TpReader func init() { - csvr = NewTpReader(dm.dataDB, NewStringCSVStorage(',', DestinationsCSVContent, TimingsCSVContent, RatesCSVContent, DestinationRatesCSVContent, + var err error + csvr, err = NewTpReader(dm.dataDB, NewStringCSVStorage(',', DestinationsCSVContent, TimingsCSVContent, RatesCSVContent, DestinationRatesCSVContent, RatingPlansCSVContent, RatingProfilesCSVContent, SharedGroupsCSVContent, ActionsCSVContent, ActionPlansCSVContent, ActionTriggersCSVContent, AccountActionsCSVContent, ResourcesCSVContent, StatsCSVContent, ThresholdsCSVContent, FiltersCSVContent, SuppliersCSVContent, AttributesCSVContent, ChargersCSVContent, DispatcherCSVContent, DispatcherHostCSVContent), testTPID, "", nil, nil) - + if err != nil { + log.Print("error when creating TpReader:", err) + } if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index 27cf0ac2b..251d77d8b 100644 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -50,7 +50,7 @@ func TestLoaderITConnDataDbs(t *testing.T) { if err != nil { t.Fatal("Error on dataDb connection: ", err.Error()) } - dataDbCsv = NewDataManager(dbConn, nil) + dataDbCsv = NewDataManager(dbConn, nil, nil, nil) dbConn, err = NewDataDBConn(lCfg.DataDbCfg().DataDbType, lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "8", lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass, @@ -58,7 +58,7 @@ func TestLoaderITConnDataDbs(t *testing.T) { if err != nil { t.Fatal("Error on dataDb connection: ", err.Error()) } - dataDbStor = NewDataManager(dbConn, nil) + dataDbStor = NewDataManager(dbConn, nil, nil, nil) dbConn, err = NewDataDBConn(lCfg.DataDbCfg().DataDbType, lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "9", lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass, @@ -66,7 +66,7 @@ func TestLoaderITConnDataDbs(t *testing.T) { if err != nil { t.Fatal("Error on dataDb connection: ", err.Error()) } - dataDbApier = NewDataManager(dbConn, nil) + dataDbApier = NewDataManager(dbConn, nil, nil, nil) for _, db := range []Storage{dataDbCsv.DataDB(), dataDbStor.DataDB(), dataDbApier.DataDB(), dataDbCsv.DataDB(), dataDbStor.DataDB(), dataDbApier.DataDB()} { if err = db.Flush(""); err != nil { @@ -95,14 +95,16 @@ func TestLoaderITCreateStorTpTables(t *testing.T) { // Loads data from csv files in tp scenario to dataDbCsv func TestLoaderITRemoveLoad(t *testing.T) { - /*var err error - for fn, v := range FileValidators { + var err error + /*for fn, v := range FileValidators { if err = ValidateCSVData(path.Join(*dataDir, "tariffplans", *tpCsvScenario, fn), v.Rule); err != nil { t.Error("Failed validating data: ", err.Error()) } }*/ - loader = NewTpReader(dataDbCsv.DataDB(), NewFileCSVStorage(utils.CSV_SEP, path.Join(*dataDir, "tariffplans", *tpCsvScenario), false), "", "", nil, nil) - + loader, err = NewTpReader(dataDbCsv.DataDB(), NewFileCSVStorage(utils.CSV_SEP, path.Join(*dataDir, "tariffplans", *tpCsvScenario), false), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err = loader.LoadDestinations(); err != nil { t.Error("Failed loading destinations: ", err.Error()) } @@ -170,14 +172,16 @@ func TestLoaderITRemoveLoad(t *testing.T) { // Loads data from csv files in tp scenario to dataDbCsv func TestLoaderITLoadFromCSV(t *testing.T) { - /*var err error - for fn, v := range FileValidators { + var err error + /*for fn, v := range FileValidators { if err = ValidateCSVData(path.Join(*dataDir, "tariffplans", *tpCsvScenario, fn), v.Rule); err != nil { t.Error("Failed validating data: ", err.Error()) } }*/ - loader = NewTpReader(dataDbCsv.DataDB(), NewFileCSVStorage(utils.CSV_SEP, path.Join(*dataDir, "tariffplans", *tpCsvScenario), false), "", "", nil, nil) - + loader, err = NewTpReader(dataDbCsv.DataDB(), NewFileCSVStorage(utils.CSV_SEP, path.Join(*dataDir, "tariffplans", *tpCsvScenario), false), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err = loader.LoadDestinations(); err != nil { t.Error("Failed loading destinations: ", err.Error()) } @@ -478,7 +482,7 @@ func TestLoaderITImportToStorDb(t *testing.T) { // Loads data from storDb into dataDb func TestLoaderITLoadFromStorDb(t *testing.T) { - loader := NewTpReader(dataDbStor.DataDB(), storDb, utils.TEST_SQL, "", nil, nil) + loader, _ := NewTpReader(dataDbStor.DataDB(), storDb, utils.TEST_SQL, "", nil, nil) if err := loader.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps { t.Error("Failed loading destinations: ", err.Error()) } @@ -512,7 +516,7 @@ func TestLoaderITLoadFromStorDb(t *testing.T) { } func TestLoaderITLoadIndividualProfiles(t *testing.T) { - loader := NewTpReader(dataDbApier.DataDB(), storDb, utils.TEST_SQL, "", nil, nil) + loader, _ := NewTpReader(dataDbApier.DataDB(), storDb, utils.TEST_SQL, "", nil, nil) // Load ratingPlans. This will also set destination keys if rps, err := storDb.GetTPRatingPlans(utils.TEST_SQL, "", nil); err != nil { t.Fatal("Could not retrieve rating plans") diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 125273128..d3edb2d23 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -92,7 +92,7 @@ func TestOnStorITRedis(t *testing.T) { t.Fatal("Could not connect to Redis", err.Error()) } onStorCfg = cfg.DataDbCfg().DataDbName - onStor = NewDataManager(rdsITdb, config.CgrConfig().CacheCfg()) + onStor = NewDataManager(rdsITdb, config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTestsOnStorIT { t.Run("TestOnStorITRedis", stest) } @@ -112,7 +112,7 @@ func TestOnStorITMongo(t *testing.T) { t.Fatal(err) } onStorCfg = mgoITCfg.StorDbCfg().Name - onStor = NewDataManager(mgoITdb, config.CgrConfig().CacheCfg()) + onStor = NewDataManager(mgoITdb, config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTestsOnStorIT { t.Run("TestOnStorITMongo", stest) } @@ -120,7 +120,7 @@ func TestOnStorITMongo(t *testing.T) { func TestOnStorITInternal(t *testing.T) { sleepDelay = 10 * time.Millisecond - onStor = NewDataManager(NewInternalDB(nil, nil), config.CgrConfig().CacheCfg()) + onStor = NewDataManager(NewInternalDB(nil, nil), config.CgrConfig().CacheCfg(), nil, nil) for _, stest := range sTestsOnStorIT { t.Run("TestOnStorITInternal", stest) } diff --git a/engine/resources_test.go b/engine/resources_test.go index d62f091fa..6b699b304 100644 --- a/engine/resources_test.go +++ b/engine/resources_test.go @@ -364,7 +364,7 @@ func TestRSCacheSetGet(t *testing.T) { func TestResourcePopulateResourceService(t *testing.T) { data := NewInternalDB(nil, nil) - dmRES = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmRES = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) @@ -726,7 +726,7 @@ func TestResourceCaching(t *testing.T) { Cache.Clear(nil) // start fresh with new dataManager data := NewInternalDB(nil, nil) - dmRES = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmRES = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/stats_test.go b/engine/stats_test.go index 948fa7826..4c87a2a64 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -143,7 +143,7 @@ var ( func TestStatQueuesPopulateService(t *testing.T) { data := NewInternalDB(nil, nil) - dmSTS = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmSTS = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go index 2e943af72..43ca9a10a 100644 --- a/engine/suppliers_test.go +++ b/engine/suppliers_test.go @@ -294,7 +294,7 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { t.Error(err) } data := NewInternalDB(nil, nil) - dmSPP = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmSPP = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index 0b54d6ee1..dafe00ff3 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -143,7 +143,7 @@ func TestThresholdsSort(t *testing.T) { func TestThresholdsPopulateThresholdService(t *testing.T) { data := NewInternalDB(nil, nil) - dmTH = NewDataManager(data, config.CgrConfig().CacheCfg()) + dmTH = NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) defaultCfg, err := config.NewDefaultCGRConfig() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/tpreader.go b/engine/tpreader.go index 03db2da89..d0715d8c3 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -67,11 +67,39 @@ type TpReader struct { } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, - cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) *TpReader { + cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) (*TpReader, error) { + var err error + var rmtDBConns, rplDBConns []DataDB + if len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs) != 0 { + rmtDBConns = make([]DataDB, len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs)) + for i, dbCfg := range config.CgrConfig().DataDbCfg().RmtDataDBCfgs { + rmtDBConns[i], err = NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, config.CgrConfig().GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + return nil, err + } + } + } + if len(config.CgrConfig().DataDbCfg().RplDataDBCfgs) != 0 { + rplDBConns = make([]DataDB, len(config.CgrConfig().DataDbCfg().RplDataDBCfgs)) + for i, dbCfg := range config.CgrConfig().DataDbCfg().RplDataDBCfgs { + rplDBConns[i], err = NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, config.CgrConfig().GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + return nil, err + } + } + } tpr := &TpReader{ tpid: tpid, timezone: timezone, - dm: NewDataManager(db, config.CgrConfig().CacheCfg()), // ToDo: add ChacheCfg as parameter to the NewTpReader + dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplDBConns), // ToDo: add ChacheCfg as parameter to the NewTpReader lr: lr, cacheS: cacheS, schedulerS: schedulerS, @@ -114,7 +142,7 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, StartTime: utils.MetaHourly, EndTime: "", } - return tpr + return tpr, nil } func (tpr *TpReader) Init() { diff --git a/engine/versions_it_test.go b/engine/versions_it_test.go index 76eb265e7..c2eafb2b4 100644 --- a/engine/versions_it_test.go +++ b/engine/versions_it_test.go @@ -53,7 +53,7 @@ func TestVersionsITMongo(t *testing.T) { if err != nil { log.Fatal(err) } - dm3 = NewDataManager(dbConn, cfg.CacheCfg()) + dm3 = NewDataManager(dbConn, cfg.CacheCfg(), nil, nil) storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, cfg.StorDbCfg().Name, cfg.StorDbCfg().User, @@ -83,7 +83,7 @@ func TestVersionsITRedisMYSQL(t *testing.T) { if err != nil { log.Fatal(err) } - dm3 = NewDataManager(dbConn, cfg.CacheCfg()) + dm3 = NewDataManager(dbConn, cfg.CacheCfg(), nil, nil) storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, @@ -114,7 +114,7 @@ func TestVersionsITRedisPostgres(t *testing.T) { if err != nil { log.Fatal(err) } - dm3 = NewDataManager(dbConn, cfg.CacheCfg()) + dm3 = NewDataManager(dbConn, cfg.CacheCfg(), nil, nil) storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, cfg.StorDbCfg().Name, cfg.StorDbCfg().User, diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go index 0680cd750..73bff05e0 100644 --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -30,7 +30,7 @@ var dbAcntActs *engine.DataManager func TestAcntActsSetStorage(t *testing.T) { data, _ := engine.NewMapStorageJson() - dbAcntActs = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dbAcntActs = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dbAcntActs) } @@ -55,10 +55,13 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,false,false,10` suppliers := `` attrProfiles := `` chargerProfiles := `` - csvr := engine.NewTpReader(dbAcntActs.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, + csvr, err := engine.NewTpReader(dbAcntActs.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, resLimits, stats, thresholds, filters, suppliers, attrProfiles, chargerProfiles, ``, ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadAll(); err != nil { t.Fatal(err) } diff --git a/general_tests/auth_test.go b/general_tests/auth_test.go index 6587ae92e..e84c58e88 100644 --- a/general_tests/auth_test.go +++ b/general_tests/auth_test.go @@ -32,7 +32,7 @@ var rsponder *engine.Responder func TestAuthSetStorage(t *testing.T) { config.CgrConfig().CacheCfg()[utils.CacheRatingPlans].Precache = true // precache rating plan data, _ := engine.NewMapStorageJson() - dbAuth = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dbAuth = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dbAuth) rsponder = &engine.Responder{ MaxComputedUsage: config.CgrConfig().RalsCfg().MaxComputedUsage} @@ -62,9 +62,12 @@ cgrates.org,call,*any,2013-01-06T00:00:00Z,RP_ANY,` suppliers := `` attrProfiles := `` chargerProfiles := `` - csvr := engine.NewTpReader(dbAuth.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, + csvr, err := engine.NewTpReader(dbAuth.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, resLimits, stats, thresholds, filters, suppliers, attrProfiles, chargerProfiles, ``, ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadAll(); err != nil { t.Fatal(err) } diff --git a/general_tests/costs1_test.go b/general_tests/costs1_test.go index 9bbf3ed80..bca3df37e 100644 --- a/general_tests/costs1_test.go +++ b/general_tests/costs1_test.go @@ -29,7 +29,7 @@ func TestCosts1SetStorage(t *testing.T) { config.CgrConfig().CacheCfg()[utils.CacheRatingPlans].Precache = true // precache rating plan data, _ := engine.NewMapStorageJson() - dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dataDB) } @@ -53,9 +53,11 @@ RP_SMS1,DR_SMS_1,ALWAYS,10` ratingProfiles := `cgrates.org,call,*any,2012-01-01T00:00:00Z,RP_RETAIL, cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1, cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,` - csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles, + csvr, err := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles, "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "", nil, nil) - + if err != nil { + t.Error(err) + } if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/general_tests/datachrg1_test.go b/general_tests/datachrg1_test.go index 650d88585..856e3979a 100644 --- a/general_tests/datachrg1_test.go +++ b/general_tests/datachrg1_test.go @@ -28,7 +28,7 @@ import ( func TestSetStorageDtChrg1(t *testing.T) { data, _ := engine.NewMapStorageJson() - dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dataDB) } @@ -42,8 +42,11 @@ DR_DATA_2,*any,RT_DATA_1c,*up,4,0,` ratingPlans := `RP_DATA1,DR_DATA_1,TM1,10 RP_DATA1,DR_DATA_2,TM2,10` ratingProfiles := `cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,` - csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, + csvr, err := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index c4fd33cc8..f46181767 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -31,7 +31,7 @@ var dataDB *engine.DataManager func TestDZ1SetStorage(t *testing.T) { data, _ := engine.NewMapStorageJson() - dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dataDB) } @@ -62,12 +62,15 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` suppliers := `` attrProfiles := `` chargerProfiles := `` - csvr := engine.NewTpReader(dataDB.DataDB(), + csvr, err := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, resLimits, stats, thresholds, filters, suppliers, attrProfiles, chargerProfiles, ``, ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 4aaa68cdd..3045dfa0c 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -31,7 +31,7 @@ var dataDB2 *engine.DataManager func TestSetStorage2(t *testing.T) { data, _ := engine.NewMapStorageJson() - dataDB2 = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dataDB2 = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dataDB2) } @@ -62,10 +62,13 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` suppliers := `` attrProfiles := `` chargerProfiles := `` - csvr := engine.NewTpReader(dataDB2.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, + csvr, err := engine.NewTpReader(dataDB2.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, resLimits, stats, thresholds, filters, suppliers, attrProfiles, chargerProfiles, ``, ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 742ae1b57..406071f9c 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -31,7 +31,7 @@ var dataDB3 *engine.DataManager func TestSetStorage3(t *testing.T) { data, _ := engine.NewMapStorageJson() - dataDB3 = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dataDB3 = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dataDB3) } @@ -60,10 +60,13 @@ cgrates.org,call,discounted_minutes,2013-01-06T00:00:00Z,RP_UK_Mobile_BIG5_PKG,` suppliers := `` attrProfiles := `` chargerProfiles := `` - csvr := engine.NewTpReader(dataDB3.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, + csvr, err := engine.NewTpReader(dataDB3.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, resLimits, stats, thresholds, filters, suppliers, attrProfiles, chargerProfiles, ``, ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/smschrg1_test.go b/general_tests/smschrg1_test.go index 6439cfed8..412e24923 100644 --- a/general_tests/smschrg1_test.go +++ b/general_tests/smschrg1_test.go @@ -29,7 +29,7 @@ import ( func TestSMSSetStorageSmsChrg1(t *testing.T) { config.CgrConfig().CacheCfg()[utils.CacheRatingPlans].Precache = true // precache rating plan data, _ := engine.NewMapStorageJson() - dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg()) + dataDB = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil) engine.SetDataStorage(dataDB) } @@ -39,8 +39,11 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) { destinationRates := `DR_SMS_1,*any,RT_SMS_5c,*up,4,0,` ratingPlans := `RP_SMS1,DR_SMS_1,ALWAYS,10` ratingProfiles := `cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,` - csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, + csvr, err := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "", nil, nil) + if err != nil { + t.Error(err) + } if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/loaders/loader_test.go b/loaders/loader_test.go index ff1ad7356..818cc1a09 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -37,7 +37,7 @@ func TestLoaderProcessContentSingleFile(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -143,7 +143,7 @@ func TestLoaderProcessContentMultiFiles(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContentMultiFiles", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -221,7 +221,7 @@ func TestLoaderProcessResource(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessResources", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -339,7 +339,7 @@ func TestLoaderProcessFilters(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessFilters", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -450,7 +450,7 @@ func TestLoaderProcessThresholds(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -548,7 +548,7 @@ func TestLoaderProcessStats(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -670,7 +670,7 @@ func TestLoaderProcessSuppliers(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -797,7 +797,7 @@ func TestLoaderProcessChargers(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -875,7 +875,7 @@ func TestLoaderProcessDispatches(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -1027,7 +1027,7 @@ func TestLoaderProcessDispatcheHosts(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ @@ -1115,7 +1115,7 @@ func TestLoaderRemoveContentSingleFile(t *testing.T) { ldr := &Loader{ ldrID: "TestLoaderProcessContent", bufLoaderData: make(map[string][]LoaderData), - dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg()), + dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil, nil), timezone: "UTC", } ldr.dataTpls = map[string][]*config.FCTemplate{ diff --git a/migrator/migrator_utils.go b/migrator/migrator_utils.go index cbdbff2b8..da7b1c854 100644 --- a/migrator/migrator_utils.go +++ b/migrator/migrator_utils.go @@ -35,7 +35,7 @@ func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string, if err != nil { return nil, err } - dm := engine.NewDataManager(dbCon, cacheCfg) + dm := engine.NewDataManager(dbCon, cacheCfg, nil, nil) var d MigratorDataDB switch db_type { case utils.REDIS: diff --git a/services/datadb.go b/services/datadb.go index fc7af8aca..d71faaeae 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "log" "sync" "github.com/cgrates/cgrates/config" @@ -33,7 +34,7 @@ func NewDataDBService(cfg *config.CGRConfig) *DataDBService { return &DataDBService{ cfg: cfg, dbchan: make(chan *engine.DataManager, 1), - db: engine.NewDataManager(nil, cfg.CacheCfg()), // to be removed + db: engine.NewDataManager(nil, cfg.CacheCfg(), nil, nil), // to be removed } } @@ -67,7 +68,34 @@ func (db *DataDBService) Start() (err error) { utils.Logger.Warning(fmt.Sprintf("Could not configure dataDb: %s.Some SessionS APIs will not work", err)) return } - db.db = engine.NewDataManager(d, db.cfg.CacheCfg()) + var rmtDBConns, rplDBConns []engine.DataDB + if len(db.cfg.DataDbCfg().RmtDataDBCfgs) != 0 { + rmtDBConns = make([]engine.DataDB, len(db.cfg.DataDbCfg().RmtDataDBCfgs)) + for i, dbCfg := range db.cfg.DataDbCfg().RmtDataDBCfgs { + rmtDBConns[i], err = engine.NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, db.cfg.GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + log.Fatalf("Coud not open dataDB connection: %s", err.Error()) + } + } + } + if len(db.cfg.DataDbCfg().RplDataDBCfgs) != 0 { + rplDBConns = make([]engine.DataDB, len(db.cfg.DataDbCfg().RplDataDBCfgs)) + for i, dbCfg := range db.cfg.DataDbCfg().RplDataDBCfgs { + rplDBConns[i], err = engine.NewDataDBConn(dbCfg.DataDbType, + dbCfg.DataDbHost, dbCfg.DataDbPort, + dbCfg.DataDbName, dbCfg.DataDbUser, + dbCfg.DataDbPass, db.cfg.GeneralCfg().DBDataEncoding, + dbCfg.DataDbSentinelName) + if err != nil { + log.Fatalf("Coud not open dataDB connection: %s", err.Error()) + } + } + } + db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), rmtDBConns, rplDBConns) engine.SetDataStorage(db.db) if err = engine.CheckVersions(db.db.DataDB()); err != nil { fmt.Println(err) @@ -148,6 +176,7 @@ func (db *DataDBService) GetDM() *engine.DataManager { // needsConnectionReload returns if the DB connection needs to reloaded func (db *DataDBService) needsConnectionReload() bool { + if db.oldDBCfg.DataDbType != db.cfg.DataDbCfg().DataDbType || db.oldDBCfg.DataDbHost != db.cfg.DataDbCfg().DataDbHost || db.oldDBCfg.DataDbName != db.cfg.DataDbCfg().DataDbName || diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 2e2e7d211..c9c723cf1 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1311,7 +1311,7 @@ func TestSessionSNewV1AuthorizeArgsWithArgDispatcher2(t *testing.T) { func TestSessionSGetIndexedFilters(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() mpStr, _ := engine.NewMapStorage() - sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg()), "UTC") + sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil, nil), "UTC") expIndx := map[string][]string{} expUindx := []*engine.FilterRule{ &engine.FilterRule{ @@ -1329,7 +1329,7 @@ func TestSessionSGetIndexedFilters(t *testing.T) { sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{ "ToR": true, } - sS = NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg()), "UTC") + sS = NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil, nil), "UTC") expIndx = map[string][]string{(utils.DynamicDataPrefix + utils.ToR): []string{utils.VOICE}} expUindx = nil if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) {