From 61d2b4e9229f98fdfe0747280ef3d06400116e68 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 8 Oct 2020 17:35:05 +0300 Subject: [PATCH] Added *eventType to opts for process threhold --- apier/v1/filterindexecache_it_test.go | 51 +++++++++++++++++ apier/v1/replicate_it_test.go | 3 + apier/v1/thresholds_it_test.go | 54 +++++++++++++----- data/tariffplans/oldtutorial/Filters.csv | 2 +- data/tariffplans/precache/Filters.csv | 2 +- data/tariffplans/testtp/Filters.csv | 2 +- ees/ee.go | 8 +-- ees/eereq.go | 10 +++- ees/ees.go | 2 +- ees/elastic.go | 4 +- ees/filecsv.go | 10 ++-- ees/filefwv.go | 8 +-- ees/httpjsonmap.go | 4 +- ees/httppost.go | 4 +- ees/virtualee.go | 4 +- engine/account.go | 34 +++++++----- engine/action.go | 42 +++++++------- engine/balances.go | 43 +++++++++------ engine/cdrs.go | 5 +- engine/resources.go | 19 +++++-- engine/routes.go | 11 ++-- engine/routes_test.go | 12 ++-- engine/stats.go | 9 ++- engine/thresholds.go | 7 ++- engine/z_actions_it_test.go | 51 +++++++++-------- engine/z_resources_test.go | 70 +++++++++++++----------- general_tests/session3_it_test.go | 5 +- go.sum | 3 +- rates/rates.go | 5 +- utils/consts.go | 3 +- 30 files changed, 318 insertions(+), 169 deletions(-) diff --git a/apier/v1/filterindexecache_it_test.go b/apier/v1/filterindexecache_it_test.go index f8bc536ef..0a86939c2 100644 --- a/apier/v1/filterindexecache_it_test.go +++ b/apier/v1/filterindexecache_it_test.go @@ -153,6 +153,9 @@ func testV1FIdxCaProcessEventWithNotFound(t *testing.T) { utils.Account: "1001", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } var thIDs []string @@ -223,6 +226,9 @@ func testV1FIdxCaSetThresholdProfile(t *testing.T) { utils.Account: "1001", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } var thIDs []string @@ -251,6 +257,9 @@ func testV1FIdxCaGetThresholdFromTP(t *testing.T) { utils.Units: 12.3, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } var thIDs []string @@ -323,6 +332,9 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { utils.Account: "1001", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } var thIDs []string @@ -343,6 +355,9 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { utils.Account: "1002", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } eIDs = []string{"TEST_PROFILE1"} @@ -412,6 +427,9 @@ func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) { utils.EventType: utils.BalanceUpdate, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } var thIDs []string @@ -430,6 +448,9 @@ func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) { utils.EventType: utils.BalanceUpdate, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } eIDs := []string{"THD_ACNT_BALANCE_1"} @@ -453,6 +474,9 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) { utils.EventType: utils.AccountUpdate, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } var thIDs []string @@ -473,6 +497,9 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) { utils.EventType: utils.BalanceUpdate, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } eIDs = []string{"THD_ACNT_BALANCE_1"} @@ -531,6 +558,9 @@ func testV1FIdxCaGetStatQueuesWithNotFound(t *testing.T) { utils.Account: "1001", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err == nil || @@ -608,6 +638,9 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { "Val": 10, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } var reply []string @@ -675,6 +708,9 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { utils.Cost: 12.1, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv, &reply); err != nil { @@ -695,6 +731,9 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { utils.Cost: 12.1, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv2, &reply); err != nil { @@ -772,6 +811,9 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { "Val": 10, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err != nil { @@ -835,6 +877,9 @@ func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) { utils.Cost: 12.1, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } var ids []string @@ -861,6 +906,9 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { "Val": 10, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err != nil { @@ -882,6 +930,9 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { utils.Cost: 12.1, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &reply); err != nil { diff --git a/apier/v1/replicate_it_test.go b/apier/v1/replicate_it_test.go index 3acbb677e..a1a1d67cc 100644 --- a/apier/v1/replicate_it_test.go +++ b/apier/v1/replicate_it_test.go @@ -1234,6 +1234,9 @@ func testInternalReplicateITThreshold(t *testing.T) { utils.Disabled: false, utils.Units: 12.3}, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } //set Actions diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index a285dd278..d821b4b7f 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -52,6 +52,9 @@ var ( utils.Disabled: false, utils.Units: 12.3}, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, }, { @@ -67,6 +70,9 @@ var ( utils.ExpiryTime: time.Date(2009, 11, 10, 23, 00, 0, 0, time.UTC), }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, }, { @@ -86,6 +92,9 @@ var ( "PDD": "2s", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.StatUpdate, + }, }, }, { @@ -102,6 +111,9 @@ var ( "TCD": "1h", }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.StatUpdate, + }, }, }, { @@ -117,19 +129,8 @@ var ( "TCD": "3h1s", }, }, - }, - }, - { - CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: &utils.CGREvent{ // hitting THD_RES_1 - Tenant: "cgrates.org", - ID: "event6", - Event: map[string]interface{}{ - utils.EventType: utils.ResourceUpdate, - utils.Account: "1002", - utils.ResourceID: "RES_GRP_1", - utils.Usage: 10.0, - }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.StatUpdate, }, }, }, @@ -145,6 +146,9 @@ var ( utils.Usage: 10.0, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.ResourceUpdate, + }, }, }, { @@ -159,6 +163,26 @@ var ( utils.Usage: 10.0, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.ResourceUpdate, + }, + }, + }, + { + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ // hitting THD_RES_1 + Tenant: "cgrates.org", + ID: "event6", + Event: map[string]interface{}{ + utils.EventType: utils.ResourceUpdate, + utils.Account: "1002", + utils.ResourceID: "RES_GRP_1", + utils.Usage: 10.0, + }, + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.ResourceUpdate, + }, }, }, { @@ -167,7 +191,6 @@ var ( Tenant: "cgrates.org", ID: "cdrev1", Event: map[string]interface{}{ - utils.EventType: utils.CDR, "field_extr1": "val_extr1", "fieldextr2": "valextr2", utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), @@ -191,6 +214,9 @@ var ( utils.COST: -1.0, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.CDR, + }, }, }, } diff --git a/data/tariffplans/oldtutorial/Filters.csv b/data/tariffplans/oldtutorial/Filters.csv index 01427eb0a..9e07b08ec 100644 --- a/data/tariffplans/oldtutorial/Filters.csv +++ b/data/tariffplans/oldtutorial/Filters.csv @@ -29,5 +29,5 @@ cgrates.org,FLTR_DST_FS,*string,~*req.Account,1001;1002;dan,2014-07-29T15:00:00Z cgrates.org,FLTR_DST_FS,*destinations,~*req.Destination,DST_FS, cgrates.org,FLTR_RES_GR3,*string,~*req.Account,3001,2014-07-29T15:00:00Z cgrates.org,FLTR_STS1,*string,~*req.Account,1001;1002,2014-07-29T15:00:00Z -cgrates.org,FLTR_CDR_UPDATE,*string,~*req.EventType,CDR,2014-07-29T15:00:00Z +cgrates.org,FLTR_CDR_UPDATE,*string,~*opts.*eventType,CDR,2014-07-29T15:00:00Z diff --git a/data/tariffplans/precache/Filters.csv b/data/tariffplans/precache/Filters.csv index 01427eb0a..9e07b08ec 100644 --- a/data/tariffplans/precache/Filters.csv +++ b/data/tariffplans/precache/Filters.csv @@ -29,5 +29,5 @@ cgrates.org,FLTR_DST_FS,*string,~*req.Account,1001;1002;dan,2014-07-29T15:00:00Z cgrates.org,FLTR_DST_FS,*destinations,~*req.Destination,DST_FS, cgrates.org,FLTR_RES_GR3,*string,~*req.Account,3001,2014-07-29T15:00:00Z cgrates.org,FLTR_STS1,*string,~*req.Account,1001;1002,2014-07-29T15:00:00Z -cgrates.org,FLTR_CDR_UPDATE,*string,~*req.EventType,CDR,2014-07-29T15:00:00Z +cgrates.org,FLTR_CDR_UPDATE,*string,~*opts.*eventType,CDR,2014-07-29T15:00:00Z diff --git a/data/tariffplans/testtp/Filters.csv b/data/tariffplans/testtp/Filters.csv index dabdc7316..cbfab7979 100644 --- a/data/tariffplans/testtp/Filters.csv +++ b/data/tariffplans/testtp/Filters.csv @@ -27,4 +27,4 @@ cgrates.org,FLTR_RES_1,*gte,~*req.Usage,10.0, cgrates.org,FLTR_DST_FS,*destinations,~*req.Destination,DST_FS,2014-07-29T15:00:00Z cgrates.org,FLTR_RES_GR3,*string,~*req.Account,3001,2014-07-29T15:00:00Z cgrates.org,FLTR_STS1,*string,~*req.Account,1001;1002,2014-07-29T15:00:00Z -cgrates.org,FLTR_CDR_UPDATE,*string,~*req.EventType,CDR,2014-07-29T15:00:00Z +cgrates.org,FLTR_CDR_UPDATE,*string,~*opts.*eventType,CDR,2014-07-29T15:00:00Z diff --git a/ees/ee.go b/ees/ee.go index ae694aa92..173e43b31 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -27,10 +27,10 @@ import ( ) type EventExporter interface { - ID() string // return the exporter identificator - ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported - OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate - GetMetrics() utils.MapStorage // called to get metrics + ID() string // return the exporter identificator + ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) // called on each event to be exported + OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate + GetMetrics() utils.MapStorage // called to get metrics } // NewEventExporter produces exporters diff --git a/ees/eereq.go b/ees/eereq.go index e63ecd877..949207307 100644 --- a/ees/eereq.go +++ b/ees/eereq.go @@ -32,9 +32,9 @@ import ( ) // NewEventExporterRequest returns a new EventExporterRequest -func NewEventExporterRequest(req utils.DataProvider, dc utils.MapStorage, - tntTpl config.RSRParsers, - dfltTenant, timezone string, filterS *engine.FilterS) (eeR *EventExporterRequest) { +func NewEventExporterRequest(req utils.DataProvider, dc, opts utils.MapStorage, + tntTpl config.RSRParsers, dfltTenant, timezone string, + filterS *engine.FilterS) (eeR *EventExporterRequest) { eeR = &EventExporterRequest{ req: req, tmz: timezone, @@ -43,6 +43,7 @@ func NewEventExporterRequest(req utils.DataProvider, dc utils.MapStorage, hdr: utils.NewOrderedNavigableMap(), trl: utils.NewOrderedNavigableMap(), dc: dc, + opts: opts, } // populate tenant if tntIf, err := eeR.ParseField( @@ -66,6 +67,7 @@ type EventExporterRequest struct { hdr *utils.OrderedNavigableMap // Used in reply to access the request that was send trl *utils.OrderedNavigableMap // Used in reply to access the request that was send dc utils.MapStorage + opts utils.MapStorage filterS *engine.FilterS } @@ -95,6 +97,8 @@ func (eeR *EventExporterRequest) FieldAsInterface(fldPath []string) (val interfa } case utils.MetaDC: val, err = eeR.dc.FieldAsInterface(fldPath[1:]) + case utils.MetaOpts: + val, err = eeR.opts.FieldAsInterface(fldPath[1:]) } if err != nil { return diff --git a/ees/ees.go b/ees/ees.go index e0a26bc24..07f508c9a 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -218,7 +218,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma utils.EventExporterS, ee.ID())) } go func(evict, sync bool, ee EventExporter) { - if err := ee.ExportEvent(cgrEv.CGREvent); err != nil { + if err := ee.ExportEvent(cgrEv.CGREventWithOpts); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> with id <%s>, error: <%s>", utils.EventExporterS, ee.ID(), err.Error())) diff --git a/ees/elastic.go b/ees/elastic.go index df0ac0da8..4506a29b6 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -123,7 +123,7 @@ func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) { } // ExportEvent implements EventExporter -func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { eEe.Lock() defer func() { if err != nil { @@ -140,7 +140,7 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { valMp = cgrEv.Event } else { req := utils.MapStorage(cgrEv.Event) - eeReq := NewEventExporterRequest(req, eEe.dc, + eeReq := NewEventExporterRequest(req, eEe.dc, cgrEv.Opts, eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant, eEe.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone, diff --git a/ees/filecsv.go b/ees/filecsv.go index 1f5470191..04c096ec6 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -91,7 +91,7 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { } // ExportEvent implements EventExporter -func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { fCsv.Lock() defer func() { if err != nil { @@ -109,7 +109,8 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { var csvRecord []string req := utils.MapStorage(cgrEv.Event) - eeReq := NewEventExporterRequest(req, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant, + eeReq := NewEventExporterRequest(req, fCsv.dc, cgrEv.Opts, + fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone, fCsv.cgrCfg.GeneralCfg().DefaultTimezone), @@ -136,7 +137,8 @@ func (fCsv *FileCSVee) composeHeader() (err error) { return } var csvRecord []string - eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant, + eeReq := NewEventExporterRequest(nil, fCsv.dc, nil, + fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone, fCsv.cgrCfg.GeneralCfg().DefaultTimezone), @@ -160,7 +162,7 @@ func (fCsv *FileCSVee) composeTrailer() (err error) { return } var csvRecord []string - eeReq := NewEventExporterRequest(nil, fCsv.dc, + eeReq := NewEventExporterRequest(nil, fCsv.dc, nil, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone, diff --git a/ees/filefwv.go b/ees/filefwv.go index 6df052cdd..781987008 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -82,7 +82,7 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { } // ExportEvent implements EventExporter -func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { fFwv.Lock() defer func() { if err != nil { @@ -95,7 +95,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1 var records []string req := utils.MapStorage(cgrEv.Event) - eeReq := NewEventExporterRequest(req, fFwv.dc, + eeReq := NewEventExporterRequest(req, fFwv.dc, cgrEv.Opts, fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone, @@ -128,7 +128,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) { return } var records []string - eeReq := NewEventExporterRequest(nil, fFwv.dc, + eeReq := NewEventExporterRequest(nil, fFwv.dc, nil, fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone, @@ -158,7 +158,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { return } var records []string - eeReq := NewEventExporterRequest(nil, fFwv.dc, + eeReq := NewEventExporterRequest(nil, fFwv.dc, nil, fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone, diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 4b59c17ab..da7c92074 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -85,7 +85,7 @@ func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) { } // ExportEvent implements EventExporter -func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { pstrEE.Lock() defer func() { if err != nil { @@ -102,7 +102,7 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) { if len(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()) == 0 { valMp = cgrEv.Event } else { - eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc, + eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc, cgrEv.Opts, pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Tenant, pstrEE.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone, diff --git a/ees/httppost.go b/ees/httppost.go index 103af2ce2..c5e93dc97 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -61,7 +61,7 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) { } // ExportEvent implements EventExporter -func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { httpPost.Lock() defer func() { if err != nil { @@ -80,7 +80,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { } } else { req := utils.MapStorage(cgrEv.Event) - eeReq := NewEventExporterRequest(req, httpPost.dc, + eeReq := NewEventExporterRequest(req, httpPost.dc, cgrEv.Opts, httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone, diff --git a/ees/virtualee.go b/ees/virtualee.go index a5f2ffe87..9bc2e5f1f 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -60,7 +60,7 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) { } // ExportEvent implements EventExporter -func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { vEe.Lock() defer func() { if err != nil { @@ -73,7 +73,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1 req := utils.MapStorage(cgrEv.Event) - eeReq := NewEventExporterRequest(req, vEe.dc, + eeReq := NewEventExporterRequest(req, vEe.dc, cgrEv.Opts, vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Tenant, vEe.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone, diff --git a/engine/account.go b/engine/account.go index 1415814aa..20f8dd5f3 100644 --- a/engine/account.go +++ b/engine/account.go @@ -555,6 +555,9 @@ func (acc *Account) debitCreditBalance(cd *CallDescriptor, count bool, dryRun bo utils.Units: defaultBalance.Value, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } var tIDs []string @@ -1109,21 +1112,27 @@ func (acc *Account) AsAccountSummary() *AccountSummary { // Publish sends the account to stats and threshold func (acc *Account) Publish() { acntTnt := utils.NewTenantID(acc.ID) - cgrEv := &utils.CGREvent{ - Tenant: acntTnt.Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.AccountUpdate, - utils.EventSource: utils.AccountService, - utils.Account: acntTnt.ID, - utils.AllowNegative: acc.AllowNegative, - utils.Disabled: acc.Disabled}} + cgrEv := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: acntTnt.Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.EventSource: utils.AccountService, + utils.Account: acntTnt.ID, + utils.AllowNegative: acc.AllowNegative, + utils.Disabled: acc.Disabled, + }, + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, + } if len(config.CgrConfig().RalsCfg().StatSConns) != 0 { go func() { var reply []string if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, - utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: cgrEv}}, &reply); err != nil && + utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREventWithOpts: cgrEv}, &reply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing balance event %+v with StatS.", @@ -1136,8 +1145,7 @@ func (acc *Account) Publish() { var tIDs []string if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, utils.ThresholdSv1ProcessEvent, - &ThresholdsArgsProcessEvent{CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: cgrEv}}, &tIDs); err != nil && + &ThresholdsArgsProcessEvent{CGREventWithOpts: cgrEv}, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv)) diff --git a/engine/action.go b/engine/action.go index 6bafcc33f..4e3698390 100644 --- a/engine/action.go +++ b/engine/action.go @@ -1026,26 +1026,31 @@ func resetAccountCDR(ub *Account, action *Action, acts Actions, _ interface{}) e } func export(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) { - var cgrEv *utils.CGREvent + var cgrEv *utils.CGREventWithOpts switch { case ub != nil: - cgrEv = &utils.CGREvent{ - Tenant: utils.NewTenantID(ub.ID).Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.Account: ub.ID, - utils.EventType: utils.AccountUpdate, - utils.EventSource: utils.AccountService, - utils.AllowNegative: ub.AllowNegative, - utils.Disabled: ub.Disabled, - utils.BalanceMap: ub.BalanceMap, - utils.UnitCounters: ub.UnitCounters, - utils.ActionTriggers: ub.ActionTriggers, - utils.UpdateTime: ub.UpdateTime, + cgrEv = &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: utils.NewTenantID(ub.ID).Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.Account: ub.ID, + utils.EventType: utils.AccountUpdate, + utils.EventSource: utils.AccountService, + utils.AllowNegative: ub.AllowNegative, + utils.Disabled: ub.Disabled, + utils.BalanceMap: ub.BalanceMap, + utils.UnitCounters: ub.UnitCounters, + utils.ActionTriggers: ub.ActionTriggers, + utils.UpdateTime: ub.UpdateTime, + }, + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, }, } case extraData != nil: - ev, canCast := extraData.(*utils.CGREvent) + ev, canCast := extraData.(*utils.CGREventWithOpts) if !canCast { return } @@ -1054,11 +1059,8 @@ func export(ub *Account, a *Action, acs Actions, extraData interface{}) (err err return // nothing to post } args := &utils.CGREventWithIDs{ - IDs: strings.Split(a.ExtraParameters, utils.INFIELD_SEP), - CGREventWithOpts: &utils.CGREventWithOpts{ - Opts: make(map[string]interface{}), - CGREvent: cgrEv, - }, + IDs: strings.Split(a.ExtraParameters, utils.INFIELD_SEP), + CGREventWithOpts: cgrEv, } var rply map[string]map[string]interface{} return connMgr.Call(config.CgrConfig().ApierCfg().EEsConns, nil, diff --git a/engine/balances.go b/engine/balances.go index 7e0345fd7..e7e20349f 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -675,7 +675,7 @@ func (b *Balance) debitMoney(cd *CallDescriptor, ub *Account, moneyBalances Bala return cc, nil } -// Converts the balance towards compressed information to be displayed +// AsBalanceSummary converts the balance towards compressed information to be displayed func (b *Balance) AsBalanceSummary(typ string) *BalanceSummary { bd := &BalanceSummary{UUID: b.Uuid, ID: b.ID, Type: typ, Value: b.Value, Disabled: b.Disabled} if bd.ID == "" { @@ -688,17 +688,24 @@ func (b *Balance) Publish() { if b.account == nil { return } - accountId := b.account.ID - acntTnt := utils.NewTenantID(accountId) - cgrEv := &utils.CGREvent{ - Tenant: acntTnt.Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.BalanceUpdate, - utils.EventSource: utils.AccountService, - utils.Account: acntTnt.ID, - utils.BalanceID: b.ID, - utils.Units: b.Value}} + accountID := b.account.ID + acntTnt := utils.NewTenantID(accountID) + cgrEv := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: acntTnt.Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.EventSource: utils.AccountService, + utils.Account: acntTnt.ID, + utils.BalanceID: b.ID, + utils.Units: b.Value, + }, + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, + } if !b.ExpirationDate.IsZero() { cgrEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) } @@ -707,8 +714,7 @@ func (b *Balance) Publish() { var reply []string if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{ - CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: cgrEv}}, &reply); err != nil && + CGREventWithOpts: cgrEv}, &reply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing balance event %+v with StatS.", @@ -721,8 +727,7 @@ func (b *Balance) Publish() { var tIDs []string if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{ - CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: cgrEv}}, &tIDs); err != nil && + CGREventWithOpts: cgrEv}, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", @@ -827,6 +832,9 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { utils.Units: b.Value, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.BalanceUpdate, + }, }, } if !b.ExpirationDate.IsZero() { @@ -864,6 +872,9 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { utils.Disabled: acnt.Disabled, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, }, } var tIDs []string diff --git a/engine/cdrs.go b/engine/cdrs.go index 783baf2d9..cacab8adf 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -392,7 +392,10 @@ func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithOpts) (err erro thArgs := &ThresholdsArgsProcessEvent{ CGREventWithOpts: cgrEv.Clone(), } - thArgs.CGREvent.Event[utils.EventType] = utils.CDR + if thArgs.Opts == nil { + thArgs.Opts = make(map[string]interface{}) + } + thArgs.Opts[utils.MetaEventType] = utils.CDR if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ThresholdSConns, nil, utils.ThresholdSv1ProcessEvent, thArgs, &tIDs); err != nil && diff --git a/engine/resources.go b/engine/resources.go index b02344346..f06c5a4d9 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -436,6 +436,10 @@ func (rS *ResourceService) processThresholds(r *Resource, opts map[string]interf } thIDs = r.rPrf.ThresholdIDs } + if opts == nil { + opts = make(map[string]interface{}) + } + opts[utils.MetaEventType] = utils.ResourceUpdate thEv := &ThresholdsArgsProcessEvent{ThresholdIDs: thIDs, CGREventWithOpts: &utils.CGREventWithOpts{ CGREvent: &utils.CGREvent{ @@ -462,12 +466,15 @@ func (rS *ResourceService) processThresholds(r *Resource, opts map[string]interf } // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call -func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, +func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREventWithOpts, evUUID string, usageTTL *time.Duration) (rs Resources, err error) { matchingResources := make(map[string]*Resource) var isCached bool var rIDs utils.StringSet - evNm := utils.MapStorage{utils.MetaReq: ev.Event} + evNm := utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.Opts, + } if x, ok := Cache.Get(utils.CacheEventResources, evUUID); ok { // The ResourceIDs were cached as utils.StringSet{"resID":bool} isCached = true if x == nil { @@ -593,7 +600,7 @@ func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage, // end of RPC caching var mtcRLs Resources - if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREvent, args.UsageID, args.UsageTTL); err != nil { + if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREventWithOpts, args.UsageID, args.UsageTTL); err != nil { return err } *reply = mtcRLs @@ -631,7 +638,7 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage, // end of RPC caching var mtcRLs Resources - if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREvent, args.UsageID, args.UsageTTL); err != nil { + if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREventWithOpts, args.UsageID, args.UsageTTL); err != nil { return err } var alcMessage string @@ -680,7 +687,7 @@ func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, r // end of RPC caching var mtcRLs Resources - if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREvent, args.UsageID, + if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREventWithOpts, args.UsageID, args.UsageTTL); err != nil { return err } @@ -747,7 +754,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.ArgRSv1ResourceUsage, re // end of RPC caching var mtcRLs Resources - if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREvent, args.UsageID, + if mtcRLs, err = rS.matchingResourcesForEvent(args.CGREventWithOpts, args.UsageID, args.UsageTTL); err != nil { return err } diff --git a/engine/routes.go b/engine/routes.go index ae81d13fd..cb2d9b538 100644 --- a/engine/routes.go +++ b/engine/routes.go @@ -153,8 +153,11 @@ func (rpS *RouteService) Shutdown() error { } // matchingRouteProfilesForEvent returns ordered list of matching resources which are active by the time of the call -func (rpS *RouteService) matchingRouteProfilesForEvent(ev *utils.CGREvent, singleResult bool) (matchingRPrf []*RouteProfile, err error) { - evNm := utils.MapStorage{utils.MetaReq: ev.Event} +func (rpS *RouteService) matchingRouteProfilesForEvent(ev *utils.CGREventWithOpts, singleResult bool) (matchingRPrf []*RouteProfile, err error) { + evNm := utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.Opts, + } rPrfIDs, err := MatchingItemIDsForEvent(evNm, rpS.cgrcfg.RouteSCfg().StringIndexedFields, rpS.cgrcfg.RouteSCfg().PrefixIndexedFields, @@ -521,7 +524,7 @@ func (rpS *RouteService) sortedRoutesForEvent(args *ArgsGetRoutes) (sortedRoutes args.CGREvent.Event[utils.Usage] = time.Duration(time.Minute) // make sure we have default set for Usage } var rPrfs []*RouteProfile - if rPrfs, err = rpS.matchingRouteProfilesForEvent(args.CGREvent, true); err != nil { + if rPrfs, err = rpS.matchingRouteProfilesForEvent(args.CGREventWithOpts, true); err != nil { return } rPrfl := rPrfs[0] @@ -689,7 +692,7 @@ func (rpS *RouteService) V1GetRouteProfilesForEvent(args *utils.CGREventWithOpts } else if args.CGREvent.Event == nil { return utils.NewErrMandatoryIeMissing(utils.Event) } - sPs, err := rpS.matchingRouteProfilesForEvent(args.CGREvent, false) + sPs, err := rpS.matchingRouteProfilesForEvent(args, false) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) diff --git a/engine/routes_test.go b/engine/routes_test.go index b4d4a2084..4a8568a86 100644 --- a/engine/routes_test.go +++ b/engine/routes_test.go @@ -385,7 +385,7 @@ func TestRoutesCache(t *testing.T) { } func TestRoutesmatchingRouteProfilesForEvent(t *testing.T) { - sprf, err := routeService.matchingRouteProfilesForEvent(argsGetRoutes[0].CGREvent, true) + sprf, err := routeService.matchingRouteProfilesForEvent(argsGetRoutes[0].CGREventWithOpts, true) if err != nil { t.Errorf("Error: %+v", err) } @@ -393,7 +393,7 @@ func TestRoutesmatchingRouteProfilesForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", sppTest[0], sprf[0]) } - sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[1].CGREvent, true) + sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[1].CGREventWithOpts, true) if err != nil { t.Errorf("Error: %+v", err) } @@ -401,7 +401,7 @@ func TestRoutesmatchingRouteProfilesForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", sppTest[1], sprf[0]) } - sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[2].CGREvent, true) + sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[2].CGREventWithOpts, true) if err != nil { t.Errorf("Error: %+v", err) } @@ -635,7 +635,7 @@ func TestRoutesAsOptsGetRoutesMaxCost(t *testing.T) { func TestRoutesMatchWithIndexFalse(t *testing.T) { routeService.cgrcfg.RouteSCfg().IndexedSelects = false - sprf, err := routeService.matchingRouteProfilesForEvent(argsGetRoutes[0].CGREvent, true) + sprf, err := routeService.matchingRouteProfilesForEvent(argsGetRoutes[0].CGREventWithOpts, true) if err != nil { t.Errorf("Error: %+v", err) } @@ -643,7 +643,7 @@ func TestRoutesMatchWithIndexFalse(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", sppTest[0], sprf[0]) } - sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[1].CGREvent, true) + sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[1].CGREventWithOpts, true) if err != nil { t.Errorf("Error: %+v", err) } @@ -651,7 +651,7 @@ func TestRoutesMatchWithIndexFalse(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", sppTest[1], sprf[0]) } - sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[2].CGREvent, true) + sprf, err = routeService.matchingRouteProfilesForEvent(argsGetRoutes[2].CGREventWithOpts, true) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/engine/stats.go b/engine/stats.go index 8717b73bb..61d21c5ec 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -151,7 +151,10 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { // matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (sqs StatQueues, err error) { - evNm := utils.MapStorage{utils.MetaReq: args.Event} + evNm := utils.MapStorage{ + utils.MetaReq: args.Event, + utils.MetaOpts: args.Opts, + } sqIDs := utils.NewStringSet(args.StatIDs) if len(sqIDs) == 0 { sqIDs, err = MatchingItemIDsForEvent(evNm, @@ -268,6 +271,10 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ if len(matchSQs) == 0 { return nil, utils.ErrNotFound } + if args.Opts == nil { + args.Opts = make(map[string]interface{}) + } + args.Opts[utils.MetaEventType] = utils.StatUpdate var stsIDs []string var withErrors bool for _, sq := range matchSQs { diff --git a/engine/thresholds.go b/engine/thresholds.go index e4b0e01b7..ce6cd71c8 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -105,7 +105,7 @@ func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManag at := &ActionTiming{ Uuid: utils.GenUUID(), ActionsID: actionSetID, - ExtraData: args.CGREvent, + ExtraData: args.CGREventWithOpts, } if acntID != "" { at.accountIDs = utils.NewStringMap(acntID) @@ -240,7 +240,10 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { // matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event func (tS *ThresholdService) matchingThresholdsForEvent(args *ThresholdsArgsProcessEvent) (ts Thresholds, err error) { - evNm := utils.MapStorage{utils.MetaReq: args.Event} + evNm := utils.MapStorage{ + utils.MetaReq: args.Event, + utils.MetaOpts: args.Opts, + } tIDs := utils.NewStringSet(args.ThresholdIDs) if len(tIDs) == 0 { tIDs, err = MatchingItemIDsForEvent(evNm, diff --git a/engine/z_actions_it_test.go b/engine/z_actions_it_test.go index ee93247a6..004cf0b10 100644 --- a/engine/z_actions_it_test.go +++ b/engine/z_actions_it_test.go @@ -395,6 +395,9 @@ func testActionsitThresholdCDrLog(t *testing.T) { utils.COST: -1.0, }, }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.CDR, + }, }, } var ids []string @@ -647,33 +650,37 @@ func testActionsitThresholdPostEvent(t *testing.T) { } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, thReply) { t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, thReply) } - ev := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "cdrev1", - Event: map[string]interface{}{ - utils.EventType: utils.CDR, - "field_extr1": "val_extr1", - "fieldextr2": "valextr2", - utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), - utils.RunID: utils.MetaRaw, - utils.OrderID: 123, - utils.OriginHost: "192.168.1.1", - utils.Source: utils.UNIT_TEST, - utils.OriginID: "dsafdsaf", - utils.RequestType: utils.META_RATED, - utils.Tenant: "cgrates.org", - utils.SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), - utils.PDD: time.Duration(0) * time.Second, - utils.AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), - utils.Usage: time.Duration(10) * time.Second, - utils.ROUTE: "SUPPL1", - utils.COST: -1.0, + ev := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "cdrev1", + Event: map[string]interface{}{ + "field_extr1": "val_extr1", + "fieldextr2": "valextr2", + utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + utils.RunID: utils.MetaRaw, + utils.OrderID: 123, + utils.OriginHost: "192.168.1.1", + utils.Source: utils.UNIT_TEST, + utils.OriginID: "dsafdsaf", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + utils.PDD: time.Duration(0) * time.Second, + utils.AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + utils.Usage: time.Duration(10) * time.Second, + utils.ROUTE: "SUPPL1", + utils.COST: -1.0, + }, + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.CDR, }, } var ids []string eIDs := []string{"THD_PostEvent"} if err := actsLclRpc.Call(utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{ - CGREventWithOpts: &utils.CGREventWithOpts{CGREvent: ev}}, &ids); err != nil { + CGREventWithOpts: ev}, &ids); err != nil { t.Error(err) } else if !reflect.DeepEqual(ids, eIDs) { t.Errorf("Expecting ids: %s, received: %s", eIDs, ids) diff --git a/engine/z_resources_test.go b/engine/z_resources_test.go index 342db72ee..9ed1ee5e5 100644 --- a/engine/z_resources_test.go +++ b/engine/z_resources_test.go @@ -97,38 +97,44 @@ var ( rPrf: resprf[2], }, } - resEvs = []*utils.CGREvent{ + resEvs = []*utils.CGREventWithOpts{ { - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - ID: "event1", - Event: map[string]interface{}{ - "Resources": "ResourceProfile1", - utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC), - "UsageInterval": "1s", - "PddInterval": "1s", - utils.Weight: "20.0", - utils.Usage: time.Duration(135 * time.Second), - utils.COST: 123.0, + CGREvent: &utils.CGREvent{ + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + ID: "event1", + Event: map[string]interface{}{ + "Resources": "ResourceProfile1", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC), + "UsageInterval": "1s", + "PddInterval": "1s", + utils.Weight: "20.0", + utils.Usage: time.Duration(135 * time.Second), + utils.COST: 123.0, + }, }, }, { - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - ID: "event2", - Event: map[string]interface{}{ - "Resources": "ResourceProfile2", - utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC), - "UsageInterval": "1s", - "PddInterval": "1s", - utils.Weight: "15.0", - utils.Usage: time.Duration(45 * time.Second), + CGREvent: &utils.CGREvent{ + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + ID: "event2", + Event: map[string]interface{}{ + "Resources": "ResourceProfile2", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC), + "UsageInterval": "1s", + "PddInterval": "1s", + utils.Weight: "15.0", + utils.Usage: time.Duration(45 * time.Second), + }, }, }, { - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - ID: "event3", - Event: map[string]interface{}{ - "Resources": "ResourceProfilePrefix", - utils.Usage: time.Duration(30 * time.Second), + CGREvent: &utils.CGREvent{ + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + ID: "event3", + Event: map[string]interface{}{ + "Resources": "ResourceProfilePrefix", + utils.Usage: time.Duration(30 * time.Second), + }, }, }, } @@ -767,12 +773,14 @@ func TestResourceCaching(t *testing.T) { t.Errorf("Expecting: nil, received: %s", err) } - ev := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: utils.UUIDSha1Prefix(), - Event: map[string]interface{}{ - "Account": "1001", - "Destination": "3002"}, + ev := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: utils.UUIDSha1Prefix(), + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "3002"}, + }, } mres, err := resService.matchingResourcesForEvent(ev, diff --git a/general_tests/session3_it_test.go b/general_tests/session3_it_test.go index 517a5b8d4..64fd237fc 100644 --- a/general_tests/session3_it_test.go +++ b/general_tests/session3_it_test.go @@ -169,8 +169,9 @@ func testSes3ItProcessEvent(t *testing.T) { AlteredFields: []string{"*req.OfficeGroup"}, CGREventWithOpts: &utils.CGREventWithOpts{ Opts: map[string]interface{}{ - utils.Subsys: utils.MetaSessionS, - utils.OptsAPIKey: "ses12345", + utils.Subsys: utils.MetaSessionS, + utils.OptsAPIKey: "ses12345", + utils.MetaEventType: utils.StatUpdate, }, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", diff --git a/go.sum b/go.sum index 68d0479d8..d9c3b8435 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -246,8 +247,6 @@ github.com/miekg/dns v1.1.30/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7 github.com/mitchellh/mapstructure v1.3.3 h1:SzB1nHZ2Xi+17FP0zVQBHIZqvwRN9408fJO8h+eeNA8= github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= -github.com/nyaruka/phonenumbers v1.0.56 h1:WdOfLJMyhXibLTBHu1MIrPmZ5eylfGaXZ9vl9h9SB08= -github.com/nyaruka/phonenumbers v1.0.56/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U= github.com/nyaruka/phonenumbers v1.0.57 h1:V4FNPs061PSUOEzQaLH0+pfzEdqoiMH/QJWryx/0hfs= github.com/nyaruka/phonenumbers v1.0.57/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= diff --git a/rates/rates.go b/rates/rates.go index 1b136d50b..44c4510c3 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -72,7 +72,10 @@ func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{}) // matchingRateProfileForEvent returns the matched RateProfile for the given event func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent, rPfIDs []string) (rtPfl *engine.RateProfile, err error) { - evNm := utils.MapStorage{utils.MetaReq: args.CGREvent.Event} + evNm := utils.MapStorage{ + utils.MetaReq: args.CGREvent.Event, + utils.MetaOpts: args.Opts, + } if len(rPfIDs) == 0 { var rPfIDMp utils.StringSet if rPfIDMp, err = engine.MatchingItemIDsForEvent( diff --git a/utils/consts.go b/utils/consts.go index ca573d034..80f2218c9 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -551,7 +551,6 @@ const ( Meta = "*" MetaSysLog = "*syslog" MetaStdLog = "*stdout" - EventType = "EventType" EventSource = "EventSource" AccountID = "AccountID" ResourceID = "ResourceID" @@ -2400,6 +2399,8 @@ const ( Subsys = "*subsys" OptsAttributesProcessRuns = "*processRuns" OptsDispatcherMethod = "*method" + MetaEventType = "*eventType" + EventType = "EventType" ) // Event Flags