mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Add CDRsV1 to DispatcherS
This commit is contained in:
committed by
Dan Christian Bogos
parent
c878c7b6e9
commit
d45e230f98
@@ -117,3 +117,8 @@ func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error {
|
||||
func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
|
||||
return cdrSv1.CDRs.V1GetCDRs(args, reply)
|
||||
}
|
||||
|
||||
func (cdrSv1 *CDRsV1) Ping(ign *utils.CGREvent, reply *string) error {
|
||||
*reply = utils.Pong
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -671,3 +671,17 @@ func (dSv1 DispatcherSv1) GetProfileForEvent(ev *dispatchers.DispatcherEvent,
|
||||
dPrfl *engine.DispatcherProfile) error {
|
||||
return dSv1.dS.V1GetProfileForEvent(ev, dPrfl)
|
||||
}
|
||||
|
||||
func NewDispatcherSCDRsV1(dps *dispatchers.DispatcherService) *DispatcherSCDRsV1 {
|
||||
return &DispatcherSCDRsV1{dS: dps}
|
||||
}
|
||||
|
||||
// Exports RPC from CDRsV1
|
||||
type DispatcherSCDRsV1 struct {
|
||||
dS *dispatchers.DispatcherService
|
||||
}
|
||||
|
||||
// Ping used to detreminate if component is active
|
||||
func (dS *DispatcherSCDRsV1) Ping(args *utils.CGREventWithArgDispatcher, reply *string) error {
|
||||
return dS.dS.CDRsV1Ping(args, reply)
|
||||
}
|
||||
|
||||
@@ -1037,6 +1037,9 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher
|
||||
server.RpcRegisterName(utils.SchedulerSv1,
|
||||
v1.NewDispatcherSchedulerSv1(dspS))
|
||||
|
||||
server.RpcRegisterName(utils.CDRsV1,
|
||||
v1.NewDispatcherSCDRsV1(dspS))
|
||||
|
||||
internalDispatcherSChan <- dspS
|
||||
}
|
||||
|
||||
|
||||
38
dispatchers/cdrs.go
Normal file
38
dispatchers/cdrs.go
Normal file
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package dispatchers
|
||||
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
// CacheSv1Ping interogates CacheSv1 server responsible to process the event
|
||||
func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher,
|
||||
reply *string) (err error) {
|
||||
if args.ArgDispatcher == nil {
|
||||
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
|
||||
}
|
||||
if dS.attrS != nil {
|
||||
if err = dS.authorize(utils.CDRsV1Ping,
|
||||
args.CGREvent.Tenant,
|
||||
args.APIKey, args.CGREvent.Time); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return dS.Dispatch(args.CGREvent, utils.MetaCDRs, args.RouteID,
|
||||
utils.CDRsV1Ping, args.CGREvent, reply)
|
||||
}
|
||||
@@ -279,6 +279,8 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDR) (ratedCDRs []*CDR) {
|
||||
func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent,
|
||||
attrS, store, export, thdS, statS bool) (err error) {
|
||||
var chrgrs []*ChrgSProcessEventReply
|
||||
//in case of internal connection what should we do here ?
|
||||
//
|
||||
if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent,
|
||||
cgrEv, &chrgrs); err != nil {
|
||||
utils.Logger.Warning(
|
||||
@@ -317,11 +319,29 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent,
|
||||
// statSProcessEvent will send the event to StatS if the connection is configured
|
||||
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
var rplyEv AttrSProcessEventReply
|
||||
attrArgs := &AttrArgsProcessEvent{
|
||||
Context: utils.StringPointer(utils.MetaCDRs),
|
||||
CGREvent: *cgrEv}
|
||||
//check if we have APIKey in event and in case it has add it in ArgDispatcher
|
||||
apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey]
|
||||
if hasApiKey {
|
||||
attrArgs.ArgDispatcher = &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer(apiKeyIface.(string)),
|
||||
}
|
||||
}
|
||||
//check if we have RouteID in event and in case it has add it in ArgDispatcher
|
||||
routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID]
|
||||
if hasRouteID {
|
||||
if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
|
||||
attrArgs.ArgDispatcher = &utils.ArgDispatcher{
|
||||
RouteID: utils.StringPointer(routeIDIface.(string)),
|
||||
}
|
||||
} else {
|
||||
attrArgs.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string))
|
||||
}
|
||||
}
|
||||
if err = cdrS.attrS.Call(utils.AttributeSv1ProcessEvent,
|
||||
&AttrArgsProcessEvent{
|
||||
Context: utils.StringPointer(utils.MetaCDRs),
|
||||
CGREvent: *cgrEv},
|
||||
&rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
|
||||
attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
|
||||
*cgrEv = *rplyEv.CGREvent
|
||||
} else if err.Error() == utils.ErrNotFound.Error() {
|
||||
err = nil // cancel ErrNotFound
|
||||
@@ -332,8 +352,27 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
// thdSProcessEvent will send the event to ThresholdS if the connection is configured
|
||||
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
|
||||
var tIDs []string
|
||||
thArgs := &ArgsProcessEvent{CGREvent: *cgrEv}
|
||||
//check if we have APIKey in event and in case it has add it in ArgDispatcher
|
||||
apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey]
|
||||
if hasApiKey {
|
||||
thArgs.ArgDispatcher = &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer(apiKeyIface.(string)),
|
||||
}
|
||||
}
|
||||
//check if we have RouteID in event and in case it has add it in ArgDispatcher
|
||||
routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID]
|
||||
if hasRouteID {
|
||||
if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
|
||||
thArgs.ArgDispatcher = &utils.ArgDispatcher{
|
||||
RouteID: utils.StringPointer(routeIDIface.(string)),
|
||||
}
|
||||
} else {
|
||||
thArgs.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string))
|
||||
}
|
||||
}
|
||||
if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
|
||||
&ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil &&
|
||||
thArgs, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.",
|
||||
@@ -345,8 +384,27 @@ func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
|
||||
// statSProcessEvent will send the event to StatS if the connection is configured
|
||||
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREvent) {
|
||||
var reply []string
|
||||
statArgs := &StatsArgsProcessEvent{CGREvent: *cgrEv}
|
||||
//check if we have APIKey in event and in case it has add it in ArgDispatcher
|
||||
apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey]
|
||||
if hasApiKey {
|
||||
statArgs.ArgDispatcher = &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer(apiKeyIface.(string)),
|
||||
}
|
||||
}
|
||||
//check if we have RouteID in event and in case it has add it in ArgDispatcher
|
||||
routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID]
|
||||
if hasRouteID {
|
||||
if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
|
||||
statArgs.ArgDispatcher = &utils.ArgDispatcher{
|
||||
RouteID: utils.StringPointer(routeIDIface.(string)),
|
||||
}
|
||||
} else {
|
||||
statArgs.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string))
|
||||
}
|
||||
}
|
||||
if err := cdrS.statS.Call(utils.StatSv1ProcessEvent,
|
||||
&StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil &&
|
||||
statArgs, &reply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.",
|
||||
|
||||
@@ -151,7 +151,6 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*ChrgSProc
|
||||
args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string))
|
||||
}
|
||||
}
|
||||
|
||||
var evReply AttrSProcessEventReply
|
||||
if err = cS.attrS.Call(utils.AttributeSv1ProcessEvent,
|
||||
args, &evReply); err != nil {
|
||||
|
||||
@@ -949,6 +949,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) {
|
||||
if len(s.SRuns) != 0 {
|
||||
return errors.New("already forked")
|
||||
}
|
||||
//we need to see what we do in case we have ArgDispatcher
|
||||
cgrEv := &utils.CGREvent{
|
||||
Tenant: s.Tenant,
|
||||
ID: utils.UUIDSha1Prefix(),
|
||||
@@ -1160,6 +1161,24 @@ func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage
|
||||
Tenant: tnt,
|
||||
EventStart: evStart,
|
||||
}
|
||||
//check if we have APIKey in event and in case it has add it in ArgDispatcher
|
||||
apiKeyIface, errApiKey := evStart.FieldAsString([]string{utils.MetaApiKey})
|
||||
if errApiKey == nil {
|
||||
s.ArgDispatcher = &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer(apiKeyIface),
|
||||
}
|
||||
}
|
||||
//check if we have RouteID in event and in case it has add it in ArgDispatcher
|
||||
routeIDIface, errRouteID := evStart.FieldAsString([]string{utils.MetaRouteID})
|
||||
if errRouteID == nil {
|
||||
if errApiKey.Error() == utils.ErrNotFound.Error() { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
|
||||
s.ArgDispatcher = &utils.ArgDispatcher{
|
||||
RouteID: utils.StringPointer(routeIDIface),
|
||||
}
|
||||
} else {
|
||||
s.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface)
|
||||
}
|
||||
}
|
||||
if err = sS.forkSession(s); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -1197,6 +1216,24 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID
|
||||
ClientConnID: clntConnID,
|
||||
DebitInterval: dbtItval,
|
||||
}
|
||||
//check if we have APIKey in event and in case it has add it in ArgDispatcher
|
||||
apiKeyIface, errApiKey := evStart.FieldAsString([]string{utils.MetaApiKey})
|
||||
if errApiKey == nil {
|
||||
s.ArgDispatcher = &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer(apiKeyIface),
|
||||
}
|
||||
}
|
||||
//check if we have RouteID in event and in case it has add it in ArgDispatcher
|
||||
routeIDIface, errRouteID := evStart.FieldAsString([]string{utils.MetaRouteID})
|
||||
if errRouteID == nil {
|
||||
if errApiKey.Error() == utils.ErrNotFound.Error() { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
|
||||
s.ArgDispatcher = &utils.ArgDispatcher{
|
||||
RouteID: utils.StringPointer(routeIDIface),
|
||||
}
|
||||
} else {
|
||||
s.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface)
|
||||
}
|
||||
}
|
||||
if err = sS.forkSession(s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -858,6 +858,7 @@ const (
|
||||
|
||||
// Cdrs APIs
|
||||
const (
|
||||
CDRsV1 = "CDRsV1"
|
||||
CDRsV1CountCDRs = "CDRsV1.CountCDRs"
|
||||
CDRsV1RateCDRs = "CDRsV1.RateCDRs"
|
||||
CDRsV1GetCDRs = "CDRsV1.GetCDRs"
|
||||
@@ -865,6 +866,7 @@ const (
|
||||
CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost"
|
||||
CDRsV1ProcessEvent = "CDRsV1.ProcessEvent"
|
||||
CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"
|
||||
CDRsV1Ping = "CDRsV1.Ping"
|
||||
)
|
||||
|
||||
// Scheduler
|
||||
|
||||
Reference in New Issue
Block a user