Move cdrs to its own package

Moved engine/cdr.go to utils/cdr.go.
Moved engine/cdrs.go and engine/cdrs_test.go to cdrs package.
Moved api functions from cdrs/cdrs.go to cdrs/apis.go. Same for the
tests.
Deleted CDRsV1 type and its methods.
This commit is contained in:
ionutboangiu
2023-12-11 07:17:40 -05:00
committed by Dan Christian Bogos
parent 1cec3751e4
commit d29f84169c
19 changed files with 1290 additions and 1408 deletions

View File

@@ -28,7 +28,7 @@ import (
)
// GetCDRs retrieves a list of CDRs matching the specified filters.
func (admS AdminSv1) GetCDRs(ctx *context.Context, args *engine.CDRFilters, reply *[]*engine.CDR) error {
func (admS AdminSv1) GetCDRs(ctx *context.Context, args *utils.CDRFilters, reply *[]*utils.CDR) error {
if args.Tenant == utils.EmptyString {
args.Tenant = admS.cfg.GeneralCfg().DefaultTenant
}
@@ -45,7 +45,7 @@ func (admS AdminSv1) GetCDRs(ctx *context.Context, args *engine.CDRFilters, repl
}
// RemoveCDRs removes CDRs matching the specified filters.
func (admS AdminSv1) RemoveCDRs(ctx *context.Context, args *engine.CDRFilters, reply *string) (err error) {
func (admS AdminSv1) RemoveCDRs(ctx *context.Context, args *utils.CDRFilters, reply *string) (err error) {
if args.Tenant == utils.EmptyString {
args.Tenant = admS.cfg.GeneralCfg().DefaultTenant
}
@@ -59,32 +59,3 @@ func (admS AdminSv1) RemoveCDRs(ctx *context.Context, args *engine.CDRFilters, r
*reply = utils.OK
return
}
// NewCDRsV1 constructs the RPC Object for CDRsV1
func NewCDRsV1(cdrS *engine.CDRServer) *CDRsV1 {
return &CDRsV1{cdrS: cdrS}
}
// CDRsV1 Exports RPC from CDRs
type CDRsV1 struct {
ping
cdrS *engine.CDRServer
}
// ProcessEvent will process the CGREvent
func (cdrSv1 *CDRsV1) ProcessEvent(ctx *context.Context, args *utils.CGREvent,
reply *string) error {
return cdrSv1.cdrS.V1ProcessEvent(ctx, args, reply)
}
// ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrSv1 *CDRsV1) ProcessEventWithGet(ctx *context.Context, args *utils.CGREvent,
reply *[]*utils.EventsWithOpts) error {
return cdrSv1.cdrS.V1ProcessEventWithGet(ctx, args, reply)
}
// ProcessStoredEvents processes stored events based on provided filters.
func (cdrSv1 *CDRsV1) ProcessStoredEvents(ctx *context.Context, args *engine.CDRFilters,
reply *string) error {
return cdrSv1.cdrS.V1ProcessStoredEvents(ctx, args, reply)
}

View File

@@ -1,91 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apis
import (
"reflect"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestCDRsProcessEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
connMgr := engine.NewConnManager(cfg)
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
cdrS := engine.NewCDRServer(cfg, dm, engine.NewFilterS(cfg, connMgr, dm), connMgr, storDBChan)
cdr := NewCDRsV1(cdrS)
var reply string
args := &utils.CGREvent{
ID: "TestMatchingAccountsForEvent",
Tenant: "cgrates.org",
Event: map[string]any{
utils.AccountField: "1001",
},
}
if err := cdr.ProcessEvent(context.Background(), args, &reply); err != nil {
t.Error(err)
}
if reply != utils.OK {
t.Errorf("Expected %v\n but received %v", utils.OK, reply)
}
}
func TestCDRsProcessEventWithGet(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
connMgr := engine.NewConnManager(cfg)
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
cdrS := engine.NewCDRServer(cfg, dm, engine.NewFilterS(cfg, connMgr, dm), connMgr, storDBChan)
cdr := NewCDRsV1(cdrS)
var reply []*utils.EventsWithOpts
args := &utils.CGREvent{
ID: "TestMatchingAccountsForEvent",
Tenant: "cgrates.org",
Event: map[string]any{
utils.AccountField: "1001",
},
}
if err := cdr.ProcessEventWithGet(context.Background(), args, &reply); err != nil {
t.Error(err)
}
exp := []utils.EventsWithOpts{
{
Event: map[string]any{
utils.AccountField: "1001",
},
Opts: map[string]any{},
},
}
if !reflect.DeepEqual(exp[0].Event, reply[0].Event) {
t.Errorf("Expected %v \n but received %v", exp, reply)
}
}

144
cdrs/apis.go Normal file
View File

@@ -0,0 +1,144 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrs
import (
"errors"
"fmt"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
// V1ProcessEvent will process the CGREvent
func (cdrS *CDRServer) V1ProcessEvent(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) {
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEvent, args.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if _, err = cdrS.processEvents(ctx, []*utils.CGREvent{args}); err != nil {
return
}
*reply = utils.OK
return nil
}
// V1ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, args *utils.CGREvent, evs *[]*utils.EventsWithOpts) (err error) {
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEventWithGet, args.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*evs = *cachedResp.Result.(*[]*utils.EventsWithOpts)
}
return cachedResp.Error
}
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: evs, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
var procEvs []*utils.EventsWithOpts
if procEvs, err = cdrS.processEvents(ctx, []*utils.CGREvent{args}); err != nil {
return
}
*evs = procEvs
return nil
}
// V1ProcessStoredEvents processes stored events based on provided filters.
func (cdrS *CDRServer) V1ProcessStoredEvents(ctx *context.Context, args *utils.CDRFilters, reply *string) (err error) {
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessStoredEvents, args.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey)
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
fltrs, err := engine.GetFilters(ctx, args.FilterIDs, args.Tenant, cdrS.dm)
if err != nil {
return fmt.Errorf("preparing filters failed: %w", err)
}
cdrs, err := cdrS.db.GetCDRs(ctx, fltrs, args.APIOpts)
if err != nil {
return fmt.Errorf("retrieving CDRs failed: %w", err)
}
_, err = cdrS.processEvents(ctx, utils.CDRsToCGREvents(cdrs))
if err != nil && !errors.Is(err, utils.ErrPartiallyExecuted) {
return fmt.Errorf("processing events failed: %w", err)
}
*reply = utils.OK
return err
}

