From f8b0ff7f542a8977f179dad6b50396189754685e Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 3 Jun 2019 15:00:35 +0300 Subject: [PATCH] Add ProcessEvent route to kamailio --- agents/kamagent.go | 67 ++++++++++++++++++++--- agents/kamevent.go | 113 +++++++++++++++++++++++++++++++++++++-- agents/kamevent_test.go | 115 +++++++++++++++++++++++++++++++++++++++- sessions/sessions.go | 10 ++++ utils/consts.go | 1 + 5 files changed, 292 insertions(+), 14 deletions(-) diff --git a/agents/kamagent.go b/agents/kamagent.go index 712abcb72..857305c11 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -35,10 +35,11 @@ import ( ) var ( - kamAuthReqRegexp = regexp.MustCompile(CGR_AUTH_REQUEST) - kamCallStartRegexp = regexp.MustCompile(CGR_CALL_START) - kamCallEndRegexp = regexp.MustCompile(CGR_CALL_END) - kamDlgListRegexp = regexp.MustCompile(CGR_DLG_LIST) + kamAuthReqRegexp = regexp.MustCompile(CGR_AUTH_REQUEST) + kamCallStartRegexp = regexp.MustCompile(CGR_CALL_START) + kamCallEndRegexp = regexp.MustCompile(CGR_CALL_END) + kamDlgListRegexp = regexp.MustCompile(CGR_DLG_LIST) + kamProcessEventRegex = regexp.MustCompile(CGR_PROCESS_EVENT) ) func NewKamailioAgent(kaCfg *config.KamAgentCfg, @@ -64,10 +65,11 @@ type KamailioAgent struct { func (self *KamailioAgent) Connect() error { var err error eventHandlers := map[*regexp.Regexp][]func([]byte, string){ - kamAuthReqRegexp: {self.onCgrAuth}, - kamCallStartRegexp: {self.onCallStart}, - kamCallEndRegexp: {self.onCallEnd}, - kamDlgListRegexp: {self.onDlgList}, + kamAuthReqRegexp: {self.onCgrAuth}, + kamCallStartRegexp: {self.onCallStart}, + kamCallEndRegexp: {self.onCallEnd}, + kamDlgListRegexp: {self.onDlgList}, + kamProcessEventRegex: {self.onCgrProcessEvent}, } errChan := make(chan error) for _, connCfg := range self.cfg.EvapiConns { @@ -236,6 +238,55 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connID string) { ka.activeSessionIDs <- sIDs } +func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) { + kev, err := NewKamEvent(evData) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s", + utils.KamailioAgent, evData, err.Error())) + return + } + + if kev.MissingParameter() { + if kRply, err := kev.AsKamProcessEventReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if err = ka.conns[connID].Send(kRply.String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending process session event reply for event: %s, error %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } + return + } + + //in case that we don't reveice cgr_subsystems from kamailio + //we consider this as ping-pong event + if _, has := kev[utils.CGRSubsystems]; !has { + if err = ka.conns[connID].Send(kev.AsKamProcessEventEmptyReply().String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending empty process event reply for event: %s, error %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } + } + + procEvArgs := kev.V1ProcessEventArgs() + if procEvArgs == nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate process event session arguments", + utils.KamailioAgent, kev[utils.OriginID])) + return + } + procEvArgs.CGREvent.Event[utils.OriginHost] = utils.FirstNonEmpty(procEvArgs.CGREvent.Event[utils.OriginHost].(string), ka.conns[connID].RemoteAddr().String()) + procEvArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID + + var processReply sessions.V1ProcessEventReply + err = ka.sessionS.Call(utils.SessionSv1ProcessEvent, procEvArgs, &processReply) + + if kar, err := kev.AsKamProcessEventReply(procEvArgs, &processReply, err); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if err = ka.conns[connID].Send(kar.String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } +} + func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error { if err := self.conns[connID].Send(dscEv.String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s", diff --git a/agents/kamevent.go b/agents/kamevent.go index 8c4f215b2..e6c4e0558 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -35,6 +35,7 @@ const ( CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT" CGR_CALL_START = "CGR_CALL_START" CGR_CALL_END = "CGR_CALL_END" + CGR_PROCESS_EVENT = "CGR_PROCESS_EVENT" KamTRIndex = "tr_index" KamTRLabel = "tr_label" KamHashEntry = "h_entry" @@ -104,6 +105,19 @@ func (kev KamEvent) MissingParameter() bool { kev[utils.Account], kev[utils.Destination], }, "") + case CGR_PROCESS_EVENT: + // TRIndex and TRLabel must exist in order to know where to send back the response + mndPrm := []string{kev[KamTRIndex], kev[KamTRLabel]} + _, has := kev[utils.CGRSubsystems] + // in case that the user populate cgr_subsystems we treat it like a ProcessEvent + // and expect to have the required fields + if has { + mndPrm = append(mndPrm, kev[utils.OriginID], + kev[utils.AnswerTime], + kev[utils.Account], + kev[utils.Destination]) + } + return utils.IsSliceMember(mndPrm, "") default: // no/unsupported event return true } @@ -152,6 +166,12 @@ func (kev KamEvent) AsCGREvent(timezone string) (cgrEv *utils.CGREvent, err erro return nil, err } sTime = sTimePrv + case CGR_PROCESS_EVENT: + sTimePrv, err := utils.ParseTimeDetectLayout(kev[utils.AnswerTime], timezone) + if err != nil { + return nil, err + } + sTime = sTimePrv default: // no/unsupported event return } @@ -202,12 +222,12 @@ func (kev KamEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { // AsKamAuthReply builds up a Kamailio AuthReply based on arguments and reply from SessionS func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs, - authReply *sessions.V1AuthorizeReply, rplyErr error) (kar *KamAuthReply, err error) { + authReply *sessions.V1AuthorizeReply, rplyErr error) (kar *KamReply, err error) { evName := CGR_AUTH_REPLY if kamRouReply, has := kev[KamReplyRoute]; has { evName = kamRouReply } - kar = &KamAuthReply{Event: evName, + kar = &KamReply{Event: evName, TransactionIndex: kev[KamTRIndex], TransactionLabel: kev[KamTRLabel], } @@ -265,6 +285,89 @@ func (kev KamEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { return } +// V1ProcessEventArgs returns the arguments used in SessionSv1.ProcessEvent +func (kev KamEvent) V1ProcessEventArgs() (args *sessions.V1ProcessEventArgs) { + cgrEv, err := kev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone) + if err != nil { + return + } + args = &sessions.V1ProcessEventArgs{ // defaults + CGREvent: cgrEv, + } + subsystems, has := kev[utils.CGRSubsystems] + if !has { + return + } + args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 + args.AllocateResources = strings.Index(subsystems, utils.MetaResources) != -1 + args.Debit = strings.Index(subsystems, utils.MetaAccounts) != -1 + args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + args.GetSuppliers = strings.Index(subsystems, utils.MetaSuppliers) != -1 + args.SuppliersIgnoreErrors = strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 + if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 { + args.SuppliersMaxCost = utils.MetaEventCost + } + cgrArgs := cgrEv.ConsumeArgs(strings.Index(subsystems, utils.MetaDispatchers) != -1, true) + args.ArgDispatcher = cgrArgs.ArgDispatcher + args.Paginator = *cgrArgs.SupplierPaginator + return +} + +// AsKamAuthReply builds up a Kamailio ProcessEvent based on arguments and reply from SessionS +func (kev KamEvent) AsKamProcessEventReply(procEvArgs *sessions.V1ProcessEventArgs, + procEvReply *sessions.V1ProcessEventReply, rplyErr error) (kar *KamReply, err error) { + evName := CGR_PROCESS_EVENT + if kamRouReply, has := kev[KamReplyRoute]; has { + evName = kamRouReply + } + kar = &KamReply{Event: evName, + TransactionIndex: kev[KamTRIndex], + TransactionLabel: kev[KamTRLabel], + } + if rplyErr != nil { + kar.Error = rplyErr.Error() + return + } + if procEvArgs.GetAttributes && procEvReply.Attributes != nil { + kar.Attributes = procEvReply.Attributes.Digest() + } + if procEvArgs.AllocateResources { + kar.ResourceAllocation = *procEvReply.ResourceAllocation + } + if procEvArgs.Debit { + if *procEvReply.MaxUsage == -1 { // For calls different than unlimited, set limits + kar.MaxUsage = -1 + } else { + kar.MaxUsage = int(utils.Round(procEvReply.MaxUsage.Seconds(), 0, utils.ROUNDING_MIDDLE)) + } + } + if procEvArgs.GetSuppliers && procEvReply.Suppliers != nil { + kar.Suppliers = procEvReply.Suppliers.Digest() + } + + if procEvArgs.ProcessThresholds { + kar.Thresholds = strings.Join(*procEvReply.ThresholdIDs, utils.FIELDS_SEP) + } + if procEvArgs.ProcessStats { + kar.StatQueues = strings.Join(*procEvReply.StatQueueIDs, utils.FIELDS_SEP) + } + return +} + +// AsKamProcessEventEmptyReply builds up a Kamailio ProcessEventEmpty +func (kev KamEvent) AsKamProcessEventEmptyReply() (kar *KamReply) { + evName := CGR_PROCESS_EVENT + if kamRouReply, has := kev[KamReplyRoute]; has { + evName = kamRouReply + } + kar = &KamReply{Event: evName, + TransactionIndex: kev[KamTRIndex], + TransactionLabel: kev[KamTRLabel], + } + return +} + // V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionArgs) { cgrEv, err := kev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone) @@ -288,7 +391,9 @@ func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA return } -type KamAuthReply struct { +//KamReply will be used to send back to kamailio from +//Authrization,ProcessEvent and ProcessEvent empty (pingPong) +type KamReply struct { Event string // Kamailio will use this to differentiate between requests and replies TransactionIndex string // Original transaction index TransactionLabel string // Original transaction label @@ -301,7 +406,7 @@ type KamAuthReply struct { Error string // Reply in case of error } -func (self *KamAuthReply) String() string { +func (self *KamReply) String() string { mrsh, _ := json.Marshal(self) return string(mrsh) } diff --git a/agents/kamevent_test.go b/agents/kamevent_test.go index 89d0f4dd0..d1bc46bad 100644 --- a/agents/kamevent_test.go +++ b/agents/kamevent_test.go @@ -211,7 +211,7 @@ func TestKamEvAsKamAuthReply(t *testing.T) { authRply := &sessions.V1AuthorizeReply{ MaxUsage: utils.DurationPointer(time.Duration(5 * time.Second)), } - expected := &KamAuthReply{ + expected := &KamReply{ Event: CGR_AUTH_REPLY, MaxUsage: 5, } @@ -249,7 +249,7 @@ func TestKamEvAsKamAuthReply(t *testing.T) { }, }, } - expected = &KamAuthReply{ + expected = &KamReply{ Event: "CGR_PROFILE_REPLY", Attributes: "Password:check123,RequestType:*prepaid", } @@ -335,3 +335,114 @@ func TestKamEvV1TerminateSessionArgs(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", expected.TerminateSession, rcv.TerminateSession) } } + +func TestKamEvV1ProcessEventArgs(t *testing.T) { + timezone := config.CgrConfig().GeneralCfg().DefaultTimezone + kamEv := KamEvent{"event": "CGR_PROCESS_EVENT", + "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", + "from_tag": "bf71ad59", "to_tag": "7351fecf", + "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", + "cgr_destination": "1002", "cgr_answertime": "1419839310", + "cgr_duration": "3", "cgr_pdd": "4", + utils.CGR_SUPPLIER: "supplier2", + utils.CGR_DISCONNECT_CAUSE: "200"} + sTime, err := utils.ParseTimeDetectLayout(kamEv[utils.AnswerTime], timezone) + if err != nil { + return + } + expected := &sessions.V1ProcessEventArgs{ + CGREvent: &utils.CGREvent{ + Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant], + config.CgrConfig().GeneralCfg().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Time: &sTime, + Event: kamEv.AsMapStringInterface(), + }, + } + rcv := kamEv.V1ProcessEventArgs() + if !reflect.DeepEqual(expected.CGREvent.Tenant, rcv.CGREvent.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Tenant, rcv.CGREvent.Tenant) + } else if !reflect.DeepEqual(expected.CGREvent.Time, rcv.CGREvent.Time) { + t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Time, rcv.CGREvent.Time) + } else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) { + t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event) + } else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) { + t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event) + } +} + +func TestKamEvAsKamProcessEventReply(t *testing.T) { + timezone := config.CgrConfig().GeneralCfg().DefaultTimezone + kamEv := KamEvent{"event": "CGR_PROCESS_EVENT", + "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", + "from_tag": "bf71ad59", "to_tag": "7351fecf", + "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", + "cgr_destination": "1002", "cgr_answertime": "1419839310", + "cgr_duration": "3", "cgr_pdd": "4", + utils.CGR_SUPPLIER: "supplier2", + utils.CGR_DISCONNECT_CAUSE: "200"} + sTime, err := utils.ParseTimeDetectLayout(kamEv[utils.AnswerTime], timezone) + if err != nil { + return + } + procEvArgs := &sessions.V1ProcessEventArgs{ + Debit: true, + CGREvent: &utils.CGREvent{ + Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant], + config.CgrConfig().GeneralCfg().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Time: &sTime, + Event: kamEv.AsMapStringInterface(), + }, + } + procEvhRply := &sessions.V1ProcessEventReply{ + MaxUsage: utils.DurationPointer(time.Duration(5 * time.Second)), + } + expected := &KamReply{ + Event: CGR_PROCESS_EVENT, + MaxUsage: 5, + } + if rcv, err := kamEv.AsKamProcessEventReply(procEvArgs, procEvhRply, nil); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, rcv) { + t.Errorf("Expecting: %+v, received: %+v", expected, rcv) + } + kamEv = KamEvent{"event": "CGR_PROFILE_REQUEST", + "Tenant": "cgrates.org", "Account": "1001", + KamReplyRoute: "CGR_PROFILE_REPLY"} + procEvArgs = &sessions.V1ProcessEventArgs{ + GetAttributes: true, + CGREvent: &utils.CGREvent{ + Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant], + config.CgrConfig().GeneralCfg().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Time: &sTime, + Event: kamEv.AsMapStringInterface(), + }, + } + procEvhRply = &sessions.V1ProcessEventReply{ + Attributes: &engine.AttrSProcessEventReply{ + MatchedProfiles: []string{"ATTR_1001_ACCOUNT_PROFILE"}, + AlteredFields: []string{"Password", utils.RequestType}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestKamEvAsKamAuthReply", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.Account: "1001", + "Password": "check123", + utils.RequestType: utils.META_PREPAID, + }, + }, + }, + } + expected = &KamReply{ + Event: "CGR_PROFILE_REPLY", + Attributes: "Password:check123,RequestType:*prepaid", + } + if rcv, err := kamEv.AsKamProcessEventReply(procEvArgs, procEvhRply, nil); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, rcv) { + t.Errorf("Expecting: %+v, received: %+v", expected, rcv) + } +} diff --git a/sessions/sessions.go b/sessions/sessions.go index dd3941393..aa259c2f6 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2596,6 +2596,8 @@ type V1ProcessEventReply struct { ResourceAllocation *string Attributes *engine.AttrSProcessEventReply Suppliers *engine.SortedSuppliers + ThresholdIDs *[]string + StatQueueIDs *[]string } // AsNavigableMap is part of engine.NavigableMapper interface @@ -2621,6 +2623,12 @@ func (v1Rply *V1ProcessEventReply) AsNavigableMap( if v1Rply.Suppliers != nil { cgrReply[utils.CapSuppliers] = v1Rply.Suppliers.AsNavigableMap() } + if v1Rply.ThresholdIDs != nil { + cgrReply[utils.CapThresholds] = *v1Rply.ThresholdIDs + } + if v1Rply.StatQueueIDs != nil { + cgrReply[utils.CapStatQueues] = *v1Rply.StatQueueIDs + } } return config.NewNavigableMap(cgrReply), nil } @@ -2749,6 +2757,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.SessionS, err.Error(), thEv)) } + rply.ThresholdIDs = &tIDs } if args.ProcessStats { if sS.statS == nil { @@ -2766,6 +2775,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", utils.SessionS, err.Error(), args.CGREvent)) } + rply.ThresholdIDs = &statReply } return nil } diff --git a/utils/consts.go b/utils/consts.go index d0013c809..b577a6e8c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -585,6 +585,7 @@ const ( ApierV = "ApierV" MetaApier = "*apier" CGREventString = "CGREvent" + MetaPing = "*ping" ) // Migrator Action