diff --git a/apier/v1/attributes.go b/apier/v1/attributes.go index 45c8659f3..9ac336721 100644 --- a/apier/v1/attributes.go +++ b/apier/v1/attributes.go @@ -126,7 +126,7 @@ func (alSv1 *AttributeSv1) ProcessEvent(args *engine.AttrArgsProcessEvent, return alSv1.attrS.V1ProcessEvent(args, reply) } -func (alSv1 *AttributeSv1) Ping(ign string, reply *string) error { +func (alSv1 *AttributeSv1) Ping(ign *utils.CGREvent, reply *string) error { *reply = utils.Pong return nil } diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 777af64f1..4a47992ee 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -178,7 +179,7 @@ func (dSup *DispatcherSupplierSv1) Ping(ign string, reply *string) error { func (dSup *DispatcherSupplierSv1) GetSuppliers(args *dispatchers.ArgsGetSuppliersWithApiKey, reply *engine.SortedSuppliers) error { return dSup.dSup.SupplierSv1GetSuppliers(args, reply) -} +}*/ func NewDispatcherAttributeSv1(dps *dispatchers.DispatcherService) *DispatcherAttributeSv1 { return &DispatcherAttributeSv1{dA: dps} @@ -189,9 +190,15 @@ type DispatcherAttributeSv1 struct { dA *dispatchers.DispatcherService } +// Call implements rpcclient.RpcClientConnection interface for internal RPC +// func (alSv1 *DispatcherAttributeSv1) Call(serviceMethod string, +// args interface{}, reply interface{}) error { +// return utils.APIerRPCCall(alSv1, serviceMethod, args, reply) +// } + // Ping implements SupplierSv1Ping -func (dA *DispatcherAttributeSv1) Ping(ign string, reply *string) error { - return dA.dA.AttributeSv1Ping(ign, reply) +func (dA *DispatcherAttributeSv1) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error { + return dA.dA.AttributeSv1Ping(args, reply) } // GetAttributeForEvent implements AttributeSv1GetAttributeForEvent @@ -206,6 +213,7 @@ func (dA *DispatcherAttributeSv1) ProcessEvent(args *dispatchers.ArgsAttrProcess return dA.dA.AttributeSv1ProcessEvent(args, reply) } +/* func NewDispatcherSessionSv1(dps *dispatchers.DispatcherService) *DispatcherSessionSv1 { return &DispatcherSessionSv1{dS: dps} } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a65858419..3157e81a0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -965,7 +965,7 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig, } // startDispatcherService fires up the DispatcherS -func startDispatcherService(internalDispatcherSChan, +func startDispatcherService(internalDispatcherSChan chan *dispatchers.DispatcherService, intAttrSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, @@ -1041,10 +1041,12 @@ func startDispatcherService(internalDispatcherSChan, server.RpcRegisterName(utils.SupplierSv1, v1.NewDispatcherSupplierSv1(dspS)) } - if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 { - server.RpcRegisterName(utils.AttributeSv1, - v1.NewDispatcherAttributeSv1(dspS)) - } + */ + // if !cfg.AttributeSCfg().Enabled { //dispatcer enable all methos + attrv1 := v1.NewDispatcherAttributeSv1(dspS) + server.RpcRegisterName(utils.AttributeSv1, attrv1) + // } + /* if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 { server.RpcRegisterName(utils.SessionSv1, v1.NewDispatcherSessionSv1(dspS)) @@ -1054,6 +1056,7 @@ func startDispatcherService(internalDispatcherSChan, v1.NewDispatcherChargerSv1(dspS)) } */ + internalDispatcherSChan <- dspS } // startAnalyzerService fires up the AnalyzerS @@ -1084,8 +1087,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, - internalSMGChan, internalDispatcherSChan, internalAnalyzerSChan chan rpcclient.RpcClientConnection, - exitChan chan bool) { + internalSMGChan, internalAnalyzerSChan chan rpcclient.RpcClientConnection, + internalDispatcherSChan chan *dispatchers.DispatcherService, exitChan chan bool) { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: internalRaterChan <- resp @@ -1336,7 +1339,7 @@ func main() { if cfg.RalsCfg().RALsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled || cfg.SchedulerCfg().Enabled || cfg.AttributeSCfg().Enabled || cfg.ResourceSCfg().Enabled || cfg.StatSCfg().Enabled || - cfg.ThresholdSCfg().Enabled || cfg.SupplierSCfg().Enabled { // Some services can run without db, ie: SessionS or CDRC + cfg.ThresholdSCfg().Enabled || cfg.SupplierSCfg().Enabled || cfg.DispatcherSCfg().Enabled { // Some services can run without db, ie: SessionS or CDRC dm, err = engine.ConfigureDataStorage(cfg.DataDbCfg().DataDbType, cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort, cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser, @@ -1412,7 +1415,7 @@ func main() { internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1) filterSChan := make(chan *engine.FilterS, 1) - internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) + internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1) internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1) // Start ServiceManager @@ -1546,7 +1549,7 @@ func main() { internalStatSChan, internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalSupplierSChan, - internalSMGChan, internalDispatcherSChan, internalAnalyzerSChan, exitChan) + internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, exitChan) <-exitChan if *cpuProfDir != "" { // wait to end cpuProfiling diff --git a/config/config.go b/config/config.go index 4aaf58847..bfbe991e3 100755 --- a/config/config.go +++ b/config/config.go @@ -701,6 +701,9 @@ func (self *CGRConfig) checkConfigSanity() error { } } if self.dispatcherSCfg.Enabled { + if self.attributeSCfg.Enabled { + return fmt.Errorf("<%s> cannot start in tandem with <%s>", utils.DispatcherS, utils.AttributeS) + } if len(self.dispatcherSCfg.Conns) == 0 { return fmt.Errorf("<%s> no connections defined", utils.DispatcherS) } diff --git a/dispatchers/attributes.go b/dispatchers/attributes.go index 140e1f344..f367a31c6 100755 --- a/dispatchers/attributes.go +++ b/dispatchers/attributes.go @@ -24,17 +24,17 @@ import ( ) // AttributeSv1Ping interogates AttributeS server responsible to process the event -func (dS *DispatcherService) AttributeSv1Ping(args *ArgsAttrProcessEventWithApiKey, +func (dS *DispatcherService) AttributeSv1Ping(args *CGREvWithApiKey, reply *string) (err error) { if dS.attrS != nil { if err = dS.authorize(utils.AttributeSv1Ping, - args.AttrArgsProcessEvent.CGREvent.Tenant, - args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil { + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { return } } return dS.Dispatch(&args.CGREvent, utils.MetaAttributes, - utils.AttributeSv1Ping, args.AttrArgsProcessEvent, reply) + utils.AttributeSv1Ping, args.CGREvent, reply) } // AttributeSv1GetAttributeForEvent is the dispatcher method for AttributeSv1.GetAttributeForEvent @@ -52,17 +52,16 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *ArgsAttrProc } -/* func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEventWithApiKey, reply *engine.AttrSProcessEventReply) (err error) { - if dS.attrS == nil { - return utils.NewErrNotConnected(utils.AttributeS) - } - if err = dS.authorize(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent.CGREvent.Tenant, - args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil { - return - } - return dS.attrS.Call(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply) + if dS.attrS != nil { + if err = dS.authorize(utils.AttributeSv1ProcessEvent, + args.AttrArgsProcessEvent.CGREvent.Tenant, + args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil { + return + } + } + return dS.Dispatch(&args.CGREvent, utils.MetaAttributes, + utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply) } -*/ diff --git a/dispatchers/attributes_it_test.go b/dispatchers/attributes_it_test.go index 134db03e8..1f50a1ae6 100755 --- a/dispatchers/attributes_it_test.go +++ b/dispatchers/attributes_it_test.go @@ -46,17 +46,17 @@ var sTestsDspAttr = []func(t *testing.T){ testDspAttrInitCfg, testDspAttrInitDataDb, testDspAttrResetStorDb, - testDspAttrStartEngine, + // testDspAttrStartEngine, testDspAttrRPCConn, - testDspAttrPing, testDspAttrLoadData, + testDspAttrPing, testDspAttrAddAttributesWithPermision, testDspAttrTestMissingApiKey, testDspAttrTestUnknownApiKey, testDspAttrTestAuthKey, testDspAttrAddAttributesWithPermision2, testDspAttrTestAuthKey2, - testDspAttrKillEngine, + // testDspAttrKillEngine, } //Test start here @@ -68,7 +68,7 @@ func TestDspAttributeS(t *testing.T) { func testDspAttrInitCfg(t *testing.T) { var err error - dspAttrCfgPath = path.Join(dspDataDir, "conf", "samples", "dispatcher") + dspAttrCfgPath = path.Join(dspDataDir, "conf", "samples", "dispatchers") dspAttrCfg, err = config.NewCGRConfigFromFolder(dspAttrCfgPath) if err != nil { t.Error(err) @@ -118,33 +118,40 @@ func testDspAttrRPCConn(t *testing.T) { if err != nil { t.Fatal(err) } - -} - -func testDspAttrPing(t *testing.T) { - var reply string - if err := instAttrRPC.Call(utils.AttributeSv1Ping, "", &reply); err != nil { - t.Error(err) - } else if reply != utils.Pong { - t.Errorf("Received: %s", reply) - } - if err := dspAttrRPC.Call(utils.AttributeSv1Ping, "", &reply); err != nil { - t.Error(err) - } else if reply != utils.Pong { - t.Errorf("Received: %s", reply) - } } func testDspAttrLoadData(t *testing.T) { var reply string attrs := &utils.AttrLoadTpFromFolder{ - FolderPath: path.Join(dspDataDir, "tariffplans", "tutorial")} + FolderPath: path.Join(dspDataDir, "tariffplans", "dispatchers")} if err := instAttrRPC.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { t.Error(err) } time.Sleep(500 * time.Millisecond) } +func testDspAttrPing(t *testing.T) { + var reply string + if err := instAttrRPC.Call(utils.AttributeSv1Ping, &utils.CGREvent{}, &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } + if dspAttrRPC == nil { + t.Fatal(dspAttrRPC) + } + if err := dspAttrRPC.Call(utils.AttributeSv1Ping, &CGREvWithApiKey{ + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + }, + APIKey: "attr12345", + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } +} + func testDspAttrAddAttributesWithPermision(t *testing.T) { alsPrf := &engine.AttributeProfile{ Tenant: "cgrates.org", @@ -197,7 +204,7 @@ func testDspAttrTestMissingApiKey(t *testing.T) { var attrReply *engine.AttributeProfile if err := dspAttrRPC.Call(utils.AttributeSv1GetAttributeForEvent, args, &attrReply); err == nil || err.Error() != utils.NewErrMandatoryIeMissing(utils.APIKey).Error() { - t.Error(err) + t.Errorf("Error:%v rply=%s", err, utils.ToJSON(attrReply)) } } @@ -271,7 +278,9 @@ func testDspAttrAddAttributesWithPermision2(t *testing.T) { &utils.TenantID{Tenant: "cgrates.org", ID: "AuthKey"}, &reply); err != nil { t.Error(err) } - reply.Compile() + if reply != nil { + reply.Compile() + } if !reflect.DeepEqual(alsPrf, reply) { t.Errorf("Expecting : %+v, received: %+v", alsPrf, reply) } @@ -310,7 +319,9 @@ func testDspAttrTestAuthKey2(t *testing.T) { args, &attrReply); err != nil { t.Error(err) } - attrReply.Compile() + if attrReply != nil { + attrReply.Compile() + } if !reflect.DeepEqual(eAttrPrf, attrReply) { t.Errorf("Expecting: %s, received: %s", utils.ToJSON(eAttrPrf), utils.ToJSON(attrReply)) } diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 407556a48..acc549082 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -76,14 +76,25 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, if subsys != "" { idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys) } - matchingPrfls := make(map[string]*engine.DispatcherProfile) + var matchedPrlf *engine.DispatcherProfile prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, dS.cfg.DispatcherSCfg().StringIndexedFields, dS.cfg.DispatcherSCfg().PrefixIndexedFields, dS.dm, utils.CacheDispatcherFilterIndexes, idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects) if err != nil { - return nil, err + // return nil, err + if err != utils.ErrNotFound { + return nil, err + } + prflIDs, err = engine.MatchingItemIDsForEvent(ev.Event, + dS.cfg.DispatcherSCfg().StringIndexedFields, + dS.cfg.DispatcherSCfg().PrefixIndexedFields, + dS.dm, utils.CacheDispatcherFilterIndexes, + anyIdxPrfx, dS.cfg.FilterSCfg().IndexedSelects) + if err != nil { + return nil, err + } } for prflID := range prflIDs { prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional) @@ -91,17 +102,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, if err != utils.ErrNotFound { return nil, err } - if idxKeyPrfx == anyIdxPrfx { - continue // already checked *any - } - // check *any as subsystem - if prfl, err = dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - continue - } - return nil, err - } - + continue } if prfl.ActivationInterval != nil && ev.Time != nil && !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active @@ -113,20 +114,13 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, } else if !pass { continue } - matchingPrfls[prflID] = prfl + if matchedPrlf == nil || prfl.Weight > matchedPrlf.Weight { + matchedPrlf = prfl + } } - if len(matchingPrfls) == 0 { + if matchedPrlf == nil { return nil, utils.ErrNotFound } - // All good, convert from Map to Slice so we can sort - prfls := make(engine.DispatcherProfiles, len(matchingPrfls)) - i := 0 - for _, prfl := range matchingPrfls { - prfls[i] = prfl - i++ - } - prfls.Sort() - matchedPrlf := prfls[0] // only use the first profile tntID := matchedPrlf.TenantID() // get or build the Dispatcher for the config if x, ok := engine.Cache.Get(utils.CacheDispatchers, @@ -166,17 +160,19 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, connID := d.NextConnID() conn, has := dS.conns[connID] if !has { - utils.NewErrDispatcherS( + err = utils.NewErrDispatcherS( fmt.Errorf("no connection with id: <%s>", connID)) + continue } - if err = conn.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { - break + if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { + continue } if ev.RouteID != nil && *ev.RouteID != "" { // cache the discovered route engine.Cache.Set(utils.CacheDispatcherRoutes, *ev.RouteID, connID, nil, true, utils.EmptyString) } + break } return } diff --git a/engine/datamanager.go b/engine/datamanager.go index d4ccdc58b..a323e4db1 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1319,20 +1319,30 @@ func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bo } if withIndex { if oldDpp != nil { - var needsRemove bool - for _, fltrID := range oldDpp.FilterIDs { - if !utils.IsSliceMember(dpp.FilterIDs, fltrID) { + for _, ctx := range oldDpp.Subsystems { + var needsRemove bool + if !utils.IsSliceMember(dpp.Subsystems, ctx) { needsRemove = true + } else { + for _, fltrID := range oldDpp.FilterIDs { + if !utils.IsSliceMember(dpp.FilterIDs, fltrID) { + needsRemove = true + } + } } - } - if needsRemove { - if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix, - dpp.Tenant).RemoveItemFromIndex(dpp.Tenant, dpp.ID, oldDpp.FilterIDs); err != nil { - return + if needsRemove { + if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix, + utils.ConcatenatedKey(dpp.Tenant, ctx)).RemoveItemFromIndex(dpp.Tenant, dpp.ID, oldDpp.FilterIDs); err != nil { + return + } } } } - return createAndIndex(utils.DispatcherProfilePrefix, dpp.Tenant, utils.EmptyString, dpp.ID, dpp.FilterIDs, dm) + for _, ctx := range dpp.Subsystems { + if err = createAndIndex(utils.DispatcherProfilePrefix, dpp.Tenant, ctx, dpp.ID, dpp.FilterIDs, dm); err != nil { + return + } + } } return } @@ -1352,7 +1362,12 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string, return utils.ErrNotFound } if withIndex { - return NewFilterIndexer(dm, utils.DispatcherProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldDpp.FilterIDs) + for _, ctx := range oldDpp.Subsystems { + if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix, + utils.ConcatenatedKey(tenant, ctx)).RemoveItemFromIndex(tenant, id, oldDpp.FilterIDs); err != nil { + return + } + } } return }