581
cdrs/apis_test.go Normal file
View File

@@ -0,0 +1,581 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrs
import (
"reflect"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestCDRsV1ProcessEventMock(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CdrsCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs)}
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, connMng, storDBChan)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args any, reply any) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply any) error {
*reply.(*map[string]map[string]any) = map[string]map[string]any{}
return utils.ErrNotFound
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
newCDRSrv.connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs), utils.ThresholdSv1, rpcInternal)
cgrEv := &utils.CGREvent{
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
var rply string
err := newCDRSrv.V1ProcessEvent(context.Background(), cgrEv, &rply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
cgrEv.ID = "testID"
delete(cgrEv.APIOpts, utils.MetaCDRID) // ignore autogenerated *cdr field when comparing
if !reflect.DeepEqual(expected, cgrEv) {
t.Errorf("\nExpected <%+v> \n,received <%+v>", utils.ToJSON(expected), utils.ToJSON(cgrEv))
}
}
func TestCDRsV1ProcessEventMockErr(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CdrsCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs)}
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, connMng, storDBChan)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args any, reply any) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply any) error {
*reply.(*map[string]map[string]any) = map[string]map[string]any{}
return utils.ErrNotFound
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
newCDRSrv.connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs), utils.ThresholdSv1, rpcInternal)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.MetaStats: true,
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
var rply string
err := newCDRSrv.V1ProcessEvent(context.Background(), cgrEv, &rply)
if err == nil || err.Error() != "PARTIALLY_EXECUTED" {
t.Errorf("\nExpected <%+v> \n, received <%+v>", "PARTIALLY_EXECUTED", err)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.MetaStats: true,
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
delete(cgrEv.APIOpts, utils.MetaCDRID) // ignore autogenerated *cdr field when comparing
if !reflect.DeepEqual(expected, cgrEv) {
t.Errorf("\nExpected <%+v> \n,received <%+v>", expected, cgrEv)
}
}
func TestCDRsV1ProcessEventMockCache(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CdrsCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs)}
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, connMng, storDBChan)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args any, reply any) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply any) error {
*reply.(*map[string]map[string]any) = map[string]map[string]any{}
return utils.ErrNotFound
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
newCDRSrv.connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs), utils.ThresholdSv1, rpcInternal)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
defaultConf := config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses]
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit = 1
defer func() {
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf
}()
var rply string
err := newCDRSrv.V1ProcessEvent(context.Background(), cgrEv, &rply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
delete(cgrEv.APIOpts, utils.MetaCDRID) // ignore autogenerated *cdr field when comparing
if !reflect.DeepEqual(expected, cgrEv) {
t.Errorf("\nExpected <%+v> \n,received <%+v>", expected, cgrEv)
}
}
func TestCDRsV1ProcessEventWithGetMockCache(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CdrsCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs)}
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, connMng, storDBChan)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args any, reply any) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply any) error {
*reply.(*map[string]map[string]any) = map[string]map[string]any{}
return utils.ErrNotFound
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
newCDRSrv.connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs), utils.ThresholdSv1, rpcInternal)
cgrEv := &utils.CGREvent{
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
defaultConf := config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses]
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit = 1
defer func() {
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf
}()
var rply []*utils.EventsWithOpts
err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
"*context": utils.MetaCDRs,
},
}
cgrEv.ID = "testID"
delete(cgrEv.APIOpts, utils.MetaCDRID) // ignore autogenerated *cdr field when comparing
if !reflect.DeepEqual(expected, cgrEv) {
t.Errorf("\nExpected <%+v> \n,received <%+v>", expected, cgrEv)
}
}
func TestCDRsV1ProcessEventWithGetMockCacheErr(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CdrsCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs)}
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, connMng, storDBChan)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args any, reply any) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply any) error {
*reply.(*map[string]map[string]any) = map[string]map[string]any{}
return utils.ErrNotFound
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
newCDRSrv.connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal,
utils.MetaEEs), utils.ThresholdSv1, rpcInternal)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
"Resources": "ResourceProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
utils.Weight: "20.0",
utils.Usage: 135 * time.Second,
utils.Cost: 123.0,
},
APIOpts: map[string]any{
utils.OptsCDRsExport: true,
utils.MetaAttributes: time.Second,
"*context": utils.MetaCDRs,
},
}
defaultConf := config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses]
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit = 1
defer func() {
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf
}()
expectedErr := `retrieving *attributes option failed: cannot convert field: 1s to bool`
var rply []*utils.EventsWithOpts
err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply)
if err == nil || err.Error() != expectedErr {
t.Errorf("expected <%v>, received <%v>", expectedErr, err)
}
}
func TestCDRsV1ProcessEventCacheGet(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit = 1
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
utils.Cost: 123,
},
}
rply := "string"
engine.Cache.Set(context.Background(), utils.CacheRPCResponses, "CDRsV1.ProcessEvent:testID",
&utils.CachedRPCResponse{Result: &rply, Error: nil},
nil, true, utils.NonTransactional)
err := newCDRSrv.V1ProcessEvent(context.Background(), cgrEv, &rply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
utils.Cost: 123,
},
}
if !reflect.DeepEqual(expected, cgrEv) {
t.Errorf("\nExpected <%+v> \n,received <%+v>", expected, cgrEv)
}
}
func TestCDRsV1ProcessEventWithGetCacheGet(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit = 1
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
utils.Cost: 123,
},
}
rply := []*utils.EventsWithOpts{}
engine.Cache.Set(context.Background(), utils.CacheRPCResponses, "CDRsV1.ProcessEvent:testID",
&utils.CachedRPCResponse{Result: &rply, Error: nil},
nil, true, utils.NonTransactional)
err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
utils.Cost: 123,
},
APIOpts: map[string]any{},
}
delete(cgrEv.APIOpts, utils.MetaCDRID) // ignore autogenerated *cdr field when comparing
if !reflect.DeepEqual(expected, cgrEv) {
t.Errorf("\nExpected <%+v> \n,received <%+v>", expected, cgrEv)
}
}
func TestCDRsV1ProcessEventWithGetMockCacheErrResp(t *testing.T) {
testCache := engine.Cache
tmpC := config.CgrConfig()
defer func() {
engine.Cache = testCache
config.SetCgrConfig(tmpC)
}()
engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit = 1
config.SetCgrConfig(cfg)
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
engine.Cache = engine.NewCacheS(cfg, dm, nil, nil)
storDB := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
Event: map[string]any{
utils.Cost: 123,
},
APIOpts: map[string]any{},
}
evs := []*utils.EventsWithOpts{
{
Event: map[string]any{
utils.Cost: 666,
},
},
}
engine.Cache.Set(context.Background(), utils.CacheRPCResponses, "CDRsV1.ProcessEventWithGet:testID",
&utils.CachedRPCResponse{Result: &evs, Error: nil},
nil, true, utils.NonTransactional)
var reply []*utils.EventsWithOpts
err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &reply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
expectedVal := []*utils.EventsWithOpts{
{
Event: map[string]any{
utils.Cost: 666,
},
},
}
if !reflect.DeepEqual(expectedVal, reply) {
t.Errorf("Expected %v, received %v", utils.ToJSON(expectedVal), utils.ToJSON(reply))
}
}

