diff --git a/engine/libengine.go b/engine/libengine.go index 33a358084..221b1669f 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -155,23 +155,25 @@ func NewServiceWithPing(val any, name, prefix string) (*birpc.Service, error) { if err != nil { return nil, err } - srv.Methods["Ping"] = pingM + srv.Methods[utils.Ping] = pingM return srv, nil } +// NewServiceWithName is used by a service to return a number of *birpc.Service objects with different versions +// +// which are groupped by the vX prefix in the RPC capable methods of val interface. (ie: CoreS and CoreSv1) func NewServiceWithName(val any, name string, useName bool) (_ IntService, err error) { var srv *birpc.Service if srv, err = birpc.NewService(val, name, useName); err != nil { return } - srv.Methods["Ping"] = pingM + srv.Methods[utils.Ping] = pingM s := IntService{srv.Name: srv} for m, v := range srv.Methods { m = strings.TrimPrefix(m, "BiRPC") if len(m) < 2 || unicode.ToLower(rune(m[0])) != 'v' { continue } - key := srv.Name if unicode.IsLower(rune(key[len(key)-1])) { key += "V" @@ -184,7 +186,7 @@ func NewServiceWithName(val any, name string, useName bool) (_ IntService, err e srv2 = new(birpc.Service) *srv2 = *srv srv2.Name = key - srv2.Methods = map[string]*birpc.MethodType{"Ping": pingM} + srv2.Methods = map[string]*birpc.MethodType{utils.Ping: pingM} s[key] = srv2 } srv2.Methods[m[2:]] = v @@ -197,7 +199,7 @@ func NewDispatcherService(val any) (_ IntService, err error) { if srv, err = birpc.NewService(val, utils.EmptyString, false); err != nil { return } - srv.Methods["Ping"] = pingM + srv.Methods[utils.Ping] = pingM s := IntService{srv.Name: srv} for m, v := range srv.Methods { key := srv.Name @@ -283,7 +285,7 @@ func NewDispatcherService(val any) (_ IntService, err error) { srv2 = new(birpc.Service) *srv2 = *srv srv2.Name = key - srv2.Methods = map[string]*birpc.MethodType{"Ping": pingM} + srv2.Methods = map[string]*birpc.MethodType{utils.Ping: pingM} s[key] = srv2 } srv2.Methods[m[2:]] = v @@ -308,7 +310,7 @@ func ping(_ any, _ *context.Context, _ *utils.CGREvent, reply *string) error { var pingM = &birpc.MethodType{ Method: reflect.Method{ - Name: "Ping", + Name: utils.Ping, Type: reflect.TypeOf(ping), Func: reflect.ValueOf(ping), }, diff --git a/sessions/apis.go b/sessions/apis.go new file mode 100644 index 000000000..cd0047979 --- /dev/null +++ b/sessions/apis.go @@ -0,0 +1,259 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package sessions + +import ( + "fmt" + "strings" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/utils" +) + +// BiRPCv1AuthorizeEvent performs authorization for CGREvent based on specific subsystems +func (sS *SessionS) BiRPCv1AuthorizeEvent(ctx *context.Context, + args *utils.CGREvent, authReply *V1AuthorizeReply) (err error) { + if args == nil { + return utils.NewErrMandatoryIeMissing(utils.CGREventString) + } + if args.Event == nil { + return utils.NewErrMandatoryIeMissing(utils.Event) + } + var withErrors bool + if args.ID == "" { + args.ID = utils.GenUUID() + } + if args.Tenant == "" { + args.Tenant = sS.cfg.GeneralCfg().DefaultTenant + } + // RPC caching + if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.ID) + refID := guardian.Guardian.GuardIDs("", + sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + + if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *authReply = *cachedResp.Result.(*V1AuthorizeReply) + } + return cachedResp.Error + } + defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: authReply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + dP := args.AsDataProvider() + var attrS bool + if attrS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Attributes, + config.SessionsAttributesDftOpt, utils.MetaAttributes); err != nil { + return + } + var acntS bool + if acntS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.MaxUsage, + config.SessionsMaxUsageDftOpt, utils.MetaAccounts); err != nil { + return + } + var routeS bool + if routeS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Routes, + config.SessionsRoutesDftOpt, utils.MetaRoutes); err != nil { + return + } + var resourceS bool + if resourceS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesAuthorize, + config.SessionsResourcesAuthorizeDftOpt, utils.MetaResources); err != nil { + return + } + if !attrS && !acntS && !resourceS && !routeS { + return // Nothing to do + } + if args.APIOpts == nil { + args.APIOpts = make(map[string]any) + } + if attrS { + if args.APIOpts == nil { + args.APIOpts = make(map[string]any) + } + rplyAttr, err := sS.processAttributes(ctx, args) + if err == nil { + args = rplyAttr.CGREvent + authReply.Attributes = &rplyAttr + } else if err.Error() != utils.ErrNotFound.Error() { + return utils.NewErrAttributeS(err) + } + } + runEvents := make(map[string]*utils.CGREvent) + + var chrgS bool + if chrgS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargers, + config.SessionsChargersDftOpt, utils.MetaChargers); err != nil { + return + } + if chrgS { + var chrgrs []*engine.ChrgSProcessEventReply + if chrgrs, err = sS.processChargerS(ctx, args); err != nil { + return + } + for _, chrgr := range chrgrs { + runEvents[chrgr.ChargerSProfile] = chrgr.CGREvent + } + } else { + runEvents[utils.MetaRaw] = args + } + if acntS { + var maxAbstracts map[string]*utils.Decimal + if maxAbstracts, err = sS.accounSMaxAbstracts(ctx, runEvents); err != nil { + return utils.NewErrAccountS(err) + } + authReply.MaxUsage = getMaxUsageFromRuns(maxAbstracts) + } + if resourceS { + if len(sS.cfg.SessionSCfg().ResourceSConns) == 0 { + return utils.NewErrNotConnected(utils.ResourceS) + } + originID, _ := args.FieldAsString(utils.OriginID) + if originID == "" { + originID = utils.UUIDSha1Prefix() + } + args.APIOpts[utils.OptsResourcesUsageID] = originID + args.APIOpts[utils.OptsResourcesUnits] = 1 + var allocMsg string + if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().ResourceSConns, utils.ResourceSv1AuthorizeResources, + args, &allocMsg); err != nil { + return utils.NewErrResourceS(err) + } + authReply.ResourceAllocation = &allocMsg + } + if routeS { + routesReply, err := sS.getRoutes(ctx, args.Clone()) + if err != nil { + return err + } + if routesReply != nil { + authReply.RouteProfiles = routesReply + } + } + var thdS bool + if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds, + config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil { + return + } + if thdS { + tIDs, err := sS.processThreshold(ctx, args, true) + if err != nil && err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", + utils.SessionS, err.Error(), args)) + withErrors = true + } + authReply.ThresholdIDs = &tIDs + } + var stS bool + if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats, + config.SessionsStatsDftOpt, utils.MetaStats); err != nil { + return + } + if stS { + sIDs, err := sS.processStats(ctx, args, false) + if err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", + utils.SessionS, err.Error(), args)) + withErrors = true + } + authReply.StatQueueIDs = &sIDs + } + if withErrors { + err = utils.ErrPartiallyExecuted + } + return +} + +// BiRPCv1AuthorizeEventWithDigest performs authorization for CGREvent based on specific subsystem +// returning one level fields instead of multiple ones returned by BiRPCv1AuthorizeEvent +func (sS *SessionS) BiRPCv1AuthorizeEventWithDigest(ctx *context.Context, + args *utils.CGREvent, authReply *V1AuthorizeReplyWithDigest) (err error) { + var initAuthRply V1AuthorizeReply + if err = sS.BiRPCv1AuthorizeEvent(ctx, args, &initAuthRply); err != nil { + return + } + if initAuthRply.Attributes != nil && len(initAuthRply.Attributes.AlteredFields) != 0 { + authReply.AttributesDigest = utils.StringPointer(initAuthRply.Attributes.Digest()) + } + if initAuthRply.ResourceAllocation != nil && len(*initAuthRply.ResourceAllocation) != 0 { + authReply.ResourceAllocation = initAuthRply.ResourceAllocation + } + if initAuthRply.MaxUsage != nil { + maxDur, _ := initAuthRply.MaxUsage.Duration() + authReply.MaxUsage = maxDur.Nanoseconds() + } + if initAuthRply.RouteProfiles != nil && len(initAuthRply.RouteProfiles) != 0 { + authReply.RoutesDigest = utils.StringPointer(initAuthRply.RouteProfiles.Digest()) + } + if initAuthRply.ThresholdIDs != nil && len(*initAuthRply.ThresholdIDs) != 0 { + authReply.Thresholds = utils.StringPointer( + strings.Join(*initAuthRply.ThresholdIDs, utils.FieldsSep)) + } + if initAuthRply.StatQueueIDs != nil && len(*initAuthRply.StatQueueIDs) != 0 { + authReply.StatQueues = utils.StringPointer( + strings.Join(*initAuthRply.StatQueueIDs, utils.FieldsSep)) + } + return +} + +// BiRPCv1ProcessCDR sends the CDR to CDRs +func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context, + cgrEv *utils.CGREvent, rply *string) (err error) { + if cgrEv.Event == nil { + return utils.NewErrMandatoryIeMissing(utils.Event) + } + if cgrEv.ID == utils.EmptyString { + cgrEv.ID = utils.GenUUID() + } + if cgrEv.Tenant == utils.EmptyString { + cgrEv.Tenant = sS.cfg.GeneralCfg().DefaultTenant + } + + // RPC caching + if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID) + refID := guardian.Guardian.GuardIDs("", + sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + + if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *rply = *cachedResp.Result.(*string) + } + return cachedResp.Error + } + defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: rply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + + return sS.processCDR(ctx, cgrEv, rply) +} diff --git a/sessions/basics_it_test.go b/sessions/basics_it_test.go new file mode 100644 index 000000000..a968a00e9 --- /dev/null +++ b/sessions/basics_it_test.go @@ -0,0 +1,406 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package sessions + +import ( + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + sBscsItCfgPath string + sBscsItCfg *config.CGRConfig + sBscsItRPC *birpc.Client + + sBscsITtests = []func(t *testing.T){ + testSBscsItInitCfg, + testSBscsItFlushDBs, + testSBscsItStartEngine, + testSBscsItApierRpcConn, + testSBscsItAuthorizeEvent, + testSBscsItAuthorizeEventWithDigest, + testSBscsItProcessCDR, + testSBscsItStopCgrEngine, + } +) + +func TestSBasicsIt(t *testing.T) { + sBscsItCfgPath = "/home/dan/sshfs/sesdev/etc/cgrates/" + for _, stest := range sBscsITtests { + t.Run("TestSBasicsIt", stest) + } +} + +// Init config firs +func testSBscsItInitCfg(t *testing.T) { + var err error + sBscsItCfg, err = config.NewCGRConfigFromPath(context.Background(), sBscsItCfgPath) + if err != nil { + t.Error(err) + } +} + +// Remove data in both rating and accounting db +func testSBscsItFlushDBs(t *testing.T) { + if err := engine.InitDataDB(sBscsItCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDB(sBscsItCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testSBscsItStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(sBscsItCfgPath, *utils.WaitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testSBscsItApierRpcConn(t *testing.T) { + sBscsItRPC = engine.NewRPCClient(t, sBscsItCfg.ListenCfg(), *utils.Encoding) +} + +// tests related to AuthorizeEvent API +func testSBscsItAuthorizeEvent(t *testing.T) { + // Account requested not found, should fail here with error + var rplyAuth V1AuthorizeReply + if err := sBscsItRPC.Call(context.Background(), utils.SessionSv1AuthorizeEvent, + &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSBscsItAuthorizeEvent1", + APIOpts: map[string]any{ + utils.MetaAccounts: true, + }, + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.OriginID: "testSBscsItAuthorizeEvent1", + utils.SetupTime: "2018-01-07T17:00:00Z", + }, + }, &rplyAuth); err == nil || err.Error() != "ACCOUNTS_ERROR:NOT_FOUND" { + t.Error(err) + } + // Available less than requested(1m) + argSet := &utils.AccountWithAPIOpts{ + Account: &utils.Account{ + Tenant: "cgrates.org", + ID: "1001", + FilterIDs: []string{"*string:~*req.Account:1001"}, + Balances: map[string]*utils.Balance{ + "ABSTRACT1": &utils.Balance{ + ID: "ABSTRACT1", + Type: utils.MetaAbstract, + Weights: utils.DynamicWeights{&utils.DynamicWeight{Weight: 20.0}}, + CostIncrements: []*utils.CostIncrement{ + &utils.CostIncrement{ + Increment: utils.NewDecimalFromUsageIgnoreErr("1s"), + RecurrentFee: utils.NewDecimalFromFloat64(0.01), + }, + }, + Units: utils.NewDecimalFromUsageIgnoreErr("1m"), + }, + "CONCRETE1": &utils.Balance{ + ID: "CONCRETE1", + Type: utils.MetaConcrete, + Weights: utils.DynamicWeights{&utils.DynamicWeight{Weight: 10.0}}, + CostIncrements: []*utils.CostIncrement{ + &utils.CostIncrement{ + Increment: utils.NewDecimalFromUsageIgnoreErr("1s"), + RecurrentFee: utils.NewDecimalFromFloat64(0.01), + }, + }, + Units: utils.NewDecimalFromFloat64(0.5), + }, + }, + }, + } + var rplySet string + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1SetAccount, + argSet, &rplySet); err != nil { + t.Error(err) + } else if rplySet != utils.OK { + t.Errorf("Received: %s", rplySet) + } + argGet := &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "1001"}} + var acntRply utils.Account + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetAccount, + argGet, &acntRply); err != nil { + t.Error(err) + } else if acntRply.Balances["ABSTRACT1"].Units.Compare(utils.NewDecimalFromUsageIgnoreErr("1m")) != 0 || + acntRply.Balances["CONCRETE1"].Units.Compare(utils.NewDecimalFromFloat64(0.5)) != 0 { + t.Errorf("Received: %s", utils.ToJSON(acntRply)) + } + if err := sBscsItRPC.Call(context.Background(), utils.SessionSv1AuthorizeEvent, + &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSBscsItAuthorizeEvent1", + APIOpts: map[string]any{ + utils.MetaAccounts: true, + }, + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.OriginID: "testSBscsItAuthorizeEvent1", + utils.SetupTime: "2018-01-07T17:00:00Z", + }, + }, &rplyAuth); err != nil { + t.Error(err) + } else if rplyAuth.MaxUsage.Compare(utils.NewDecimalFromUsageIgnoreErr("50s")) != 0 { + t.Errorf("Received: %s", utils.ToJSON(rplyAuth)) + } + + // Balances should not be modified + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetAccount, + argGet, &acntRply); err != nil { + t.Error(err) + } else if acntRply.Balances["ABSTRACT1"].Units.Compare(utils.NewDecimalFromUsageIgnoreErr("1m")) != 0 || + acntRply.Balances["CONCRETE1"].Units.Compare(utils.NewDecimalFromFloat64(0.5)) != 0 { + t.Errorf("Received: %s", utils.ToJSON(acntRply)) + } + + // Available more than requested (1m) + argSet = &utils.AccountWithAPIOpts{ + Account: &utils.Account{ + Tenant: "cgrates.org", + ID: "1001", + FilterIDs: []string{"*string:~*req.Account:1001"}, + Balances: map[string]*utils.Balance{ + "CONCRETE1": &utils.Balance{ + ID: "CONCRETE1", + Type: utils.MetaConcrete, + Weights: utils.DynamicWeights{&utils.DynamicWeight{Weight: 10.0}}, + CostIncrements: []*utils.CostIncrement{ + &utils.CostIncrement{ + Increment: utils.NewDecimalFromUsageIgnoreErr("1s"), + RecurrentFee: utils.NewDecimalFromFloat64(0.01), + }, + }, + Units: utils.NewDecimalFromFloat64(10), + }, + }, + }, + } + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1SetAccount, + argSet, &rplySet); err != nil { + t.Error(err) + } else if rplySet != utils.OK { + t.Errorf("Received: %s", rplySet) + } + argGet = &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "1001"}} + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetAccount, + argGet, &acntRply); err != nil { + t.Error(err) + } else if acntRply.Balances["CONCRETE1"].Units.Compare(utils.NewDecimalFromFloat64(10.0)) != 0 { + t.Errorf("Received: %s", utils.ToJSON(acntRply)) + } + if err := sBscsItRPC.Call(context.Background(), utils.SessionSv1AuthorizeEvent, + &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSBscsItAuthorizeEvent1", + APIOpts: map[string]any{ + utils.MetaAccounts: true, + }, + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.OriginID: "testSBscsItAuthorizeEvent1", + utils.SetupTime: "2018-01-07T17:00:00Z", + }, + }, &rplyAuth); err != nil { + t.Error(err) + } else if rplyAuth.MaxUsage.Compare(utils.NewDecimalFromUsageIgnoreErr("1m")) != 0 { + t.Errorf("Received: %s", utils.ToJSON(rplyAuth)) + } + +} + +// tests related to AuthorizeEventWithDigest API +func testSBscsItAuthorizeEventWithDigest(t *testing.T) { + // Available more than requested (1m) + argSet := &utils.AccountWithAPIOpts{ + Account: &utils.Account{ + Tenant: "cgrates.org", + ID: "1001", + FilterIDs: []string{"*string:~*req.Account:1001"}, + Balances: map[string]*utils.Balance{ + "CONCRETE1": &utils.Balance{ + ID: "CONCRETE1", + Type: utils.MetaConcrete, + Weights: utils.DynamicWeights{&utils.DynamicWeight{Weight: 10.0}}, + CostIncrements: []*utils.CostIncrement{ + &utils.CostIncrement{ + Increment: utils.NewDecimalFromUsageIgnoreErr("1s"), + RecurrentFee: utils.NewDecimalFromFloat64(0.01), + }, + }, + Units: utils.NewDecimalFromFloat64(10), + }, + }, + }, + } + var rplySet string + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1SetAccount, + argSet, &rplySet); err != nil { + t.Error(err) + } else if rplySet != utils.OK { + t.Errorf("Received: %s", rplySet) + } + argGet := &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "1001"}} + var acntRply utils.Account + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetAccount, + argGet, &acntRply); err != nil { + t.Error(err) + } else if acntRply.Balances["CONCRETE1"].Units.Compare(utils.NewDecimalFromFloat64(10.0)) != 0 { + t.Errorf("Received: %s", utils.ToJSON(acntRply)) + } + var rplyAuth V1AuthorizeReplyWithDigest + if err := sBscsItRPC.Call(context.Background(), utils.SessionSv1AuthorizeEventWithDigest, + &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSBscsItAuthorizeEventWithDigest1", + APIOpts: map[string]any{ + utils.MetaAccounts: true, + }, + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.OriginID: "testSBscsItAuthorizeEventWithDigest1", + utils.SetupTime: "2018-01-07T17:00:00Z", + }, + }, &rplyAuth); err != nil { + t.Error(err) + } else if rplyAuth.MaxUsage != time.Duration(time.Minute).Nanoseconds() { + t.Errorf("Received: %s", utils.ToJSON(rplyAuth)) + } +} + +// tests related to AuthorizeEventWithDigest API +func testSBscsItProcessCDR(t *testing.T) { + // Set the account for CDR + argSet := &utils.AccountWithAPIOpts{ + Account: &utils.Account{ + Tenant: "cgrates.org", + ID: "1001", + FilterIDs: []string{"*string:~*req.Account:1001"}, + Balances: map[string]*utils.Balance{ + "CONCRETE1": &utils.Balance{ + ID: "CONCRETE1", + Type: utils.MetaConcrete, + Weights: utils.DynamicWeights{&utils.DynamicWeight{Weight: 10.0}}, + CostIncrements: []*utils.CostIncrement{ + &utils.CostIncrement{ + Increment: utils.NewDecimalFromUsageIgnoreErr("1s"), + RecurrentFee: utils.NewDecimalFromFloat64(0.01), + }, + }, + Units: utils.NewDecimalFromFloat64(10), + }, + }, + }, + } + var rplySet string + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1SetAccount, + argSet, &rplySet); err != nil { + t.Error(err) + } else if rplySet != utils.OK { + t.Errorf("Received: %s", rplySet) + } + argGet := &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "1001"}} + var acntRply utils.Account + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetAccount, + argGet, &acntRply); err != nil { + t.Error(err) + } else if acntRply.Balances["CONCRETE1"].Units.Compare(utils.NewDecimalFromFloat64(10.0)) != 0 { + t.Errorf("Received: %s", utils.ToJSON(acntRply)) + } + var rplyAuth V1AuthorizeReplyWithDigest + if err := sBscsItRPC.Call(context.Background(), utils.SessionSv1AuthorizeEventWithDigest, + &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSBscsItAuthorizeEventWithDigest1", + APIOpts: map[string]any{ + utils.MetaAccounts: true, + }, + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.OriginID: "testSBscsItAuthorizeEventWithDigest1", + utils.SetupTime: "2018-01-07T17:00:00Z", + }, + }, &rplyAuth); err != nil { + t.Error(err) + } else if rplyAuth.MaxUsage != time.Duration(time.Minute).Nanoseconds() { + t.Errorf("Received: %s", utils.ToJSON(rplyAuth)) + } + + var rplyProcCDR string + if err := sBscsItRPC.Call(context.Background(), utils.SessionSv1ProcessCDR, + &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSBscsItAuthorizeEventWithDigest1", + APIOpts: map[string]any{ + utils.MetaAccounts: true, + utils.MetaUsage: "1m30s", + }, + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.OriginID: "testSBscsItAuthorizeEventWithDigest1", + utils.AnswerTime: "2018-01-07T17:00:00Z", + utils.Usage: "1m30s", + }, + }, &rplyProcCDR); err != nil { + t.Error(err) + } else if rplyProcCDR != utils.OK { + t.Errorf("Received: %s", rplyProcCDR) + } + + var rplyGetCDRs []*utils.CDR + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetCDRs, + &utils.CDRFilters{}, &rplyGetCDRs); err != nil { + t.Error(err) + } else if len(rplyGetCDRs) == 0 || + rplyGetCDRs[0].Opts[utils.MetaAccountSCost].(map[string]any)[utils.Abstracts] != 90000000000.0 || + rplyGetCDRs[0].Opts[utils.MetaAccountSCost].(map[string]any)[utils.Concretes] != 0.9 { + t.Errorf("Received: %s", utils.ToJSON(rplyGetCDRs)) + } + + if err := sBscsItRPC.Call(context.Background(), utils.AdminSv1GetAccount, + argGet, &acntRply); err != nil { + t.Error(err) + } else if acntRply.Balances["CONCRETE1"].Units.Compare(utils.NewDecimalFromFloat64(9.1)) != 0 { + t.Errorf("Received: %s", utils.ToJSON(acntRply)) + } +} + +func testSBscsItStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(*utils.WaitRater); err != nil { + t.Error(err) + } +} diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 0c529bd3e..4ce58a180 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -502,7 +502,7 @@ func (v1AuthReply *V1AuthorizeReply) AsNavigableMap() map[string]*utils.DataNode type V1AuthorizeReplyWithDigest struct { AttributesDigest *string ResourceAllocation *string - MaxUsage float64 // special treat returning time.Duration.Seconds() + MaxUsage int64 // special treat returning time.Duration.Nanoseconds() RoutesDigest *string Thresholds *string StatQueues *string diff --git a/sessions/sessions.go b/sessions/sessions.go index 17762645f..bfe6deed2 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1414,15 +1414,15 @@ func (sS *SessionS) chargeEvent(ctx *context.Context, cgrEv *utils.CGREvent, for return // returns here the maxUsage from update } -// accounSMaxAbstracts computes the maximum abstract units for the events provided as reply from the ChargerS +// accounSMaxAbstracts computes the maximum abstract units for the events received func (sS *SessionS) accounSMaxAbstracts(ctx *context.Context, cgrEvs map[string]*utils.CGREvent) (maxAbstracts map[string]*utils.Decimal, err error) { - if len(sS.cfg.SessionSCfg().AttributeSConns) == 0 { + if len(sS.cfg.SessionSCfg().AccountSConns) == 0 { return nil, utils.NewErrNotConnected(utils.AccountS) } maxAbstracts = make(map[string]*utils.Decimal) for runID, cgrEv := range cgrEvs { acntCost := new(utils.EventCharges) - if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().AttributeSConns, // Fix Here with AccountS + if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().AccountSConns, utils.AccountSv1MaxAbstracts, cgrEv, &acntCost); err != nil { return } @@ -1516,236 +1516,6 @@ func (sS *SessionS) BiRPCv1ReplicateSessions(ctx *context.Context, return } -// BiRPCv1AuthorizeEvent performs authorization for CGREvent based on specific components -func (sS *SessionS) BiRPCv1AuthorizeEvent(ctx *context.Context, - args *utils.CGREvent, authReply *V1AuthorizeReply) (err error) { - if args == nil { - return utils.NewErrMandatoryIeMissing(utils.CGREventString) - } - if args.Event == nil { - return utils.NewErrMandatoryIeMissing(utils.Event) - } - var withErrors bool - if args.ID == "" { - args.ID = utils.GenUUID() - } - if args.Tenant == "" { - args.Tenant = sS.cfg.GeneralCfg().DefaultTenant - } - - // RPC caching - if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.ID) - refID := guardian.Guardian.GuardIDs("", - sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(refID) - - if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { - cachedResp := itm.(*utils.CachedRPCResponse) - if cachedResp.Error == nil { - *authReply = *cachedResp.Result.(*V1AuthorizeReply) - } - return cachedResp.Error - } - defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, - &utils.CachedRPCResponse{Result: authReply, Error: err}, - nil, true, utils.NonTransactional) - } - // end of RPC caching - dP := args.AsDataProvider() - var attrS bool - if attrS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Attributes, - config.SessionsAttributesDftOpt, utils.MetaAttributes); err != nil { - return - } - var routeS bool - if routeS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Routes, - config.SessionsRoutesDftOpt, utils.MetaRoutes); err != nil { - return - } - var maxUsage bool - if maxUsage, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.MaxUsage, - config.SessionsMaxUsageDftOpt, utils.OptsSesMaxUsage); err != nil { - return - } - var resAuthorize bool - if resAuthorize, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesAuthorize, - config.SessionsResourcesAuthorizeDftOpt, utils.OptsSesResourceSAuthorize); err != nil { - return - } - if !(attrS || maxUsage || resAuthorize || routeS) { - return // Nothing to do - } - if args.APIOpts == nil { - args.APIOpts = make(map[string]any) - } - if attrS { - if args.APIOpts == nil { - args.APIOpts = make(map[string]any) - } - rplyAttr, err := sS.processAttributes(ctx, args) - if err == nil { - args = rplyAttr.CGREvent - authReply.Attributes = &rplyAttr - } else if err.Error() != utils.ErrNotFound.Error() { - return utils.NewErrAttributeS(err) - } - } - runEvents := make(map[string]*utils.CGREvent) - - var chrgS bool - if chrgS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargers, - config.SessionsChargersDftOpt, utils.MetaChargers); err != nil { - return - } - if chrgS { - var chrgrs []*engine.ChrgSProcessEventReply - if chrgrs, err = sS.processChargerS(ctx, args); err != nil { - return - } - for _, chrgr := range chrgrs { - runEvents[chrgr.ChargerSProfile] = chrgr.CGREvent - } - } else { - runEvents[utils.MetaRaw] = args - } - var acntS bool - if acntS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Accounts, - config.SessionsAccountsDftOpt, utils.MetaAccounts); err != nil { - return - } - if acntS { - var maxAbstracts map[string]*utils.Decimal - if maxAbstracts, err = sS.accounSMaxAbstracts(ctx, runEvents); err != nil { - authReply.MaxUsage = getMaxUsageFromRuns(maxAbstracts) - } - } - if utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuthorize) { - if len(sS.cfg.SessionSCfg().ResourceSConns) == 0 { - return utils.NewErrNotConnected(utils.ResourceS) - } - originID, _ := args.FieldAsString(utils.OriginID) - if originID == "" { - originID = utils.UUIDSha1Prefix() - } - args.APIOpts[utils.OptsResourcesUsageID] = originID - args.APIOpts[utils.OptsResourcesUnits] = 1 - var allocMsg string - if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().ResourceSConns, utils.ResourceSv1AuthorizeResources, - args, &allocMsg); err != nil { - return utils.NewErrResourceS(err) - } - authReply.ResourceAllocation = &allocMsg - } - if routeS { - routesReply, err := sS.getRoutes(ctx, args.Clone()) - if err != nil { - return err - } - if routesReply != nil { - authReply.RouteProfiles = routesReply - } - } - var thdS bool - if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds, - config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil { - return - } - if thdS { - tIDs, err := sS.processThreshold(ctx, args, true) - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", - utils.SessionS, err.Error(), args)) - withErrors = true - } - authReply.ThresholdIDs = &tIDs - } - var stS bool - if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats, - config.SessionsStatsDftOpt, utils.MetaStats); err != nil { - return - } - if stS { - sIDs, err := sS.processStats(ctx, args, false) - if err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", - utils.SessionS, err.Error(), args)) - withErrors = true - } - authReply.StatQueueIDs = &sIDs - } - if withErrors { - err = utils.ErrPartiallyExecuted - } - return -} - -// BiRPCv1AuthorizeEventWithDigest performs authorization for CGREvent based on specific components -// returning one level fields instead of multiple ones returned by BiRPCv1AuthorizeEvent -func (sS *SessionS) BiRPCv1AuthorizeEventWithDigest(ctx *context.Context, - args *utils.CGREvent, authReply *V1AuthorizeReplyWithDigest) (err error) { - var initAuthRply V1AuthorizeReply - if err = sS.BiRPCv1AuthorizeEvent(ctx, args, &initAuthRply); err != nil { - return - } - dP := args.AsDataProvider() - var attrS bool - if attrS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Attributes, - config.SessionsAttributesDftOpt, utils.MetaAttributes); err != nil { - return - } - if attrS && initAuthRply.Attributes != nil { - authReply.AttributesDigest = utils.StringPointer(initAuthRply.Attributes.Digest()) - } - var resourcesAuthorize bool - if resourcesAuthorize, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesAuthorize, - config.SessionsResourcesAuthorizeDftOpt, utils.OptsSesResourceSAuthorize); err != nil { - return - } - if resourcesAuthorize { - authReply.ResourceAllocation = initAuthRply.ResourceAllocation - } - var acntS bool - if acntS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Accounts, - config.SessionsAccountsDftOpt, utils.MetaAccounts); err != nil { - return - } - if acntS { - maxDur, _ := initAuthRply.MaxUsage.Duration() - authReply.MaxUsage = maxDur.Seconds() - } - var routeS bool - if routeS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Routes, - config.SessionsRoutesDftOpt, utils.MetaRoutes); err != nil { - return - } - if routeS { - authReply.RoutesDigest = utils.StringPointer(initAuthRply.RouteProfiles.Digest()) - } - var thdS bool - if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds, - config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil { - return - } - if thdS { - authReply.Thresholds = utils.StringPointer( - strings.Join(*initAuthRply.ThresholdIDs, utils.FieldsSep)) - } - var stS bool - if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats, - config.SessionsStatsDftOpt, utils.MetaStats); err != nil { - return - } - if stS { - authReply.StatQueues = utils.StringPointer( - strings.Join(*initAuthRply.StatQueueIDs, utils.FieldsSep)) - } - return -} - // BiRPCv1InitiateSession initiates a new session func (sS *SessionS) BiRPCv1InitiateSession(ctx *context.Context, args *utils.CGREvent, rply *V1InitSessionReply) (err error) { @@ -2195,46 +1965,6 @@ func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context, return } -// BiRPCv1ProcessCDR sends the CDR to CDRs -func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context, - cgrEv *utils.CGREvent, rply *string) (err error) { - if cgrEv.Event == nil { - return utils.NewErrMandatoryIeMissing(utils.Event) - } - if cgrEv.ID == utils.EmptyString { - cgrEv.ID = utils.GenUUID() - } - if cgrEv.Tenant == utils.EmptyString { - cgrEv.Tenant = sS.cfg.GeneralCfg().DefaultTenant - } - - // RPC caching - if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID) - refID := guardian.Guardian.GuardIDs("", - sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(refID) - - if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { - cachedResp := itm.(*utils.CachedRPCResponse) - if cachedResp.Error == nil { - *rply = *cachedResp.Result.(*string) - } - return cachedResp.Error - } - defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, - &utils.CachedRPCResponse{Result: rply, Error: err}, - nil, true, utils.NonTransactional) - } - // end of RPC caching - // in case that source don't exist add it - if _, has := cgrEv.Event[utils.Source]; !has { - cgrEv.Event[utils.Source] = utils.MetaSessionS - } - - return sS.processCDR(ctx, cgrEv, rply) -} - // BiRPCv1ProcessMessage processes one event with the right subsystems based on arguments received func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context, args *utils.CGREvent, rply *V1ProcessMessageReply) (err error) { diff --git a/utils/consts.go b/utils/consts.go index a11701fce..41a0446c3 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -854,6 +854,7 @@ const ( MetaCostIncrement = "*costIncrement" Length = "Length" V1Prfx = "V1" + Ping = "Ping" // dns DNSQueryType = "QueryType" diff --git a/utils/decimal.go b/utils/decimal.go index 96f7f7421..a6b40e16f 100644 --- a/utils/decimal.go +++ b/utils/decimal.go @@ -171,6 +171,12 @@ func NewDecimalFromUsage(u string) (d *Decimal, err error) { return } +// NewDecimalFromUsage is a constructor for Decimal out of unit represents as string +func NewDecimalFromUsageIgnoreErr(u string) (d *Decimal) { + d, _ = NewDecimalFromUsage(u) + return +} + // NewDecimal is a constructor for Decimal, following the one of decimal.Big func NewDecimal(value int64, scale int) *Decimal { return &Decimal{decimal.WithContext(DecimalContext).SetMantScale(value, scale)}