From 9bc1bc0625d14636d9c920f1a87bb0ba0db3a993 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 19 Feb 2020 14:32:24 +0200 Subject: [PATCH] Add new API SessionSv1.GetCost + tests --- apier/v1/api_interfaces.go | 1 + apier/v1/dispatcher.go | 6 + apier/v1/sessions.go | 5 + apier/v1/sessions_process_event_it_test.go | 42 +++++++ apier/v1/sessionsbirpc.go | 6 + changelog | 3 +- data/tariffplans/dispatchers/Attributes.csv | 2 +- dispatchers/sessions.go | 21 ++++ dispatchers/sessions_it_test.go | 41 +++++++ sessions/sessions.go | 123 ++++++++++++++++++++ utils/consts.go | 1 + 11 files changed, 248 insertions(+), 3 deletions(-) diff --git a/apier/v1/api_interfaces.go b/apier/v1/api_interfaces.go index e265d88e3..97199aa96 100644 --- a/apier/v1/api_interfaces.go +++ b/apier/v1/api_interfaces.go @@ -84,6 +84,7 @@ type SessionSv1Interface interface { ProcessCDR(cgrEv *utils.CGREventWithArgDispatcher, rply *string) error ProcessMessage(args *sessions.V1ProcessMessageArgs, rply *sessions.V1ProcessMessageReply) error ProcessEvent(args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) error + GetCost(args *sessions.V1ProcessEventArgs, rply *sessions.V1GetCostReply) error GetActiveSessions(args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error GetActiveSessionsCount(args *utils.SessionFilter, rply *int) error ForceDisconnect(args *utils.SessionFilter, rply *string) error diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 3324e5671..ccea49282 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -455,6 +455,12 @@ func (dS *DispatcherSessionSv1) ProcessEvent(args *sessions.V1ProcessEventArgs, return dS.dS.SessionSv1ProcessEvent(args, reply) } +// GetCost implements SessionSv1GetCost +func (dS *DispatcherSessionSv1) GetCost(args *sessions.V1ProcessEventArgs, + reply *sessions.V1GetCostReply) (err error) { + return dS.dS.SessionSv1GetCost(args, reply) +} + // TerminateSession implements SessionSv1TerminateSession func (dS *DispatcherSessionSv1) TerminateSession(args *sessions.V1TerminateSessionArgs, reply *string) (err error) { diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index de06379b9..dcd2bda9f 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -82,6 +82,11 @@ func (ssv1 *SessionSv1) ProcessEvent(args *sessions.V1ProcessEventArgs, return ssv1.Ss.BiRPCv1ProcessEvent(nil, args, rply) } +func (ssv1 *SessionSv1) GetCost(args *sessions.V1ProcessEventArgs, + rply *sessions.V1GetCostReply) error { + return ssv1.Ss.BiRPCv1GetCost(nil, args, rply) +} + func (ssv1 *SessionSv1) GetActiveSessions(args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { return ssv1.Ss.BiRPCv1GetActiveSessions(nil, args, rply) diff --git a/apier/v1/sessions_process_event_it_test.go b/apier/v1/sessions_process_event_it_test.go index c723411c9..4c9b29854 100644 --- a/apier/v1/sessions_process_event_it_test.go +++ b/apier/v1/sessions_process_event_it_test.go @@ -53,6 +53,7 @@ var sTestSessionSv1ProcessEvent = []func(t *testing.T){ testSSv1ItProcessEventWithGetCost2, testSSv1ItProcessEventWithGetCost3, testSSv1ItProcessEventWithGetCost4, + testSSv1ItGetCost, testSSv1ItStopCgrEngine, } @@ -614,3 +615,44 @@ func testSSv1ItProcessEventWithGetCost4(t *testing.T) { } } + +func testSSv1ItGetCost(t *testing.T) { + // GetCost for ANY2CNT Subject + args := &sessions.V1ProcessEventArgs{ + Flags: []string{utils.MetaAttributes, utils.MetaCost}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItGetCost", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.MONETARY, + utils.OriginID: "testSSv1ItProcessEventWithGetCost", + utils.RequestType: sSV1RequestType, + utils.Subject: "*attributes", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 10 * time.Minute, + }, + }, + } + var rply sessions.V1GetCostReply + if err := sSv1BiRpc.Call(utils.SessionSv1GetCost, + args, &rply); err != nil { + t.Error(err) + } + if rply.Attributes == nil { + t.Error("Received nil Attributes") + } else if !reflect.DeepEqual(rply.Attributes.MatchedProfiles, []string{"ATTR_SUBJECT_CASE1"}) { + t.Errorf("Expected: %+v,received: %+v", []string{"ATTR_SUBJECT_CASE1"}, rply.Attributes.MatchedProfiles) + } else if !reflect.DeepEqual(rply.Attributes.AlteredFields, []string{"*req.Subject"}) { + t.Errorf("Expected: %+v,received: %+v", []string{"*req.Subject"}, rply.Attributes.AlteredFields) + } + if rply.EventCost == nil { + t.Errorf("Received nil EventCost") + } else if *rply.EventCost.Cost != 0.198 { // same cost as in CDR + t.Errorf("Expected: %+v,received: %+v", 0.198, *rply.EventCost.Cost) + } else if *rply.EventCost.Usage != 10*time.Minute { + t.Errorf("Expected: %+v,received: %+v", 10*time.Minute, *rply.EventCost.Usage) + } +} diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 35009f979..be4317d8c 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -42,6 +42,7 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1ProcessCDR: ssv1.BiRPCv1ProcessCDR, utils.SessionSv1ProcessMessage: ssv1.BiRPCv1ProcessMessage, utils.SessionSv1ProcessEvent: ssv1.BiRPCv1ProcessEvent, + utils.SessionSv1GetCost: ssv1.BiRPCv1GetCost, utils.SessionSv1ForceDisconnect: ssv1.BiRPCv1ForceDisconnect, utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn, @@ -103,6 +104,11 @@ func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1 return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) } +func (ssv1 *SessionSv1) BiRPCv1GetCost(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, + rply *sessions.V1GetCostReply) error { + return ssv1.Ss.BiRPCv1GetCost(clnt, args, rply) +} + func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) diff --git a/changelog b/changelog index 48416562f..3a6953c67 100644 --- a/changelog +++ b/changelog @@ -16,7 +16,6 @@ cgrates (0.11) UNRELEASED; urgency=medium [ Voivozeanu Teofil ] * [AgentRequest] Add support for *group type and correctly overwrite the values in case of *variable - * [EventReader] Correctly populate ConcurrentRequest from config in - EventReader + * [EventReader] Correctly populate ConcurrentRequest from config -- Alexandru Tripon Tue, 18 Feb 2020 08:24:39 +0200 diff --git a/data/tariffplans/dispatchers/Attributes.csv b/data/tariffplans/dispatchers/Attributes.csv index 819bc67d9..038e7c153 100644 --- a/data/tariffplans/dispatchers/Attributes.csv +++ b/data/tariffplans/dispatchers/Attributes.csv @@ -9,7 +9,7 @@ cgrates.org,ATTR_API_THR_AUTH,*auth,*string:~*req.APIKey:thr12345,,,*req.APIMeth cgrates.org,ATTR_API_SUP_AUTH,*auth,*string:~*req.APIKey:sup12345,,,*req.APIMethods,*constant,SupplierSv1.Ping&SupplierSv1.GetSuppliers&SupplierSv1.GetSupplierProfilesForEvent,false,20 cgrates.org,ATTR_API_STAT_AUTH,*auth,*string:~*req.APIKey:stat12345,,,*req.APIMethods,*constant,StatSv1.Ping&StatSv1.GetStatQueuesForEvent&StatSv1.GetQueueStringMetrics&StatSv1.ProcessEvent&StatSv1.GetQueueIDs&StatSv1.GetQueueFloatMetrics,false,20 cgrates.org,ATTR_API_RES_AUTH,*auth,*string:~*req.APIKey:res12345,,,*req.APIMethods,*constant,ResourceSv1.Ping&ResourceSv1.GetResourcesForEvent&ResourceSv1.AuthorizeResources&ResourceSv1.AllocateResources&ResourceSv1.ReleaseResources&ResourceSv1.GetResource,false,20 -cgrates.org,ATTR_API_SES_AUTH,*auth,*string:~*req.APIKey:ses12345,,,*req.APIMethods,*constant,SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessMessage&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession&SessionSv1.ProcessEvent,false,20 +cgrates.org,ATTR_API_SES_AUTH,*auth,*string:~*req.APIKey:ses12345,,,*req.APIMethods,*constant,SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessMessage&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession&SessionSv1.ProcessEvent&SessionSv1.GetCost,false,20 cgrates.org,ATTR_API_RSP_AUTH,*auth,*string:~*req.APIKey:rsp12345,,,*req.APIMethods,*constant,CoreSv1.Status&CoreSv1.Ping&Responder.Shutdown&Responder.Ping,false,20 cgrates.org,ATTR_API_CHC_AUTH,*auth,*string:~*req.APIKey:chc12345,,,*req.APIMethods,*constant,CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear,false,20 cgrates.org,ATTR_API_GRD_AUTH,*auth,*string:~*req.APIKey:grd12345,,,*req.APIMethods,*constant,GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock,false,20 diff --git a/dispatchers/sessions.go b/dispatchers/sessions.go index c22b6e1db..e0eadca56 100755 --- a/dispatchers/sessions.go +++ b/dispatchers/sessions.go @@ -257,6 +257,27 @@ func (dS *DispatcherService) SessionSv1ProcessEvent(args *sessions.V1ProcessEven utils.SessionSv1ProcessEvent, args, reply) } +func (dS *DispatcherService) SessionSv1GetCost(args *sessions.V1ProcessEventArgs, + reply *sessions.V1GetCostReply) (err error) { + args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if args.ArgDispatcher == nil { + return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) + } + if err = dS.authorize(utils.SessionSv1GetCost, + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { + return + } + } + var routeID *string + if args.ArgDispatcher != nil { + routeID = args.ArgDispatcher.RouteID + } + return dS.Dispatch(args.CGREvent, utils.MetaSessionS, routeID, + utils.SessionSv1GetCost, args, reply) +} + func (dS *DispatcherService) SessionSv1GetActiveSessions(args *utils.SessionFilter, reply *[]*sessions.ExternalSession) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant diff --git a/dispatchers/sessions_it_test.go b/dispatchers/sessions_it_test.go index 7de006d3a..ba622b5da 100755 --- a/dispatchers/sessions_it_test.go +++ b/dispatchers/sessions_it_test.go @@ -51,6 +51,7 @@ var sTestsDspSession = []func(t *testing.T){ testDspSessionProcessEvent3, + testDspSessionGetCost, testDspSessionReplicate, testDspSessionPassive, testDspSessionForceDisconect, @@ -932,3 +933,43 @@ func testDspSessionProcessEvent3(t *testing.T) { t.Errorf("Expected no active sessions recived %v", repl) } } + +func testDspSessionGetCost(t *testing.T) { + + args := &sessions.V1ProcessEventArgs{ + Flags: []string{utils.MetaCost}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItGetCost", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.MONETARY, + utils.OriginID: "testSSv1ItProcessEventWithGetCost", + utils.RequestType: utils.META_PREPAID, + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 10 * time.Minute, + }, + }, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("ses12345"), + }, + } + + var rply sessions.V1GetCostReply + if err := dispEngine.RPC.Call(utils.SessionSv1GetCost, + args, &rply); err != nil { + t.Error(err) + } + + if rply.EventCost == nil { + t.Errorf("Received nil EventCost") + } else if *rply.EventCost.Cost != 0.198 { // same cost as in CDR + t.Errorf("Expected: %+v,received: %+v", 0.198, *rply.EventCost.Cost) + } else if *rply.EventCost.Usage != 10*time.Minute { + t.Errorf("Expected: %+v,received: %+v", 10*time.Minute, *rply.EventCost.Usage) + } + +} diff --git a/sessions/sessions.go b/sessions/sessions.go index 6fccabf14..d52a2a6ac 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3240,6 +3240,129 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, return } +// V1GetCostReply is the reply for the GetCost API +type V1GetCostReply struct { + Attributes *engine.AttrSProcessEventReply + EventCost *engine.EventCost +} + +// BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received +func (sS *SessionS) BiRPCv1GetCost(clnt rpcclient.ClientConnector, + args *V1ProcessEventArgs, rply *V1GetCostReply) (err error) { + if args.CGREvent == nil { + return utils.NewErrMandatoryIeMissing(utils.CGREventString) + } + var withErrors bool + if args.CGREvent.ID == "" { + args.CGREvent.ID = utils.GenUUID() + } + + // RPC caching + if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.SessionSv1GetCost, args.CGREvent.ID) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + + if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *rply = *cachedResp.Result.(*V1GetCostReply) + } + return cachedResp.Error + } + defer engine.Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: rply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + + if args.CGREvent.Tenant == "" { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } + + //convert from Flags []string to utils.FlagsWithParams + var argsFlagsWithParams utils.FlagsWithParams + if argsFlagsWithParams, err = utils.FlagsWithParamsFromSlice(args.Flags); err != nil { + return + } + // check for *attribute + if argsFlagsWithParams.HasKey(utils.MetaAttributes) { + rplyAttr, err := sS.processAttributes(args.CGREvent, args.ArgDispatcher, + argsFlagsWithParams.ParamsSlice(utils.MetaAttributes)) + if err == nil { + args.CGREvent = rplyAttr.CGREvent.Clone() + rply.Attributes = &rplyAttr + } else if err.Error() != utils.ErrNotFound.Error() { + return utils.NewErrAttributeS(err) + } + } + // check for *cost + if argsFlagsWithParams.HasKey(utils.MetaCost) { + //compose the CallDescriptor with Args + me := engine.MapEvent(args.CGREvent.Event).Clone() + startTime := me.GetTimeIgnoreErrors(utils.AnswerTime, + sS.cgrCfg.GeneralCfg().DefaultTimezone) + if startTime.IsZero() { // AnswerTime not parsable, try SetupTime + startTime = me.GetTimeIgnoreErrors(utils.SetupTime, + sS.cgrCfg.GeneralCfg().DefaultTimezone) + } + category := me.GetStringIgnoreErrors(utils.Category) + if len(category) == 0 { + category = sS.cgrCfg.GeneralCfg().DefaultCategory + } + subject := me.GetStringIgnoreErrors(utils.Subject) + if len(subject) == 0 { + subject = me.GetStringIgnoreErrors(utils.Account) + } + + cd := &engine.CallDescriptor{ + RunID: me.GetStringIgnoreErrors(utils.RunID), + ToR: me.GetStringIgnoreErrors(utils.ToR), + Tenant: args.CGREvent.Tenant, + Category: category, + Subject: subject, + Account: me.GetStringIgnoreErrors(utils.Account), + Destination: me.GetStringIgnoreErrors(utils.Destination), + TimeStart: startTime, + TimeEnd: startTime.Add(me.GetDurationIgnoreErrors(utils.Usage)), + } + var argDsp *utils.ArgDispatcher + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKey, errAPIKey := me.GetString(utils.MetaApiKey) + if errAPIKey == nil { + argDsp = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKey), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + if routeID, err := me.GetString(utils.MetaRouteID); err == nil { + if errAPIKey == utils.ErrNotFound { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + argDsp = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeID), + } + } else { + argDsp.RouteID = utils.StringPointer(routeID) + } + } + + var cc engine.CallCost + if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().RALsConns, nil, + utils.ResponderGetCost, + &engine.CallDescriptorWithArgDispatcher{CallDescriptor: cd, + ArgDispatcher: argDsp}, &cc); err != nil { + return + } + ec := engine.NewEventCostFromCallCost(&cc, args.CGREvent.ID, me.GetStringIgnoreErrors(utils.RunID)) + ec.Compute() + rply.EventCost = ec + } + if withErrors { + err = utils.ErrPartiallyExecuted + } + return +} + // BiRPCv1SyncSessions will sync sessions on demand func (sS *SessionS) BiRPCv1SyncSessions(clnt rpcclient.ClientConnector, ignParam *utils.TenantWithArgDispatcher, reply *string) error { diff --git a/utils/consts.go b/utils/consts.go index 04fd88c06..55fa3e48c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1289,6 +1289,7 @@ const ( SessionSv1ProcessCDR = "SessionSv1.ProcessCDR" SessionSv1ProcessMessage = "SessionSv1.ProcessMessage" SessionSv1ProcessEvent = "SessionSv1.ProcessEvent" + SessionSv1GetCost = "SessionSv1.GetCost" SessionSv1DisconnectSession = "SessionSv1.DisconnectSession" SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions" SessionSv1GetActiveSessionsCount = "SessionSv1.GetActiveSessionsCount"