diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index e41aa84c6..731a2247a 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -117,3 +117,8 @@ func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error { func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error { return cdrSv1.CDRs.V1GetCDRs(args, reply) } + +func (cdrSv1 *CDRsV1) Ping(ign *utils.CGREvent, reply *string) error { + *reply = utils.Pong + return nil +} diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index c27fad907..9f9dd7714 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -671,3 +671,17 @@ func (dSv1 DispatcherSv1) GetProfileForEvent(ev *dispatchers.DispatcherEvent, dPrfl *engine.DispatcherProfile) error { return dSv1.dS.V1GetProfileForEvent(ev, dPrfl) } + +func NewDispatcherSCDRsV1(dps *dispatchers.DispatcherService) *DispatcherSCDRsV1 { + return &DispatcherSCDRsV1{dS: dps} +} + +// Exports RPC from CDRsV1 +type DispatcherSCDRsV1 struct { + dS *dispatchers.DispatcherService +} + +// Ping used to detreminate if component is active +func (dS *DispatcherSCDRsV1) Ping(args *utils.CGREventWithArgDispatcher, reply *string) error { + return dS.dS.CDRsV1Ping(args, reply) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5b3426eac..e42c60831 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1037,6 +1037,9 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher server.RpcRegisterName(utils.SchedulerSv1, v1.NewDispatcherSchedulerSv1(dspS)) + server.RpcRegisterName(utils.CDRsV1, + v1.NewDispatcherSCDRsV1(dspS)) + internalDispatcherSChan <- dspS } diff --git a/dispatchers/cdrs.go b/dispatchers/cdrs.go new file mode 100644 index 000000000..2fa7d5c1e --- /dev/null +++ b/dispatchers/cdrs.go @@ -0,0 +1,38 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatchers + +import "github.com/cgrates/cgrates/utils" + +// CacheSv1Ping interogates CacheSv1 server responsible to process the event +func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher, + reply *string) (err error) { + if args.ArgDispatcher == nil { + return utils.NewErrMandatoryIeMissing("ArgDispatcher") + } + if dS.attrS != nil { + if err = dS.authorize(utils.CDRsV1Ping, + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { + return + } + } + return dS.Dispatch(args.CGREvent, utils.MetaCDRs, args.RouteID, + utils.CDRsV1Ping, args.CGREvent, reply) +} diff --git a/engine/cdrs.go b/engine/cdrs.go index ff71668c4..ed2b8c7f7 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -279,6 +279,8 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDR) (ratedCDRs []*CDR) { func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent, attrS, store, export, thdS, statS bool) (err error) { var chrgrs []*ChrgSProcessEventReply + //in case of internal connection what should we do here ? + // if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil { utils.Logger.Warning( @@ -317,11 +319,29 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent, // statSProcessEvent will send the event to StatS if the connection is configured func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) { var rplyEv AttrSProcessEventReply + attrArgs := &AttrArgsProcessEvent{ + Context: utils.StringPointer(utils.MetaCDRs), + CGREvent: *cgrEv} + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + attrArgs.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + attrArgs.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + attrArgs.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } if err = cdrS.attrS.Call(utils.AttributeSv1ProcessEvent, - &AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaCDRs), - CGREvent: *cgrEv}, - &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { + attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { *cgrEv = *rplyEv.CGREvent } else if err.Error() == utils.ErrNotFound.Error() { err = nil // cancel ErrNotFound @@ -332,8 +352,27 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) { // thdSProcessEvent will send the event to ThresholdS if the connection is configured func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) { var tIDs []string + thArgs := &ArgsProcessEvent{CGREvent: *cgrEv} + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + thArgs.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + thArgs.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + thArgs.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, - &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil && + thArgs, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.", @@ -345,8 +384,27 @@ func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) { // statSProcessEvent will send the event to StatS if the connection is configured func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREvent) { var reply []string + statArgs := &StatsArgsProcessEvent{CGREvent: *cgrEv} + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + statArgs.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + statArgs.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + statArgs.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } if err := cdrS.statS.Call(utils.StatSv1ProcessEvent, - &StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil && + statArgs, &reply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.", diff --git a/engine/chargers.go b/engine/chargers.go index b3762acd1..5ac550508 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -151,7 +151,6 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*ChrgSProc args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) } } - var evReply AttrSProcessEventReply if err = cS.attrS.Call(utils.AttributeSv1ProcessEvent, args, &evReply); err != nil { diff --git a/sessions/sessions.go b/sessions/sessions.go index 29a87e257..04fb53fd1 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -949,6 +949,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) { if len(s.SRuns) != 0 { return errors.New("already forked") } + //we need to see what we do in case we have ArgDispatcher cgrEv := &utils.CGREvent{ Tenant: s.Tenant, ID: utils.UUIDSha1Prefix(), @@ -1160,6 +1161,24 @@ func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage Tenant: tnt, EventStart: evStart, } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, errApiKey := evStart.FieldAsString([]string{utils.MetaApiKey}) + if errApiKey == nil { + s.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, errRouteID := evStart.FieldAsString([]string{utils.MetaRouteID}) + if errRouteID == nil { + if errApiKey.Error() == utils.ErrNotFound.Error() { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + s.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface), + } + } else { + s.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface) + } + } if err = sS.forkSession(s); err != nil { return } @@ -1197,6 +1216,24 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID ClientConnID: clntConnID, DebitInterval: dbtItval, } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, errApiKey := evStart.FieldAsString([]string{utils.MetaApiKey}) + if errApiKey == nil { + s.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, errRouteID := evStart.FieldAsString([]string{utils.MetaRouteID}) + if errRouteID == nil { + if errApiKey.Error() == utils.ErrNotFound.Error() { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + s.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface), + } + } else { + s.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface) + } + } if err = sS.forkSession(s); err != nil { return nil, err } diff --git a/utils/consts.go b/utils/consts.go index e16a2087e..199923056 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -858,6 +858,7 @@ const ( // Cdrs APIs const ( + CDRsV1 = "CDRsV1" CDRsV1CountCDRs = "CDRsV1.CountCDRs" CDRsV1RateCDRs = "CDRsV1.RateCDRs" CDRsV1GetCDRs = "CDRsV1.GetCDRs" @@ -865,6 +866,7 @@ const ( CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost" CDRsV1ProcessEvent = "CDRsV1.ProcessEvent" CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost" + CDRsV1Ping = "CDRsV1.Ping" ) // Scheduler