View File

@@ -16,27 +16,27 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
package cdrs
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
func newMapEventFromReqForm(r *http.Request) (mp MapEvent, err error) {
func newMapEventFromReqForm(r *http.Request) (mp engine.MapEvent, err error) {
if r.Form == nil {
if err = r.ParseForm(); err != nil {
return
}
}
mp = MapEvent{utils.Source: r.RemoteAddr}
mp = engine.MapEvent{utils.Source: r.RemoteAddr}
for k, vals := range r.Form {
mp[k] = vals[0] // We only support the first value for now, if more are provided it is considered remote's fault
}
@@ -44,8 +44,8 @@ func newMapEventFromReqForm(r *http.Request) (mp MapEvent, err error) {
}
// NewCDRServer is a constructor for CDRServer
func NewCDRServer(cfg *config.CGRConfig, dm *DataManager, filterS *FilterS, connMgr *ConnManager,
storDBChan chan StorDB) *CDRServer {
func NewCDRServer(cfg *config.CGRConfig, dm *engine.DataManager, filterS *engine.FilterS, connMgr *engine.ConnManager,
storDBChan chan engine.StorDB) *CDRServer {
cdrDB := <-storDBChan
return &CDRServer{
cfg: cfg,
@@ -61,12 +61,12 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataManager, filterS *FilterS, conn
// CDRServer stores and rates CDRs
type CDRServer struct {
cfg *config.CGRConfig
dm *DataManager
db StorDB
dm *engine.DataManager
db engine.StorDB
guard *guardian.GuardianLocker
fltrS *FilterS
connMgr *ConnManager
dbChan chan StorDB
fltrS *engine.FilterS
connMgr *engine.ConnManager
dbChan chan engine.StorDB
}
// ListenAndServe listen for storbd reload
@@ -85,7 +85,7 @@ func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) {
// chrgrSProcessEvent forks CGREventWithOpts into multiples based on matching ChargerS profiles
func (cdrS *CDRServer) chrgrSProcessEvent(ctx *context.Context, cgrEv *utils.CGREvent) (cgrEvs []*utils.CGREvent, err error) {
var chrgrs []*ChrgSProcessEventReply
var chrgrs []*engine.ChrgSProcessEventReply
if err = cdrS.connMgr.Call(ctx, cdrS.cfg.CdrsCfg().ChargerSConns,
utils.ChargerSv1ProcessEvent,
cgrEv, &chrgrs); err != nil {
@@ -103,7 +103,7 @@ func (cdrS *CDRServer) chrgrSProcessEvent(ctx *context.Context, cgrEv *utils.CGR
// attrSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CDRServer) attrSProcessEvent(ctx *context.Context, cgrEv *utils.CGREvent) (err error) {
var rplyEv AttrSProcessEventReply
var rplyEv engine.AttrSProcessEventReply
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = make(map[string]any)
}
@@ -166,7 +166,7 @@ func (cdrS *CDRServer) thdSProcessEvent(ctx *context.Context, cgrEv *utils.CGREv
var tIDs []string
// we clone the CGREvent so we can add EventType without being propagated
cgrEv = cgrEv.Clone()
cgrEv.APIOpts[utils.MetaEventType] = utils.CDR
cgrEv.APIOpts[utils.MetaEventType] = utils.CDRKey
if err = cdrS.connMgr.Call(ctx, cdrS.cfg.CdrsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent,
cgrEv, &tIDs); err != nil &&
@@ -204,7 +204,7 @@ func (cdrS *CDRServer) eeSProcessEvent(ctx *context.Context, cgrEv *utils.CGREve
// In case of partially executed, both the error and the events will be returned.
func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent) ([]*utils.EventsWithOpts, error) {
for _, ev := range evs {
attrS, err := GetBoolOpts(ctx, ev.Tenant, ev.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Attributes,
attrS, err := engine.GetBoolOpts(ctx, ev.Tenant, ev.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Attributes,
config.CDRsAttributesDftOpt, utils.MetaAttributes)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaAttributes, err)
@@ -224,7 +224,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
cgrEvs := make([]*utils.CGREvent, 0, len(evs))
for _, ev := range evs {
chrgS, err := GetBoolOpts(ctx, ev.Tenant, ev.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Chargers,
chrgS, err := engine.GetBoolOpts(ctx, ev.Tenant, ev.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Chargers,
config.CDRsChargersDftOpt, utils.MetaChargers)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaChargers, err)
@@ -246,7 +246,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
var partiallyExecuted bool // from here actions are optional and a general error is returned
for _, cgrEv := range cgrEvs {
rateS, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Rates,
rateS, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Rates,
config.CDRsRatesDftOpt, utils.MetaRates)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaRates, err)
@@ -263,7 +263,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
}
for _, cgrEv := range cgrEvs {
acntS, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Accounts,
acntS, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Accounts,
config.CDRsAccountsDftOpt, utils.MetaAccounts)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaAccounts, err)
@@ -311,7 +311,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
}
for _, cgrEv := range cgrEvs {
store, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Store,
store, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Store,
config.CDRsStoreDftOpt, utils.MetaStore)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaStore, err)
@@ -319,23 +319,29 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
if !store {
continue
}
rerate, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Rerate,
rerate, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Rerate,
config.CDRsRerateDftOpt, utils.MetaRerate)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaRerate, err)
}
cdrID := GetUniqueCDRID(cgrEv)
// Prevent 'assignment to entry in nil map' panic.
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = make(map[string]any)
}
cgrEv.APIOpts[utils.MetaCDRID] = cdrID
if err := cdrS.db.SetCDR(ctx, cdrID, cgrEv, false); err != nil {
// Make sure *cdrID key exists in opts, as it's needed to identify CDRs during CRUD operations.
if _, ok := cgrEv.APIOpts[utils.MetaCDRID]; !ok {
cgrEv.APIOpts[utils.MetaCDRID] = utils.GetUniqueCDRID(cgrEv)
}
if err := cdrS.db.SetCDR(ctx, cgrEv, false); err != nil {
if err != utils.ErrExists || !rerate {
// ToDo: add refund logic
return nil, fmt.Errorf("storing CDR %s failed: %w", utils.ToJSON(cgrEv), err)
}
if err = cdrS.db.SetCDR(ctx, cdrID, cgrEv, true); err != nil {
if err = cdrS.db.SetCDR(ctx, cgrEv, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv)))
@@ -345,7 +351,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
}
for _, cgrEv := range cgrEvs {
export, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Export,
export, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Export,
config.CDRsExportDftOpt, utils.OptsCDRsExport)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.OptsCDRsExport, err)
@@ -366,7 +372,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
}
for _, cgrEv := range cgrEvs {
thdS, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Thresholds,
thdS, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Thresholds,
config.CDRsThresholdsDftOpt, utils.MetaThresholds)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaThresholds, err)
@@ -383,7 +389,7 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
}
for _, cgrEv := range cgrEvs {
stS, err := GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Stats,
stS, err := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Stats,
config.CDRsStatsDftOpt, utils.MetaStats)
if err != nil {
return nil, fmt.Errorf("retrieving %s option failed: %w", utils.MetaStats, err)
@@ -414,76 +420,6 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
return outEvs, nil
}
// V1ProcessEvent will process the CGREvent
func (cdrS *CDRServer) V1ProcessEvent(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) {
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEvent, args.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if _, err = cdrS.processEvents(ctx, []*utils.CGREvent{args}); err != nil {
return
}
*reply = utils.OK
return nil
}
// V1ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, args *utils.CGREvent, evs *[]*utils.EventsWithOpts) (err error) {
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEventWithGet, args.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*evs = *cachedResp.Result.(*[]*utils.EventsWithOpts)
}
return cachedResp.Error
}
defer Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: evs, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
var procEvs []*utils.EventsWithOpts
if procEvs, err = cdrS.processEvents(ctx, []*utils.CGREvent{args}); err != nil {
return
}
*evs = procEvs
return nil
}
func populateCost(cgrOpts map[string]any) *utils.Decimal {
// if the cost is already present, get out
if _, has := cgrOpts[utils.MetaCost]; has {
@@ -499,47 +435,3 @@ func populateCost(cgrOpts map[string]any) *utils.Decimal {
}
return nil
}
// V1ProcessStoredEvents processes stored events based on provided filters.
func (cdrS *CDRServer) V1ProcessStoredEvents(ctx *context.Context, args *CDRFilters, reply *string) (err error) {
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessStoredEvents, args.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey)
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
fltrs, err := GetFilters(ctx, args.FilterIDs, args.Tenant, cdrS.dm)
if err != nil {
return fmt.Errorf("preparing filters failed: %w", err)
}
cdrs, err := cdrS.db.GetCDRs(ctx, fltrs, args.APIOpts)
if err != nil {
return fmt.Errorf("retrieving CDRs failed: %w", err)
}
_, err = cdrS.processEvents(ctx, CDRsToCGREvents(cdrs))
if err != nil && !errors.Is(err, utils.ErrPartiallyExecuted) {
return fmt.Errorf("processing events failed: %w", err)
}
*reply = utils.OK
return err
}

