Implement a model for get/set CDRs for InternalDB

This commit is contained in:
TeoV
2019-10-03 19:33:03 +03:00
committed by Dan Christian Bogos
parent f9d8ac49f8
commit cf7a397c4c
5 changed files with 207 additions and 5 deletions

View File

@@ -75,7 +75,7 @@ const CGRATES_CFG_JSON = `
"max_open_conns": 100, // maximum database connections opened, not applying for mongo
"max_idle_conns": 10, // maximum database connections idle, not applying for mongo
"conn_max_lifetime": 0, // maximum amount of time in seconds a connection may be reused (0 for unlimited), not applying for mongo
"cdrs_indexes": [], // indexes on cdrs table to speed up queries, used only in case of mongo
"cdrs_indexes": [], // indexes on cdrs table to speed up queries, used only in case of *mongo and *internal
"query_timeout":"10s",
},

View File

@@ -0,0 +1,8 @@
{
// CGRateS Configuration file used for testing mysql implementation
"stor_db": { // database used to store offline tariff plans and CDRs
"db_type": "internal", // stor database type to use: <mysql|postgres>
},
}

View File

@@ -79,6 +79,22 @@ func TestITCDRsMongo(t *testing.T) {
}
}
func TestITCDRsInternal(t *testing.T) {
cfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "storage", "internal"))
if err != nil {
t.Error(err)
}
if err := testGetCDRs(cfg); err != nil {
t.Error(err)
}
if err := testSetCDR(cfg); err != nil {
t.Error(err)
}
if err := testSMCosts(cfg); err != nil {
t.Error(err)
}
}
// helper function to populate CDRs and check if they were stored in storDb
func testSetCDR(cfg *config.CGRConfig) error {
if err := InitStorDb(cfg); err != nil {

View File

@@ -837,16 +837,196 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err
//implement CdrStorage interface
func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
return utils.ErrNotImplemented
if !allowUpdate {
x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID))
if ok && x != nil {
return utils.ErrExists
}
}
//idxs := utils.StringMap
// internal indexes
groups := []string{
utils.ConcatenatedKey(utils.CGRID, cdr.CGRID),
utils.ConcatenatedKey(utils.RunID, cdr.RunID),
utils.ConcatenatedKey(utils.OriginHost, cdr.OriginHost),
utils.ConcatenatedKey(utils.Source, cdr.Source),
utils.ConcatenatedKey(utils.OriginID, cdr.OriginID),
utils.ConcatenatedKey(utils.ToR, cdr.ToR),
utils.ConcatenatedKey(utils.RequestType, cdr.RequestType),
utils.ConcatenatedKey(utils.Tenant, cdr.Tenant),
utils.ConcatenatedKey(utils.Category, cdr.Category),
utils.ConcatenatedKey(utils.Account, cdr.Account),
utils.ConcatenatedKey(utils.Subject, cdr.Subject),
utils.ConcatenatedKey(utils.Destination, cdr.Destination), // include the whole Destination
}
// split destination and add it to the groups
dstGroup := make([]string, len(cdr.Destination))
for i := len(cdr.Destination) - 1; i > 0; i-- {
dstGroup[i] = utils.ConcatenatedKey(utils.Destination, cdr.Destination[:i])
}
groups = append(groups, dstGroup...)
iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, groups,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (iDB *InternalDB) RemoveSMCost(smc *SMCost) (err error) {
return utils.ErrNotImplemented
}
func (iDB *InternalDB) RemoveSMCosts(qryFltr *utils.SMCostFilter) error {
return utils.ErrNotImplemented
}
func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*CDR, count int64, err error) {
return nil, 0, utils.ErrNotImplemented
var cdrMpIDs utils.StringMap
// Apply string filter
for _, fltrSlc := range []struct {
key string
ids []string
}{
{utils.CGRID, filter.CGRIDs},
{utils.RunID, filter.RunIDs},
{utils.OriginID, filter.OriginIDs},
{utils.OriginHost, filter.OriginHosts},
{utils.Source, filter.Sources},
{utils.ToR, filter.ToRs},
{utils.RequestType, filter.RequestTypes},
{utils.Tenant, filter.Tenants},
{utils.Category, filter.Categories},
{utils.Account, filter.Accounts},
{utils.Subject, filter.Subjects},
{utils.Destination, filter.DestinationPrefixes},
} {
if len(fltrSlc.ids) == 0 {
continue
}
grpMpIDs := make(utils.StringMap)
for _, id := range fltrSlc.ids {
grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(fltrSlc.key, id))
for _, id := range grpIDs {
grpMpIDs[id] = true
}
}
if len(grpMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
if cdrMpIDs == nil {
cdrMpIDs = grpMpIDs
} else {
for id := range cdrMpIDs {
if !grpMpIDs.HasKey(id) {
delete(cdrMpIDs, id)
if len(cdrMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
}
}
}
}
if cdrMpIDs == nil {
cdrMpIDs = utils.StringMapFromSlice(iDB.db.GetItemIDs(utils.CDRsTBL, utils.EmptyString))
}
// check for Not filters
for _, fltrSlc := range []struct {
key string
ids []string
}{
{utils.CGRID, filter.NotCGRIDs},
{utils.RunID, filter.NotRunIDs},
{utils.OriginID, filter.NotOriginIDs},
{utils.OriginHost, filter.NotOriginHosts},
{utils.Source, filter.NotSources},
{utils.ToR, filter.NotToRs},
{utils.RequestType, filter.NotRequestTypes},
{utils.Tenant, filter.NotTenants},
{utils.Category, filter.NotCategories},
{utils.Account, filter.NotAccounts},
{utils.Subject, filter.NotSubjects},
{utils.Destination, filter.NotDestinationPrefixes},
} {
if len(fltrSlc.ids) == 0 {
continue
}
for _, id := range fltrSlc.ids {
grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(fltrSlc.key, id))
for _, id := range grpIDs {
if cdrMpIDs.HasKey(id) {
delete(cdrMpIDs, id)
if len(cdrMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
}
}
}
}
//cdr IDs filtered by string and not string
// //
// var cdrIDs []string
// // convert cdrMpIDs to []string so we can apply Paginator
// if len(cdrMpIDs) == 0 { // it means we have other type of filters or we don't have filters
// cdrIDs =
// } else {
// cdrIDs = cdrMpIDs.Slice()
// }
// if len(cdrIDs) == 0 {
// return nil, 0, utils.ErrNotFound
// }
// //apply paginator
// cdrIDs = filter.Paginator.PaginateStringSlice(cdrIDs)
// if filter.Count {
// return nil, int64(len(cdrIDs)), nil
// }
// if remove {
// for _, cdrID := range cdrIDs {
// iDB.db.Remove(utils.CDRsTBL, cdrID,
// cacheCommit(utils.NonTransactional), utils.NonTransactional)
// }
// return nil, 0, nil
// }
// for _, cdrID := range cdrIDs {
// for _, fltrSlc := range fltrSlcs {
// if len(fltrSlc.ids) == 0 {
// continue
// }
// grpMpIDs := make(utils.StringMap)
// for _, id := range fltrSlc.ids {
// grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(fltrSlc.key, id))
// for _, id := range grpIDs {
// grpMpIDs[id] = true
// }
// }
// if len(grpMpIDs) == 0 {
// return nil, 0, utils.ErrNotFound
// }
// if len(cdrMpIDs) == 0 {
// cdrMpIDs = grpMpIDs
// } else {
// for id := range cdrMpIDs {
// if !grpMpIDs.HasKey(id) {
// delete(cdrMpIDs, id)
// }
// }
// }
// if len(cdrMpIDs) == 0 {
// return nil, 0, utils.ErrNotFound
// }
// }
// cdrs = append(cdrs, x.(*CDR))
// }
return
}
func (iDB *InternalDB) GetSMCosts(cgrid, runid, originHost, originIDPrfx string) (smCosts []*SMCost, err error) {

View File

@@ -52,11 +52,9 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
dm = NewDataManager(d.(DataDB))
case utils.INTERNAL:
if marshaler == utils.JSON {
//d, err = NewMapStorageJson()
d = NewInternalDBJson()
} else {
d = NewInternalDB()
//d, err = NewMapStorage()
}
dm = NewDataManager(d.(DataDB))
default: