Updated cache reload

This commit is contained in:
Trial97
2020-08-17 10:36:40 +03:00
committed by Dan Christian Bogos
parent 309b9c9983
commit 23746ab9dc
6 changed files with 68 additions and 43 deletions

View File

@@ -1959,7 +1959,7 @@ func TestDfEventExporterCfg(t *testing.T) {
}
eCfg := &EEsJsonCfg{
Enabled: utils.BoolPointer(false),
Attributes_conns: &[]string{utils.MetaInternal},
Attributes_conns: &[]string{},
Cache: &map[string]*CacheParamJsonCfg{
utils.MetaFileCSV: {
Limit: utils.IntPointer(-1),

View File

@@ -1949,9 +1949,9 @@ func TestCgrCdfEventReader(t *testing.T) {
func TestCgrCdfEventExporter(t *testing.T) {
eCfg := &EEsCfg{
Enabled: false,
AttributeSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)},
AttributeSConns: []string{},
Cache: map[string]*CacheParamCfg{
utils.MetaFileCSV: &CacheParamCfg{
utils.MetaFileCSV: {
Limit: -1,
TTL: time.Duration(5 * time.Second),
StaticTTL: false,
@@ -1959,7 +1959,7 @@ func TestCgrCdfEventExporter(t *testing.T) {
},
Templates: map[string][]*FCTemplate{},
Exporters: []*EventExporterCfg{
&EventExporterCfg{
{
ID: utils.MetaDefault,
Type: utils.META_NONE,
FieldSep: ",",

View File

@@ -703,7 +703,7 @@ func testDspSessionProcessEvent2(t *testing.T) {
}
eAttrs := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"},
AlteredFields: []string{"*req.Password", "*req.EventName"},
AlteredFields: []string{"*req.EventName", "*req.Password"},
CGREventWithOpts: &utils.CGREventWithOpts{
Opts: map[string]interface{}{
utils.OptsAPIKey: "pse12345",
@@ -734,6 +734,7 @@ func testDspSessionProcessEvent2(t *testing.T) {
eAttrs.CGREvent.Event[utils.SetupTime] = args.CGREvent.Event[utils.SetupTime]
eAttrs.CGREvent.Event[utils.AnswerTime] = args.CGREvent.Event[utils.AnswerTime]
}
sort.Strings(rply.Attributes.AlteredFields)
if !reflect.DeepEqual(eAttrs, rply.Attributes) {
t.Errorf("expecting: %+v, received: %+v",
utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes))

View File

@@ -329,7 +329,7 @@ func (chS *CacheS) V1RemoveGroup(args *utils.ArgsGetGroupWithOpts,
func (chS *CacheS) V1ReloadCache(attrs utils.AttrReloadCacheWithOpts, reply *string) (err error) {
for key, ids := range attrs.ArgsCache {
if prfx, has := utils.ArgCacheToPrefix[key]; has && len(ids) != 0 {
if prfx, has := utils.ArgCacheToPrefix[key]; has {
if err = chS.dm.CacheDataFromDB(prfx, ids, true); err != nil {
return
}

View File

@@ -131,27 +131,23 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
return
}
if ids == nil {
keyIDs, err := dm.DataDB().GetKeysForPrefix(prfx)
if err != nil {
return utils.NewCGRError(utils.DataManager,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("DataManager error <%s> querying keys for prefix: <%s>", err.Error(), prfx))
}
for _, keyID := range keyIDs {
if mustBeCached { // Only consider loading ids which are already in cache
if _, hasIt := Cache.Get(utils.CachePrefixToInstance[prfx], keyID[len(prfx):]); !hasIt {
continue
}
if mustBeCached {
ids = Cache.GetItemIDs(utils.CachePrefixToInstance[prfx], utils.EmptyString)
} else {
if ids, err = dm.DataDB().GetKeysForPrefix(prfx); err != nil {
return utils.NewCGRError(utils.DataManager,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("DataManager error <%s> querying keys for prefix: <%s>", err.Error(), prfx))
}
if cCfg, has := dm.cacheCfg.Partitions[utils.CachePrefixToInstance[prfx]]; has &&
cCfg.Limit >= 0 &&
cCfg.Limit < len(ids) {
ids = ids[:cCfg.Limit]
}
for i := range ids {
ids[i] = strings.TrimPrefix(ids[i], prfx)
}
ids = append(ids, keyID[len(prfx):])
}
var nrItems int
if cCfg, has := dm.cacheCfg.Partitions[utils.CachePrefixToInstance[prfx]]; has {
nrItems = cCfg.Limit
}
if nrItems > 0 && nrItems < len(ids) { // More ids than cache config allows it, limit here
ids = ids[:nrItems]
}
}
for _, dataID := range ids {
@@ -281,18 +277,16 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
_, err = dm.GetItemLoadIDs(utils.EmptyString, true)
}
if err != nil {
if err == utils.ErrNotFound {
if errCh := Cache.Remove(utils.CachePrefixToInstance[prfx], dataID,
cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil {
return errCh
}
err = nil
} else {
if err != utils.ErrNotFound {
return utils.NewCGRError(utils.DataManager,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error <%s> querying DataManager for category: <%s>, dataID: <%s>", err.Error(), prfx, dataID))
}
if err = Cache.Remove(utils.CachePrefixToInstance[prfx], dataID,
cacheCommit(utils.NonTransactional), utils.NonTransactional); err != nil {
return
}
}
}
return

View File

@@ -455,10 +455,13 @@ func testGetCDRs(cfg *config.CGRConfig) error {
},
}
// Store all CDRs
for _, cdr := range cdrs {
for i, cdr := range cdrs {
if err := cdrStorage.SetCDR(cdr, false); err != nil {
return fmt.Errorf("testGetCDRs #4 CDR: %+v, err: %v", cdr, err)
}
if *dbType == utils.MetaMySQL {
cdr.OrderID = int64(i + 1)
}
}
// All CDRs, no filter
if CDRs, _, err := cdrStorage.GetCDRs(new(utils.CDRsFilter), false); err != nil {
@@ -823,28 +826,55 @@ func testGetCDRs(cfg *config.CGRConfig) error {
//Filter by OrderID with paginator
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{OrderBy: "OrderID", Paginator: utils.Paginator{Limit: utils.IntPointer(3)}}, false); err != nil {
return fmt.Errorf("testGetCDRs #101, err: %v", err)
} else if !reflect.DeepEqual(cdrs[:3], CDRs) {
return fmt.Errorf("testGetCDRs #102 Expected %+v received %+v \n", cdrs, CDRs)
} else {
for i, cdr := range CDRs {
cdr.SetupTime = cdr.SetupTime.UTC()
cdr.AnswerTime = cdr.AnswerTime.UTC()
if *dbType == utils.MetaMongo {
cdrs[i].OrderID = cdr.OrderID
}
}
if !reflect.DeepEqual(cdrs[:3], CDRs) {
return fmt.Errorf("testGetCDRs #102 Expected %+v received %+v \n", utils.ToJSON(cdrs[:3]), utils.ToJSON(CDRs))
}
}
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{OrderBy: "OrderID", Paginator: utils.Paginator{Limit: utils.IntPointer(5)}}, false); err != nil {
return fmt.Errorf("testGetCDRs #103, err: %v", err)
} else if !reflect.DeepEqual(cdrs[:5], CDRs) {
return fmt.Errorf("testGetCDRs #104 Expected %+v received %+v \n", cdrs, CDRs)
} else {
for i, cdr := range CDRs {
cdr.SetupTime = cdr.SetupTime.UTC()
cdr.AnswerTime = cdr.AnswerTime.UTC()
if *dbType == utils.MetaMongo {
cdrs[i].OrderID = cdr.OrderID
}
}
if !reflect.DeepEqual(cdrs[:5], CDRs) {
return fmt.Errorf("testGetCDRs #104 Expected %+v received %+v \n", utils.ToJSON(cdrs[:5]), utils.ToJSON(CDRs))
}
}
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{OrderBy: "OrderID", Paginator: utils.Paginator{Limit: utils.IntPointer(3), Offset: utils.IntPointer(2)}}, false); err != nil {
return fmt.Errorf("testGetCDRs #103, err: %v", err)
} else if !reflect.DeepEqual(cdrs[2:5], CDRs) {
return fmt.Errorf("testGetCDRs #104 Expected %+v received %+v \n", utils.ToJSON(cdrs[2:5]), utils.ToJSON(CDRs))
return fmt.Errorf("testGetCDRs #105, err: %v", err)
} else {
for i, cdr := range CDRs {
cdr.SetupTime = cdr.SetupTime.UTC()
cdr.AnswerTime = cdr.AnswerTime.UTC()
if *dbType == utils.MetaMongo {
cdrs[i+2].OrderID = cdr.OrderID
}
}
if !reflect.DeepEqual(cdrs[2:5], CDRs) {
return fmt.Errorf("testGetCDRs #106 Expected %+v received %+v \n", utils.ToJSON(cdrs[2:5]), utils.ToJSON(CDRs))
}
}
if _, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(3), Offset: utils.IntPointer(20)}}, false); err != utils.ErrNotFound {
return fmt.Errorf("testGetCDRs #105, err: %v", err)
return fmt.Errorf("testGetCDRs #107, err: %v", err)
}
if _, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{OrderBy: "OrderID", Paginator: utils.Paginator{Limit: utils.IntPointer(3), Offset: utils.IntPointer(20)}}, false); err != utils.ErrNotFound {
return fmt.Errorf("testGetCDRs #105, err: %v", err)
return fmt.Errorf("testGetCDRs #108, err: %v", err)
}
return nil
}