File diff suppressed because it is too large Load Diff

View File

@@ -37,6 +37,7 @@ import (
"github.com/cgrates/cgrates/actions"
"github.com/cgrates/cgrates/analyzers"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/ees"
@@ -63,7 +64,7 @@ func main() {
{"actions.go", "MetaActions", new(actions.ActionS), utils.EmptyString},
{"attributes.go", "MetaAttributes", new(engine.AttributeS), utils.EmptyString},
{"caches.go", "MetaCaches", engine.Cache, utils.EmptyString},
{"cdrs.go", "MetaCDRs", new(engine.CDRServer), utils.CDRs},
{"cdrs.go", "MetaCDRs", new(cdrs.CDRServer), utils.CDRs},
{"chargers.go", "MetaChargers", new(engine.ChargerS), utils.EmptyString},
{"config.go", "MetaConfig", new(config.CGRConfig), utils.ConfigS},
{"rates.go", "RateS", new(rates.RateS), utils.EmptyString},

View File

@@ -135,7 +135,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
},
APIOpts: map[string]any{
utils.MetaOriginID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
utils.MetaEventType: utils.CDR,
utils.MetaEventType: utils.CDRKey,
utils.RunID: utils.MetaDefault,
},
},
@@ -167,7 +167,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
},
APIOpts: map[string]any{
utils.MetaOriginID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
utils.MetaEventType: utils.CDR,
utils.MetaEventType: utils.CDRKey,
utils.RunID: utils.MetaDefault,
},
},
@@ -200,7 +200,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
},
APIOpts: map[string]any{
utils.MetaOriginID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()),
utils.MetaEventType: utils.CDR,
utils.MetaEventType: utils.CDRKey,
utils.RunID: utils.MetaDefault,
},
},
@@ -239,7 +239,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
utils.Subject: utils.IfaceAsString(eventVoice.Event[utils.Subject]),
utils.Destination: utils.IfaceAsString(eventVoice.Event[utils.Destination]),
utils.Cost: utils.IfaceAsString(eventVoice.Event[utils.Cost]),
utils.EventType: utils.CDR,
utils.EventType: utils.CDRKey,
} {
if rcv := httpJsonMap[key]; rcv != strVal {
t.Errorf("Expected %+v, received: %+v", strVal, rcv)
@@ -262,7 +262,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
utils.Subject: utils.IfaceAsString(eventData.Event[utils.Subject]),
utils.Destination: utils.IfaceAsString(eventData.Event[utils.Destination]),
utils.Cost: utils.IfaceAsString(eventData.Event[utils.Cost]),
utils.EventType: utils.CDR,
utils.EventType: utils.CDRKey,
} {
if rcv := httpJsonMap[key]; rcv != strVal {
t.Errorf("Expected %+v, received: %+v", strVal, rcv)
@@ -285,7 +285,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
utils.Subject: utils.IfaceAsString(eventSMS.Event[utils.Subject]),
utils.Destination: utils.IfaceAsString(eventSMS.Event[utils.Destination]),
utils.Cost: utils.IfaceAsString(eventSMS.Event[utils.Cost]),
utils.EventType: utils.CDR,
utils.EventType: utils.CDRKey,
} {
if rcv := httpJsonMap[key]; rcv != strVal {
t.Errorf("Expected %+v, received: %+v", strVal, rcv)

View File

@@ -590,41 +590,6 @@ func TestCMDeadLock(t *testing.T) {
}
*/
func TestCMEnableDispatcher(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
cM := NewConnManager(cfg)
data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := NewFilterS(cfg, nil, dm)
Cache = NewCacheS(cfg, dm, nil, nil)
var storDB StorDB
storDBChan := make(chan StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
srvcNames := []string{utils.AccountS, utils.ActionS, utils.AttributeS,
utils.CacheS, utils.ChargerS, utils.ConfigS, utils.DispatcherS,
utils.GuardianS, utils.RateS, utils.ResourceS, utils.RouteS,
utils.SessionS, utils.StatS, utils.ThresholdS, utils.CDRs,
utils.ReplicatorS, utils.EeS, utils.CoreS, utils.AnalyzerS,
utils.AdminS, utils.LoaderS, utils.ServiceManager}
for _, name := range srvcNames {
newSrvcWName, err := NewServiceWithName(newCDRSrv, name, true)
if err != nil {
t.Error(err)
}
cM.EnableDispatcher(newSrvcWName)
}
}
func TestCMGetInternalChan(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
@@ -657,80 +622,115 @@ func TestCMGetDispInternalChan(t *testing.T) {
}
func TestCMDisableDispatcher(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
// func TestCMEnableDispatcher(t *testing.T) {
// tmp := Cache
// defer func() {
// Cache = tmp
// }()
// Cache.Clear(nil)
// cfg := config.NewDefaultCGRConfig()
// cM := NewConnManager(cfg)
// data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
// dm := NewDataManager(data, cfg.CacheCfg(), nil)
// fltrs := NewFilterS(cfg, nil, dm)
// Cache = NewCacheS(cfg, dm, nil, nil)
// var storDB StorDB
// storDBChan := make(chan StorDB, 1)
// storDBChan <- storDB
// newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
cM := &ConnManager{
cfg: cfg,
connCache: ltcache.NewCache(-1, 0, true, nil),
}
cM.connCache.Set("itmID1", "value of first item", nil)
data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := NewFilterS(cfg, nil, dm)
var storDB StorDB
storDBChan := make(chan StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
newSrvcWName, err := NewServiceWithName(newCDRSrv, utils.AccountS, true)
if err != nil {
t.Error(err)
}
cM.EnableDispatcher(newSrvcWName)
// srvcNames := []string{utils.AccountS, utils.ActionS, utils.AttributeS,
// utils.CacheS, utils.ChargerS, utils.ConfigS, utils.DispatcherS,
// utils.GuardianS, utils.RateS, utils.ResourceS, utils.RouteS,
// utils.SessionS, utils.StatS, utils.ThresholdS, utils.CDRs,
// utils.ReplicatorS, utils.EeS, utils.CoreS, utils.AnalyzerS,
// utils.AdminS, utils.LoaderS, utils.ServiceManager}
Cache = NewCacheS(cfg, dm, cM, nil)
Cache.SetWithoutReplicate(utils.CacheRPCConnections, "itmID2",
"value of 2nd item", nil, true, utils.NonTransactional)
// for _, name := range srvcNames {
var exp []string
// newSrvcWName, err := NewServiceWithName(newCDRSrv, name, true)
// if err != nil {
// t.Error(err)
// }
// cM.EnableDispatcher(newSrvcWName)
cM.DisableDispatcher()
rcv1 := cM.connCache.GetItemIDs("itmID1")
rcv2 := Cache.GetItemIDs(utils.CacheRPCConnections, utils.EmptyString)
// }
// }
if !reflect.DeepEqual(rcv1, exp) {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv1)
} else if !reflect.DeepEqual(rcv2, exp) {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv2)
} else if cM.disp != nil || cM.dispIntCh != nil {
t.Errorf("\nexpected nil cM.disp and cM.dispIntCh, \nreceived cM.disp: <%+v>, \n cM.dispIntCh: <%+v>", cM.disp, cM.dispIntCh)
}
// func TestCMDisableDispatcher(t *testing.T) {
// tmp := Cache
// defer func() {
// Cache = tmp
// }()
// Cache.Clear(nil)
// cfg := config.NewDefaultCGRConfig()
}
// cM := &ConnManager{
// cfg: cfg,
// connCache: ltcache.NewCache(-1, 0, true, nil),
// }
// cM.connCache.Set("itmID1", "value of first item", nil)
// data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
// dm := NewDataManager(data, cfg.CacheCfg(), nil)
// fltrs := NewFilterS(cfg, nil, dm)
// var storDB StorDB
// storDBChan := make(chan StorDB, 1)
// storDBChan <- storDB
// newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
// newSrvcWName, err := NewServiceWithName(newCDRSrv, utils.AccountS, true)
// if err != nil {
// t.Error(err)
// }
// cM.EnableDispatcher(newSrvcWName)
func TestCMgetInternalConnChanFromDisp(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
// Cache = NewCacheS(cfg, dm, cM, nil)
// Cache.SetWithoutReplicate(utils.CacheRPCConnections, "itmID2",
// "value of 2nd item", nil, true, utils.NonTransactional)
cM := &ConnManager{
cfg: cfg,
connCache: ltcache.NewCache(-1, 0, true, nil),
}
cM.connCache.Set("itmID1", "value of first item", nil)
data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := NewFilterS(cfg, nil, dm)
var storDB StorDB
storDBChan := make(chan StorDB, 1)
storDBChan <- storDB
newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
newSrvcWName, err := NewServiceWithName(newCDRSrv, utils.AccountS, true)
if err != nil {
t.Error(err)
}
cM.EnableDispatcher(newSrvcWName)
// var exp []string
if rcv, ok := cM.getInternalConnChan(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)); !ok {
t.Errorf("Unexpected error getting internalConnChan, Received <%+v>", rcv)
}
// cM.DisableDispatcher()
// rcv1 := cM.connCache.GetItemIDs("itmID1")
// rcv2 := Cache.GetItemIDs(utils.CacheRPCConnections, utils.EmptyString)
}
// if !reflect.DeepEqual(rcv1, exp) {
// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv1)
// } else if !reflect.DeepEqual(rcv2, exp) {
// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv2)
// } else if cM.disp != nil || cM.dispIntCh != nil {
// t.Errorf("\nexpected nil cM.disp and cM.dispIntCh, \nreceived cM.disp: <%+v>, \n cM.dispIntCh: <%+v>", cM.disp, cM.dispIntCh)
// }
// }
// func TestCMgetInternalConnChanFromDisp(t *testing.T) {
// tmp := Cache
// defer func() {
// Cache = tmp
// }()
// Cache.Clear(nil)
// cfg := config.NewDefaultCGRConfig()
// cM := &ConnManager{
// cfg: cfg,
// connCache: ltcache.NewCache(-1, 0, true, nil),
// }
// cM.connCache.Set("itmID1", "value of first item", nil)
// data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
// dm := NewDataManager(data, cfg.CacheCfg(), nil)
// fltrs := NewFilterS(cfg, nil, dm)
// var storDB StorDB
// storDBChan := make(chan StorDB, 1)
// storDBChan <- storDB
// newCDRSrv := NewCDRServer(cfg, dm, fltrs, nil, storDBChan)
// newSrvcWName, err := NewServiceWithName(newCDRSrv, utils.AccountS, true)
// if err != nil {
// t.Error(err)
// }
// cM.EnableDispatcher(newSrvcWName)
// if rcv, ok := cM.getInternalConnChan(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)); !ok {
// t.Errorf("Unexpected error getting internalConnChan, Received <%+v>", rcv)
// }
// }

View File

@@ -109,7 +109,7 @@ type DataDBDriver interface {
type StorDB interface {
Storage
SetCDR(*context.Context, string, *utils.CGREvent, bool) error
GetCDRs(*context.Context, []*Filter, map[string]interface{}) ([]*CDR, error)
GetCDRs(*context.Context, []*Filter, map[string]interface{}) ([]*utils.CDR, error)
RemoveCDRs(*context.Context, []*Filter) error
}

View File

@@ -66,7 +66,7 @@ func (iDB *InternalDB) SetCDR(_ *context.Context, cdrID string, cgrEv *utils.CGR
return nil
}
func (iDB *InternalDB) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) {
func (iDB *InternalDB) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*utils.CDR, err error) {
pairFltrs := make(map[string][]string)
notPairFltrs := make(map[string][]string)
notIndexed := []*FilterRule{}
@@ -154,9 +154,9 @@ func (iDB *InternalDB) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map
return nil, utils.ErrNotFound
}
// convert from event into CDRs
cdrs = make([]*CDR, len(events))
cdrs = make([]*utils.CDR, len(events))
for i, event := range events {
cdrs[i] = &CDR{
cdrs[i] = &utils.CDR{
Tenant: event.Tenant,
Opts: event.APIOpts,
Event: event.Event,

View File

@@ -121,7 +121,7 @@ func (ms *MongoStorage) SetCDR(ctx *context.Context, cdrID string, cdr *utils.CG
_, err := ms.getCol(ColCDRs).InsertOne(
sctx,
&CDR{
&utils.CDR{
Tenant: cdr.Tenant,
Opts: cdr.APIOpts,
Event: cdr.Event,
@@ -169,7 +169,7 @@ func isMongoDuplicateError(err error) bool {
return false
}
func (ms *MongoStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) {
func (ms *MongoStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*utils.CDR, err error) {
fltrs := make(bson.M)
for _, fltr := range qryFltr {
for _, rule := range fltr.Rules {
@@ -209,7 +209,7 @@ func (ms *MongoStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts ma
return err
}
for cur.Next(sctx) {
cdr := CDR{}
cdr := utils.CDR{}
err := cur.Decode(&cdr)
if err != nil {
return err
@@ -372,7 +372,7 @@ func (ms *MongoStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
}
defer cur.Close(sctx)
for cur.Next(sctx) {
cdr := CDR{}
cdr := utils.CDR{}
if err := cur.Decode(&cdr); err != nil {
return err
}

View File

@@ -146,7 +146,7 @@ func (sqls *SQLStorage) SetCDR(_ *context.Context, cdrID string, cdr *utils.CGRE
if tx.Error != nil {
return tx.Error
}
cdrTable := &CDRSQLTable{
cdrTable := &utils.CDRSQLTable{
Tenant: cdr.Tenant,
Opts: cdr.APIOpts,
Event: cdr.Event,
@@ -166,9 +166,9 @@ func (sqls *SQLStorage) SetCDR(_ *context.Context, cdrID string, cdr *utils.CGRE
return tx.Error
}
updated := tx.Model(&CDRSQLTable{}).Where(
updated := tx.Model(&utils.CDRSQLTable{}).Where(
sqls.cdrIDQuery(cdrID)).Updates(
CDRSQLTable{Opts: cdr.APIOpts, Event: cdr.Event, UpdatedAt: time.Now()})
utils.CDRSQLTable{Opts: cdr.APIOpts, Event: cdr.Event, UpdatedAt: time.Now()})
if updated.Error != nil {
tx.Rollback()
return updated.Error
@@ -180,7 +180,7 @@ func (sqls *SQLStorage) SetCDR(_ *context.Context, cdrID string, cdr *utils.CGRE
// GetCDRs has ability to get the filtered CDRs, count them or simply return them
// qryFltr.Unscoped will ignore soft deletes or delete records permanently
func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) {
func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*utils.CDR, err error) {
q := sqls.db.Table(utils.CDRsTBL)
var excludedCdrQueryFilterTypes []*FilterRule
for _, fltr := range qryFltr {
@@ -221,7 +221,7 @@ func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts ma
q = q.Offset(offset)
// Execute query
results := make([]*CDRSQLTable, 0)
results := make([]*utils.CDRSQLTable, 0)
if err = q.Find(&results).Error; err != nil {
return
}
@@ -229,11 +229,11 @@ func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts ma
return nil, utils.ErrNotFound
}
//convert into CDR
resultCdr := make([]*CDR, 0, len(results))
resultCdr := make([]*utils.CDR, 0, len(results))
for _, val := range results {
// here we wil do our filtration, meaning that we will filter those cdrs who cannot be filtered in the databes eg: *ai, *rsr..
if len(excludedCdrQueryFilterTypes) != 0 {
newCdr := &CDR{
newCdr := &utils.CDR{
Tenant: val.Tenant,
Opts: val.Opts,
Event: val.Event,
@@ -252,7 +252,7 @@ func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts ma
continue
}
}
resultCdr = append(resultCdr, &CDR{
resultCdr = append(resultCdr, &utils.CDR{
Tenant: val.Tenant,
Opts: val.Opts,
Event: val.Event,
@@ -309,7 +309,7 @@ func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
return
}
// in the other case, if we have such filters, check the results based on those filters
results := make([]*CDRSQLTable, 0)
results := make([]*utils.CDRSQLTable, 0)
if err = q.Find(&results).Error; err != nil {
return
}
@@ -322,7 +322,7 @@ func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
remCdr := make([]string, 0, len(results)) // we will keep the *cdrID of every CDR taht matched the those filters
for _, cdr := range results {
if len(excludedCdrQueryFilterTypes) != 0 {
newCdr := &CDR{
newCdr := &utils.CDR{
Tenant: cdr.Tenant,
Opts: cdr.Opts,
Event: cdr.Event,

View File

@@ -105,3 +105,16 @@ func composeMongoURI(scheme, host, port, db, user, pass string) string {
}
return uri
}
// checkNestedFields checks if there are elements or values nested (e.g *opts.*rateSCost.Cost)
func checkNestedFields(elem string, values []string) bool {
if len(strings.Split(elem, utils.NestingSep)) > 2 {
return true
}
for _, val := range values {
if len(strings.Split(val, utils.NestingSep)) > 2 {
return true
}
}
return false
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -1085,7 +1086,7 @@ func TestCDRProcessRatesCostForEvent(t *testing.T) {
storDB := engine.NewInternalDB(nil, nil, nil)
storDBChan := make(chan engine.StorDB, 1)
storDBChan <- storDB
cdrs := engine.NewCDRServer(cfg, dm, filters, connMgr, storDBChan)
cdrs := cdrs.NewCDRServer(cfg, dm, filters, connMgr, storDBChan)
ratesConns := make(chan birpc.ClientConnector, 1)
rateSrv, err := birpc.NewServiceWithMethodsRename(NewRateS(cfg, filters, dm), utils.RateSv1, true, func(key string) (newKey string) {
return strings.TrimPrefix(key, utils.V1Prfx)

View File

@@ -25,6 +25,7 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
@@ -61,7 +62,7 @@ type CDRServer struct {
filterSChan chan *engine.FilterS
server *cores.Server
cdrS *engine.CDRServer
cdrS *cdrs.CDRServer
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
@@ -95,7 +96,7 @@ func (cdrService *CDRServer) Start(ctx *context.Context, _ context.CancelFunc) (
cdrService.Lock()
defer cdrService.Unlock()
cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, datadb, filterS, cdrService.connMgr, storDBChan)
cdrService.cdrS = cdrs.NewCDRServer(cdrService.cfg, datadb, filterS, cdrService.connMgr, storDBChan)
go cdrService.cdrS.ListenAndServe(cdrService.stopChan)
runtime.Gosched()
utils.Logger.Info("Registering CDRS RPC service.")

View File

@@ -22,6 +22,7 @@ import (
"testing"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
@@ -59,7 +60,7 @@ func TestCdrsCoverage(t *testing.T) {
stopChan: make(chan struct{}, 1),
anz: anz,
srvDep: srvDep,
cdrS: &engine.CDRServer{},
cdrS: &cdrs.CDRServer{},
}
cdrS2.connChan <- &testMockClients{}
cdrS2.stopChan <- struct{}{}

View File

@@ -16,17 +16,14 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
package utils
import (
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/utils"
)
type CDR struct {
@@ -41,26 +38,26 @@ type CDR struct {
type CDRSQLTable struct {
ID int64 // this is used for incrementing while seting
Tenant string
Opts JSON `gorm:"type:jsonb"` //string
Event JSON `gorm:"type:jsonb"` //string
Opts JSONB `gorm:"type:jsonb"` //string
Event JSONB `gorm:"type:jsonb"` //string
CreatedAt time.Time `json:",omitempty"`
UpdatedAt time.Time `json:",omitempty"`
DeletedAt *time.Time `json:",omitempty"`
}
func (CDRSQLTable) TableName() string {
return utils.CDRsTBL
return CDRsTBL
}
// JSON type for storing maps of events and opts into gorm columns as jsob type
type JSON map[string]interface{}
// JSONB type for storing maps of events and opts into gorm columns as jsob type
type JSONB map[string]interface{}
func (j JSON) GormDataType() string {
func (j JSONB) GormDataType() string {
return "JSONB"
}
// Scan scan value into Jsonb, implements sql.Scanner interface
func (j *JSON) Scan(value interface{}) (err error) {
func (j *JSONB) Scan(value interface{}) (err error) {
switch v := value.(type) {
case []byte:
return json.Unmarshal(v, &j)
@@ -72,52 +69,38 @@ func (j *JSON) Scan(value interface{}) (err error) {
}
// Value return json value, implement driver.Valuer interface
func (j JSON) Value() (driver.Value, error) {
func (j JSONB) Value() (driver.Value, error) {
return json.Marshal(j)
}
func GetUniqueCDRID(cgrEv *utils.CGREvent) string {
if chargeId, ok := cgrEv.APIOpts[utils.MetaChargeID]; ok {
return utils.IfaceAsString(chargeId)
func GetUniqueCDRID(cgrEv *CGREvent) string {
if chargeId, ok := cgrEv.APIOpts[MetaChargeID]; ok {
return IfaceAsString(chargeId)
}
if originID, ok := cgrEv.APIOpts[utils.MetaOriginID]; ok {
return utils.IfaceAsString(originID)
if originID, ok := cgrEv.APIOpts[MetaOriginID]; ok {
return IfaceAsString(originID)
}
return utils.UUIDSha1Prefix()
return UUIDSha1Prefix()
}
func (cdr *CDR) CGREvent() *utils.CGREvent {
return &utils.CGREvent{
func (cdr *CDR) CGREvent() *CGREvent {
return &CGREvent{
Tenant: cdr.Tenant,
ID: utils.Sha1(),
ID: Sha1(),
Event: cdr.Event,
APIOpts: cdr.Opts,
}
}
// CDRsToCGREvents converts a slice of *CDR to a slice of *utils.CGREvent.
func CDRsToCGREvents(cdrs []*CDR) []*utils.CGREvent {
cgrEvs := make([]*utils.CGREvent, 0, len(cdrs))
func CDRsToCGREvents(cdrs []*CDR) []*CGREvent {
cgrEvs := make([]*CGREvent, 0, len(cdrs))
for _, cdr := range cdrs {
cgrEvs = append(cgrEvs, cdr.CGREvent())
}
return cgrEvs
}
// checkNestedFields checks if there are elements or values nested (e.g *opts.*rateSCost.Cost)
func checkNestedFields(elem string, values []string) bool {
if len(strings.Split(elem, utils.NestingSep)) > 2 {
return true
}
for _, val := range values {
if len(strings.Split(val, utils.NestingSep)) > 2 {
return true
}
}
return false
}
type CDRFilters struct {
Tenant string
ID string

View File

@@ -472,7 +472,7 @@ const (
BalanceUpdate = "BalanceUpdate"
StatUpdate = "StatUpdate"
ResourceUpdate = "ResourceUpdate"
CDR = "CDR"
CDRKey = "CDR"
CDRs = "CDRs"
ExpiryTime = "ExpiryTime"
AllowNegative = "AllowNegative"