From 72ed8dcb0d46cd2220ac4676a31a4d724d56748d Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 26 Jan 2018 20:17:56 +0100 Subject: [PATCH] First cut of KamailioAgent --- agents/fsagent.go | 40 +-- agents/fsevent.go | 26 +- agents/kamagent.go | 200 +++++++++++++++ agents/kamevent.go | 282 +++++++++++++++++++++ agents/kamevent_test.go | 58 +++++ cmd/cgr-engine/cgr-engine.go | 47 +--- config/config_defaults.go | 1 + config/kamagent.go | 1 + engine/cgrcdr_test.go | 18 +- sessionmanager/kamailiosm.go | 314 ----------------------- sessionmanager/kamailiosm_test.go | 26 -- sessionmanager/kamevent.go | 407 ------------------------------ sessionmanager/kamevent_test.go | 127 ---------- sessionmanager/osipsevent.go | 5 + sessionmanager/smg_event_test.go | 25 +- utils/consts.go | 7 +- 16 files changed, 594 insertions(+), 990 deletions(-) create mode 100644 agents/kamagent.go create mode 100644 agents/kamevent.go create mode 100644 agents/kamevent_test.go delete mode 100644 sessionmanager/kamailiosm.go delete mode 100644 sessionmanager/kamailiosm_test.go delete mode 100644 sessionmanager/kamevent.go delete mode 100644 sessionmanager/kamevent_test.go diff --git a/agents/fsagent.go b/agents/fsagent.go index 84a51bb5a..232c6debe 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -21,15 +21,12 @@ package agents import ( "errors" "fmt" - "reflect" - "strings" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/fsock" - "github.com/cgrates/rpcclient" ) func NewFSSessionManager(fsAgentConfig *config.FsAgentConfig, @@ -222,12 +219,6 @@ func (sm *FSSessionManager) onChannelAnswer(fsev FSEvent, connId string) { sm.disconnectSession(connId, chanUUID, "", utils.ErrServerError.Error()) return } - if initSessionArgs.AllocateResources { - if initReply.ResourceAllocation == nil { - sm.disconnectSession(connId, chanUUID, "", - utils.ErrUnallocatedResource.Error()) - } - } } func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string) { @@ -241,14 +232,13 @@ func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string) utils.Logger.Err( fmt.Sprintf("<%s> Could not terminate session with event %s, error: %s", utils.FreeSWITCHAgent, fsev.GetUUID(), err.Error())) - return } } if sm.cfg.CreateCdr { cdr := fsev.AsCDR(sm.timezone) if err := sm.smg.Call(utils.SessionSv1ProcessCDR, cdr, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", - utils.FreeSWITCHAgent, cdr.CGRID, cdr.OriginID, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CDR: %s, error: <%s>", + utils.FreeSWITCHAgent, utils.ToJSON(cdr), err.Error())) } } } @@ -342,30 +332,8 @@ func (sm *FSSessionManager) Shutdown() (err error) { } // rpcclient.RpcClientConnection interface -func (fsa *FSSessionManager) Call(serviceMethod string, args interface{}, reply interface{}) error { - parts := strings.Split(serviceMethod, ".") - if len(parts) != 2 { - return rpcclient.ErrUnsupporteServiceMethod - } - // get method - method := reflect.ValueOf(fsa).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method - if !method.IsValid() { - return rpcclient.ErrUnsupporteServiceMethod - } - // construct the params - params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} - ret := method.Call(params) - if len(ret) != 1 { - return utils.ErrServerError - } - if ret[0].Interface() == nil { - return nil - } - err, ok := ret[0].Interface().(error) - if !ok { - return utils.ErrServerError - } - return err +func (sm *FSSessionManager) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.APIerRPCCall(sm, serviceMethod, args, reply) } // Internal method to disconnect session in asterisk diff --git a/agents/fsevent.go b/agents/fsevent.go index d4dd036ef..5688ac767 100644 --- a/agents/fsevent.go +++ b/agents/fsevent.go @@ -67,14 +67,10 @@ const ( IGNOREPARK = "variable_cgr_ignorepark" FS_VARPREFIX = "variable_" VarCGRSubsystems = "variable_cgr_subsystems" - SubSAccountS = "accounts" - SubSSupplierS = "suppliers" - SubSResourceS = "resources" - SubSAttributeS = "attributes" CGRResourceAllocation = "cgr_resource_allocation" VAR_CGR_DISCONNECT_CAUSE = "variable_" + utils.CGR_DISCONNECT_CAUSE VAR_CGR_CMPUTELCR = "variable_" + utils.CGR_COMPUTELCR - FsConnID = "FsConnID" // used to share connID info in event + FsConnID = "FsConnID" // used to share connID info in event for remote disconnects VarAnswerEpoch = "variable_answer_epoch" ) @@ -386,22 +382,22 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessionmanager.V1AuthorizeArgs) { if !has { return } - if strings.Index(subsystems, SubSAccountS) == -1 { + if strings.Index(subsystems, utils.MetaAccounts) == -1 { args.GetMaxUsage = false } - if strings.Index(subsystems, SubSResourceS) != -1 { + if strings.Index(subsystems, utils.MetaResources) != -1 { args.AuthorizeResources = true } - if strings.Index(subsystems, SubSSupplierS) != -1 { + if strings.Index(subsystems, utils.MetaSuppliers) != -1 { args.GetSuppliers = true } - if strings.Index(subsystems, SubSAttributeS) != -1 { + if strings.Index(subsystems, utils.MetaAttributes) != -1 { args.GetAttributes = true } return } -// V2InitSessionArgs returns the arguments used in SMGv1.InitSession +// V1InitSessionArgs returns the arguments used in SessionSv1.InitSession func (fsev FSEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) { args = &sessionmanager.V1InitSessionArgs{ // defaults InitSession: true, @@ -415,13 +411,13 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) if !has { return } - if strings.Index(subsystems, SubSAccountS) == -1 { + if strings.Index(subsystems, utils.MetaAccounts) == -1 { args.InitSession = false } - if strings.Index(subsystems, SubSResourceS) != -1 { + if strings.Index(subsystems, utils.MetaResources) != -1 { args.AllocateResources = true } - if strings.Index(subsystems, SubSAttributeS) != -1 { + if strings.Index(subsystems, utils.MetaAttributes) != -1 { args.GetAttributes = true } return @@ -441,10 +437,10 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessionmanager.V1TerminateSe if !has { return } - if strings.Index(subsystems, SubSAccountS) == -1 { + if strings.Index(subsystems, utils.MetaAccounts) == -1 { args.TerminateSession = false } - if strings.Index(subsystems, SubSResourceS) != -1 { + if strings.Index(subsystems, utils.MetaResources) != -1 { args.ReleaseResources = true } return diff --git a/agents/kamagent.go b/agents/kamagent.go new file mode 100644 index 000000000..5d6f8df13 --- /dev/null +++ b/agents/kamagent.go @@ -0,0 +1,200 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "fmt" + "log" + "regexp" + "strings" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/sessionmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/kamevapi" +) + +func NewKamailioAgent(kaCfg *config.KamAgentCfg, + sessionS *utils.BiRPCInternalClient, timezone string) (ka *KamailioAgent) { + ka = &KamailioAgent{cfg: kaCfg, sessionS: sessionS, + timezone: timezone, + conns: make(map[string]*kamevapi.KamEvapi)} + ka.sessionS.SetClientConn(ka) // pass the connection to KA back into smg so we can receive the disconnects + return +} + +type KamailioAgent struct { + cfg *config.KamAgentCfg + sessionS *utils.BiRPCInternalClient + timezone string + conns map[string]*kamevapi.KamEvapi +} + +func (self *KamailioAgent) Connect() error { + var err error + eventHandlers := map[*regexp.Regexp][]func([]byte, string){ + regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){ + self.onCgrAuth}, + regexp.MustCompile(CGR_CALL_START): []func([]byte, string){ + self.onCallStart}, + regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd}, + } + errChan := make(chan error) + for _, connCfg := range self.cfg.EvapiConns { + connID := utils.GenUUID() + logger := log.New(utils.Logger, "kamevapi:", 2) + if self.conns[connID], err = kamevapi.NewKamEvapi(connCfg.Address, connID, connCfg.Reconnects, eventHandlers, logger); err != nil { + return err + } + go func() { // Start reading in own goroutine, return on error + if err := self.conns[connID].ReadEvents(); err != nil { + errChan <- err + } + }() + } + err = <-errChan // Will keep the Connect locked until the first error in one of the connections + return err +} + +func (self *KamailioAgent) Shutdown() error { + return nil +} + +// rpcclient.RpcClientConnection interface +func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.APIerRPCCall(ka, serviceMethod, args, reply) +} + +// onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming +func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) { + kev, err := NewKamEvent(evData) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s", + utils.KamailioAgent, evData, err.Error())) + return + } + if kev[utils.RequestType] == utils.META_NONE { // Do not process this request + return + } + if kev.MissingParameter() { + if kRply, err := kev.AsKamAuthReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if err = ka.conns[connID].Send(kRply.String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } + return + } + authArgs := kev.V1AuthorizeArgs() + var authReply sessionmanager.V1AuthorizeReply + err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply) + if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if err = ka.conns[connID].Send(kar.String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } +} + +func (ka *KamailioAgent) onCallStart(evData []byte, connID string) { + kev, err := NewKamEvent(evData) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s", + utils.KamailioAgent, evData, err.Error())) + return + } + if kev[utils.RequestType] == utils.META_NONE { // Do not process this request + return + } + if kev.MissingParameter() { + ka.disconnectSession(connID, + NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID], + utils.ErrMandatoryIeMissing.Error())) + } + initSessionArgs := kev.V1InitSessionArgs() + initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later + var initReply sessionmanager.V1InitSessionReply + if err := ka.sessionS.Call(utils.SessionSv1InitiateSession, + initSessionArgs, &initReply); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> could not process answer for event %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + ka.disconnectSession(connID, + NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID], + utils.ErrServerError.Error())) + return + } +} + +func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { + kev, err := NewKamEvent(evData) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s", + utils.KamailioAgent, evData, err.Error())) + return + } + if kev[utils.RequestType] == utils.META_NONE { // Do not process this request + return + } + if kev.MissingParameter() { + utils.Logger.Err(fmt.Sprintf("<%s> mandatory IE missing out from event: %s", + utils.KamailioAgent, kev[utils.OriginID])) + return + } + var reply string + if err := ka.sessionS.Call(utils.SessionSv1TerminateSession, + kev.V1TerminateSessionArgs(), &reply); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> could not terminate session with event %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + // no return here since we want CDR anyhow + } + if ka.cfg.CreateCdr || strings.Index(kev[KamCGRSubsystems], utils.MetaCDRs) != -1 { + cdr := kev.AsCDR(ka.timezone) + if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, cdr, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf("%s> failed processing CDR: %s, error: %s", + utils.KamailioAgent, utils.ToJSON(cdr), err.Error())) + } + } +} + +func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error { + if err := self.conns[connID].Send(dscEv.String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s", + utils.KamailioAgent, utils.ToJSON(dscEv), err.Error(), connID)) + return err + } + return nil +} + +// Internal method to disconnect session in Kamailio +func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) { + hEntry, _ := utils.CastFieldIfToString(args.EventStart[KamHashEntry]) + hID, _ := utils.CastFieldIfToString(args.EventStart[KamHashID]) + connID, _ := utils.CastFieldIfToString(args.EventStart[EvapiConnID]) + if err = ka.disconnectSession(connID, + NewKamSessionDisconnect(hEntry, hID, + utils.ErrInsufficientCredit.Error())); err != nil { + return + } + *reply = utils.OK + return +} diff --git a/agents/kamevent.go b/agents/kamevent.go new file mode 100644 index 000000000..66c28ffa2 --- /dev/null +++ b/agents/kamevent.go @@ -0,0 +1,282 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "encoding/json" + "strings" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessionmanager" + "github.com/cgrates/cgrates/utils" +) + +const ( + EVENT = "event" + CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST" + CGR_AUTH_REPLY = "CGR_AUTH_REPLY" + CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT" + CGR_CALL_START = "CGR_CALL_START" + CGR_CALL_END = "CGR_CALL_END" + KamTRIndex = "tr_index" + KamTRLabel = "tr_label" + KamHashEntry = "h_entry" + KamHashID = "h_id" + KamCGRSubsystems = "cgr_subsystems" + EvapiConnID = "EvapiConnID" // used to share connID info in event for remote disconnects +) + +var kamReservedFields = []string{EVENT, KamTRIndex, KamTRLabel, + KamHashEntry, KamHashID, KamCGRSubsystems} + +func NewKamSessionDisconnect(hEntry, hID, reason string) *KamSessionDisconnect { + return &KamSessionDisconnect{ + Event: CGR_SESSION_DISCONNECT, + HashEntry: hEntry, + HashId: hID, + Reason: reason} +} + +type KamSessionDisconnect struct { + Event string + HashEntry string + HashId string + Reason string +} + +func (self *KamSessionDisconnect) String() string { + mrsh, _ := json.Marshal(self) + return string(mrsh) +} + +// NewKamEvent parses bytes received over the wire from Kamailio into KamEvent +func NewKamEvent(kamEvData []byte) (KamEvent, error) { + kev := make(map[string]string) + if err := json.Unmarshal(kamEvData, &kev); err != nil { + return nil, err + } + return kev, nil +} + +// KamEvent represents one event received from Kamailio +type KamEvent map[string]string + +func (kev KamEvent) MissingParameter() bool { + switch kev[EVENT] { + case CGR_AUTH_REQUEST: + return utils.IsSliceMember([]string{ + kev[KamTRIndex], + kev[KamTRLabel], + kev[utils.SetupTime], + kev[utils.Account], + kev[utils.Destination], + }, "") + case CGR_CALL_START: + return utils.IsSliceMember([]string{ + kev[KamHashEntry], + kev[KamHashID], + kev[utils.OriginID], + kev[utils.AnswerTime], + kev[utils.Account], + kev[utils.Destination], + }, "") + case CGR_CALL_END: + return utils.IsSliceMember([]string{ + kev[KamHashEntry], + kev[KamHashID], + kev[utils.OriginID], + kev[utils.AnswerTime], + kev[utils.Account], + kev[utils.Destination], + }, "") + default: // no/unsupported event + return true + } + +} + +// AsMapStringIface converts KamEvent into event used by other subsystems +func (kev KamEvent) AsMapStringInterface() (mp map[string]interface{}) { + mp = make(map[string]interface{}) + for k, v := range kev { + if !utils.IsSliceMember(kamReservedFields, k) { // reserved attributes not getting into event + mp[k] = v + } + } + return +} + +// AsCDR converts KamEvent into CDR +func (kev KamEvent) AsCDR(timezone string) (cdr *engine.CDR) { + cdr = new(engine.CDR) + for fld, val := range kev { // first ExtraFields so we can overwrite + if !utils.IsSliceMember(utils.PrimaryCdrFields, fld) && + !utils.IsSliceMember(kamReservedFields, fld) { + cdr.ExtraFields[fld] = val + } + } + cdr.ToR = utils.VOICE + cdr.OriginID = kev[utils.OriginID] + cdr.OriginHost = kev[utils.OriginHost] + cdr.Source = "KamailioEvent" + cdr.RequestType = utils.FirstNonEmpty(kev[utils.RequestType], config.CgrConfig().DefaultReqType) + cdr.Tenant = utils.FirstNonEmpty(kev[utils.Tenant], config.CgrConfig().DefaultTenant) + cdr.Category = utils.FirstNonEmpty(kev[utils.Category], config.CgrConfig().DefaultCategory) + cdr.Account = kev[utils.Account] + cdr.Subject = kev[utils.Subject] + cdr.Destination = kev[utils.Destination] + cdr.SetupTime, _ = utils.ParseTimeDetectLayout(kev[utils.SetupTime], timezone) + cdr.AnswerTime, _ = utils.ParseTimeDetectLayout(kev[utils.AnswerTime], timezone) + cdr.Usage, _ = utils.ParseDurationWithSecs(kev[utils.Usage]) + cdr.Cost = -1 + return cdr +} + +// String is used for pretty printing event in logs +func (kev KamEvent) String() string { + mrsh, _ := json.Marshal(kev) + return string(mrsh) +} + +func (kev KamEvent) V1AuthorizeArgs() (args *sessionmanager.V1AuthorizeArgs) { + args = &sessionmanager.V1AuthorizeArgs{ + GetMaxUsage: true, + CGREvent: utils.CGREvent{ + Tenant: utils.FirstNonEmpty(kev[utils.Tenant], + config.CgrConfig().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Event: kev.AsMapStringInterface(), + }, + } + subsystems, has := kev[KamCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, utils.MetaAccounts) == -1 { + args.GetMaxUsage = false + } + if strings.Index(subsystems, utils.MetaResources) != -1 { + args.AuthorizeResources = true + } + if strings.Index(subsystems, utils.MetaSuppliers) != -1 { + args.GetSuppliers = true + } + if strings.Index(subsystems, utils.MetaAttributes) != -1 { + args.GetAttributes = true + } + return +} + +// AsKamAuthReply builds up a Kamailio AuthReply based on arguments and reply from SessionS +func (kev KamEvent) AsKamAuthReply(authArgs *sessionmanager.V1AuthorizeArgs, + authReply *sessionmanager.V1AuthorizeReply, rplyErr error) (kar *KamAuthReply, err error) { + kar = &KamAuthReply{Event: CGR_AUTH_REPLY, + TransactionIndex: kev[KamTRIndex], + TransactionLabel: kev[KamTRLabel], + } + if rplyErr != nil { + kar.Error = rplyErr.Error() + return + } + if authArgs.GetAttributes && authReply.Attributes != nil { + kar.Attributes = authReply.Attributes.Digest() + } + if authArgs.AuthorizeResources { + kar.ResourceAllocation = *authReply.ResourceAllocation + } + if authArgs.GetMaxUsage { + if *authReply.MaxUsage == -1 { // For calls different than unlimited, set limits + kar.MaxUsage = -1 + } else { + kar.MaxUsage = int(utils.Round(authReply.MaxUsage.Seconds(), 0, utils.ROUNDING_MIDDLE)) + } + } + if authArgs.GetSuppliers && authReply.Suppliers != nil { + kar.Suppliers = authReply.Suppliers.Digest() + } + return +} + +// V1InitSessionArgs returns the arguments used in SessionSv1.InitSession +func (kev KamEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) { + args = &sessionmanager.V1InitSessionArgs{ // defaults + InitSession: true, + CGREvent: utils.CGREvent{ + Tenant: utils.FirstNonEmpty(kev[utils.Tenant], + config.CgrConfig().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Event: kev.AsMapStringInterface(), + }, + } + subsystems, has := kev[KamCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, utils.MetaAccounts) == -1 { + args.InitSession = false + } + if strings.Index(subsystems, utils.MetaResources) != -1 { + args.AllocateResources = true + } + if strings.Index(subsystems, utils.MetaAttributes) != -1 { + args.GetAttributes = true + } + return +} + +// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession +func (kev KamEvent) V1TerminateSessionArgs() (args *sessionmanager.V1TerminateSessionArgs) { + args = &sessionmanager.V1TerminateSessionArgs{ // defaults + TerminateSession: true, + CGREvent: utils.CGREvent{ + Tenant: utils.FirstNonEmpty(kev[utils.Tenant], + config.CgrConfig().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Event: kev.AsMapStringInterface(), + }, + } + subsystems, has := kev[KamCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, utils.MetaAccounts) == -1 { + args.TerminateSession = false + } + if strings.Index(subsystems, utils.MetaResources) != -1 { + args.ReleaseResources = true + } + return +} + +type KamAuthReply struct { + Event string // Kamailio will use this to differentiate between requests and replies + TransactionIndex string // Original transaction index + TransactionLabel string // Original transaction label + Attributes string + ResourceAllocation string + MaxUsage int // Maximum session time in case of success, -1 for unlimited + Suppliers string // List of suppliers, comma separated + Error string // Reply in case of error +} + +func (self *KamAuthReply) String() string { + mrsh, _ := json.Marshal(self) + return string(mrsh) +} diff --git a/agents/kamevent_test.go b/agents/kamevent_test.go new file mode 100644 index 000000000..1e4cfd59f --- /dev/null +++ b/agents/kamevent_test.go @@ -0,0 +1,58 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package agents + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/utils" +) + +var kamEv = KamEvent{KamTRIndex: "29223", KamTRLabel: "698469260", + "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "from_tag": "eb082607", "to_tag": "4ea9687f", "cgr_account": "dan", + "cgr_reqtype": utils.META_PREPAID, "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com", + "cgr_duration": "20", utils.CGR_SUPPLIER: "suppl2", utils.CGR_DISCONNECT_CAUSE: "200", "extra1": "val1", "extra2": "val2"} + +func TestNewKamEvent(t *testing.T) { + evStr := `{"event":"CGR_CALL_END", + "callid":"46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", + "from_tag":"bf71ad59", + "to_tag":"7351fecf", + "cgr_reqtype":"*postpaid", + "cgr_account":"1001", + "cgr_destination":"1002", + "cgr_answertime":"1419839310", + "cgr_duration":"3", + "cgr_supplier":"supplier2", + "cgr_disconnectcause": "200", + "cgr_pdd": "4"}` + eKamEv := KamEvent{"event": "CGR_CALL_END", + "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", + "from_tag": "bf71ad59", "to_tag": "7351fecf", + "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", + "cgr_destination": "1002", "cgr_answertime": "1419839310", + "cgr_duration": "3", "cgr_pdd": "4", + utils.CGR_SUPPLIER: "supplier2", + utils.CGR_DISCONNECT_CAUSE: "200"} + if kamEv, err := NewKamEvent([]byte(evStr)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eKamEv, kamEv) { + t.Error("Received: ", kamEv) + } +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c44a33720..9142a037a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -305,40 +305,17 @@ func startFsAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan c exitChan <- true } -func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { +func startKamAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error - utils.Logger.Info("Starting CGRateS SMKamailio service.") - var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool - if len(cfg.SmKamConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return - } - } - if len(cfg.SmKamConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRs: %s", err.Error())) - exitChan <- true - return - } - } - if len(cfg.SmKamConfig.RLsConns) != 0 { - rlSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmKamConfig.RLsConns, internalRsChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RLsConns: %s", err.Error())) - exitChan <- true - return - } - } - sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, ralsConn, cdrsConn, rlSConn, cfg.DefaultTimezone) - if err = sm.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) + utils.Logger.Info("Starting Kamailio agent") + smgRpcConn := <-internalSMGChan + internalSMGChan <- smgRpcConn + birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric)) + ka := agents.NewKamailioAgent(cfg.KamAgentCfg(), + birpcClnt, utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.DefaultTimezone)) + + if err = ka.Connect(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) } exitChan <- true } @@ -906,8 +883,8 @@ func main() { } // Start SM-Kamailio - if cfg.SmKamConfig.Enabled { - go startSmKamailio(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan) + if cfg.KamAgentCfg().Enabled { + go startKamAgent(internalSMGChan, exitChan) } if cfg.AsteriskAgentCfg().Enabled { diff --git a/config/config_defaults.go b/config/config_defaults.go index ed40b1bf2..505251bb4 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -340,6 +340,7 @@ const CGRATES_CFG_JSON = ` {"address": "*internal"} // connection towards session service: <*internal> ], "create_cdr": false, // create CDR out of events and sends them to CDRS component + "timezone": "", // timezone of the Kamailio server "evapi_conns":[ // instantiate connections to multiple Kamailio servers {"address": "127.0.0.1:8448", "reconnects": 5} ], diff --git a/config/kamagent.go b/config/kamagent.go index 36e26a4b1..12bc9b4aa 100644 --- a/config/kamagent.go +++ b/config/kamagent.go @@ -43,6 +43,7 @@ type KamAgentCfg struct { SessionSConns []*HaPoolConfig CreateCdr bool EvapiConns []*KamConnConfig + Timezone string } func (ka *KamAgentCfg) loadFromJsonCfg(jsnCfg *KamAgentJsonCfg) error { diff --git a/engine/cgrcdr_test.go b/engine/cgrcdr_test.go index 9ac100bc1..56a4bfcca 100644 --- a/engine/cgrcdr_test.go +++ b/engine/cgrcdr_test.go @@ -59,11 +59,19 @@ func TestCgrCdrAsCDR(t *testing.T) { // Make sure the replicated CDR matches the expected CDR func TestReplicatedCgrCdrAsCDR(t *testing.T) { - cgrCdr := CgrCdr{utils.CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", utils.TOR: utils.VOICE, utils.OriginID: "dsafdsaf", utils.OriginHost: "192.168.1.1", - utils.Source: "internal_test", utils.RequestType: utils.META_RATED, - utils.Direction: utils.OUT, utils.Tenant: "cgrates.org", utils.Category: "call", - utils.Account: "1001", utils.Subject: "1001", utils.Destination: "1002", utils.SetupTime: "2013-11-07T08:42:20Z", utils.PDD: "0.200", utils.AnswerTime: "2013-11-07T08:42:26Z", - utils.Usage: "10s", utils.SUPPLIER: "SUPPL1", utils.DISCONNECT_CAUSE: "NORMAL_CLEARING", utils.COST: "0.12", utils.RATED: "true", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} + cgrCdr := CgrCdr{utils.CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", + utils.TOR: utils.VOICE, utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.Source: "internal_test", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", utils.Category: "call", + utils.Account: "1001", utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: "2013-11-07T08:42:20Z", + utils.AnswerTime: "2013-11-07T08:42:26Z", + utils.Usage: "10s", utils.COST: "0.12", + utils.RATED: "true", "field_extr1": "val_extr1", + "fieldextr2": "valextr2"} expctRtCdr := &CDR{CGRID: cgrCdr[utils.CGRID], ToR: cgrCdr[utils.TOR], OriginID: cgrCdr[utils.OriginID], diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go deleted file mode 100644 index 4aca0c369..000000000 --- a/sessionmanager/kamailiosm.go +++ /dev/null @@ -1,314 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package sessionmanager - -import ( - "errors" - "fmt" - "log" - "reflect" - "regexp" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" - "github.com/cgrates/kamevapi" - "github.com/cgrates/rpcclient" -) - -func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv, - rlS rpcclient.RpcClientConnection, timezone string) (ksm *KamailioSessionManager, err error) { - if rlS != nil && reflect.ValueOf(rlS).IsNil() { - rlS = nil - } - ksm = &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, rlS: rlS, - timezone: timezone, conns: make(map[string]*kamevapi.KamEvapi), sessions: NewSessions()} - return -} - -type KamailioSessionManager struct { - cfg *config.SmKamConfig - rater rpcclient.RpcClientConnection - cdrsrv rpcclient.RpcClientConnection - rlS rpcclient.RpcClientConnection - timezone string - conns map[string]*kamevapi.KamEvapi - sessions *Sessions -} - -func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) { - cd, err := kev.AsCallDescriptor() - cd.CgrID = kev.GetCgrId(self.timezone) - if err != nil { - utils.Logger.Info(fmt.Sprintf(" LCR_PREPROCESS_ERROR error: %s", err.Error())) - return "", errors.New("LCR_PREPROCESS_ERROR") - } - var lcr engine.LCRCost - if err = self.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { - utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR error: %s", err.Error())) - return "", errors.New("LCR_API_ERROR") - } - if lcr.HasErrors() { - lcr.LogErrors() - return "", errors.New("LCR_COMPUTE_ERROR") - } - return lcr.SuppliersString() -} - -func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) { - if self.rlS == nil { - return errors.New("no RLs connection") - } - var ev map[string]interface{} - if ev, err = kev.AsMapStringIface(); err != nil { - return - } - attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: utils.CGREvent{ - Tenant: kev.GetTenant(utils.META_DEFAULT), - Event: ev, - }, - UsageID: kev.GetUUID(), - Units: 1, // One channel reserved - } - var reply string - return self.rlS.Call(utils.ResourceSv1AllocateResources, attrRU, &reply) -} - -func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) { - kev, err := NewKamEvent(evData) - if err != nil { - utils.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) - return - } - if kev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request - return - } - if kev.MissingParameter(self.timezone) { - if kar, err := kev.AsKamAuthReply(0.0, "", false, "", utils.ErrMandatoryIeMissing); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed building auth reply %s", err.Error())) - } else if err = self.conns[connId].Send(kar.String()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed sending auth reply %s", err.Error())) - } - return - } - var remainingDuration float64 - var errReply error - if errReply = self.rater.Call("Responder.GetDerivedMaxSessionTime", - kev.AsCDR(self.timezone), &remainingDuration); errReply != nil { - utils.Logger.Err(fmt.Sprintf(" Could not get max session time, error: %s", errReply.Error())) - } - var supplStr string - var errSuppl error - if kev.ComputeLcr() { - if supplStr, errSuppl = self.getSuppliers(kev); errSuppl != nil { - utils.Logger.Err(fmt.Sprintf(" Could not get suppliers, error: %s", errSuppl.Error())) - } - } - if errReply == nil { // Overwrite the error from maxSessionTime with the one from suppliers if nil - errReply = errSuppl - } - resourceAllowed := true - if self.rlS != nil { - if err := self.allocateResources(kev); err != nil { - utils.Logger.Err(fmt.Sprintf(" RLs error: %s", err.Error())) - resourceAllowed = false - } - } - if kar, err := kev.AsKamAuthReply(remainingDuration, supplStr, resourceAllowed, "", errReply); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed building auth reply %s", err.Error())) - } else if err = self.conns[connId].Send(kar.String()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed sending auth reply %s", err.Error())) - } -} - -func (self *KamailioSessionManager) onCgrLcrReq(evData []byte, connId string) { - kev, err := NewKamEvent(evData) - if err != nil { - utils.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", string(evData), err.Error())) - return - } - supplStr, err := self.getSuppliers(kev) - kamLcrReply, errReply := kev.AsKamAuthReply(0, supplStr, false, "", err) - kamLcrReply.Event = CGR_LCR_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side - if errReply != nil { - utils.Logger.Err(fmt.Sprintf(" Failed building LCR reply %s", errReply.Error())) - } else if err = self.conns[connId].Send(kamLcrReply.String()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed sending LCR reply %s", err.Error())) - } -} - -// onCgrRLReq is the handler for CGR_RL_REQUEST events coming from Kamailio -func (self *KamailioSessionManager) onCgrRLReq(evData []byte, connId string) { - kev, err := NewKamEvent(evData) - if err != nil { - utils.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", string(evData), err.Error())) - return - } - resourceAllowed := true - if err := self.allocateResources(kev); err != nil { - utils.Logger.Err(fmt.Sprintf(" RLs error: %s", err.Error())) - resourceAllowed = false - } - kamRLReply, errReply := kev.AsKamAuthReply(0, "", resourceAllowed, "", err) - kamRLReply.Event = CGR_RL_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side - if errReply != nil { - utils.Logger.Err(fmt.Sprintf(" Failed building RL reply %s", errReply.Error())) - } else if err = self.conns[connId].Send(kamRLReply.String()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed sending RL reply %s", err.Error())) - } -} - -func (self *KamailioSessionManager) onCallStart(evData []byte, connId string) { - kamEv, err := NewKamEvent(evData) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) - return - } - if kamEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request - return - } - if kamEv.MissingParameter(self.timezone) { - self.DisconnectSession(kamEv, connId, utils.ErrMandatoryIeMissing.Error()) - return - } - s := NewSession(kamEv, connId, self) - if s != nil { - self.sessions.indexSession(s) - } -} - -func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) { - kev, err := NewKamEvent(evData) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) - return - } - if kev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request - return - } - if kev.MissingParameter(self.timezone) { - utils.Logger.Err(fmt.Sprintf(" Mandatory IE missing out of event: %+v", kev)) - } - go self.ProcessCdr(kev.AsCDR(self.Timezone())) - if self.rlS != nil { // Release RLs resource - go func() { - ev, err := kev.AsMapStringIface() - if err != nil { - utils.Logger.Err(fmt.Sprintf(" RLs error: %s", err.Error())) - return - } - var reply string - attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: utils.CGREvent{ - Tenant: kev.GetTenant(utils.META_DEFAULT), - Event: ev, - }, - UsageID: kev.GetUUID(), - Units: 1, - } - if err := self.rlS.Call(utils.ResourceSv1ReleaseResources, attrRU, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) - } - }() - } - if s := self.sessions.getSession(kev.GetUUID()); s != nil { - if err := self.sessions.removeSession(s, kev); err != nil { - utils.Logger.Err(err.Error()) - } - } - -} - -func (self *KamailioSessionManager) Connect() error { - var err error - eventHandlers := map[*regexp.Regexp][]func([]byte, string){ - regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){self.onCgrAuth}, - regexp.MustCompile(CGR_LCR_REQUEST): []func([]byte, string){self.onCgrLcrReq}, - regexp.MustCompile(CGR_RL_REQUEST): []func([]byte, string){self.onCgrRLReq}, - regexp.MustCompile(CGR_CALL_START): []func([]byte, string){self.onCallStart}, - regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd}, - } - errChan := make(chan error) - for _, connCfg := range self.cfg.EvapiConns { - connId := utils.GenUUID() - logger := log.New(utils.Logger, "KamEvapi:", 2) - if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.Address, connId, connCfg.Reconnects, eventHandlers, logger); err != nil { - return err - } - go func() { // Start reading in own goroutine, return on error - if err := self.conns[connId].ReadEvents(); err != nil { - errChan <- err - } - }() - } - err = <-errChan // Will keep the Connect locked until the first error in one of the connections - return err -} - -func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { - sessionIds := ev.GetSessionIds() - disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify} - if err := self.conns[connId].Send(disconnectEv.String()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId)) - return err - } - return nil -} - -func (self *KamailioSessionManager) DebitInterval() time.Duration { - return self.cfg.DebitInterval -} -func (self *KamailioSessionManager) CdrSrv() rpcclient.RpcClientConnection { - return self.cdrsrv -} -func (self *KamailioSessionManager) Rater() rpcclient.RpcClientConnection { - return self.rater -} - -func (self *KamailioSessionManager) ProcessCdr(cdr *engine.CDR) error { - if !self.cfg.CreateCdr { - return nil - } - var reply string - if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CGRID, cdr.OriginID, err.Error())) - } - return nil -} - -func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { -} - -func (self *KamailioSessionManager) Shutdown() error { - return nil -} - -func (self *KamailioSessionManager) Sessions() []*Session { - return self.sessions.getSessions() -} - -func (self *KamailioSessionManager) SyncSessions() error { - return nil -} - -func (self *KamailioSessionManager) Timezone() string { - return self.timezone -} diff --git a/sessionmanager/kamailiosm_test.go b/sessionmanager/kamailiosm_test.go deleted file mode 100644 index 3562e8337..000000000 --- a/sessionmanager/kamailiosm_test.go +++ /dev/null @@ -1,26 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package sessionmanager - -import ( - "testing" -) - -func TestKamSMInterface(t *testing.T) { - var _ SessionManager = SessionManager(new(KamailioSessionManager)) -} diff --git a/sessionmanager/kamevent.go b/sessionmanager/kamevent.go deleted file mode 100644 index eed1da06e..000000000 --- a/sessionmanager/kamevent.go +++ /dev/null @@ -1,407 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package sessionmanager - -import ( - "encoding/json" - "strconv" - "strings" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -const ( - EVENT = "event" - CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST" - CGR_LCR_REQUEST = "CGR_LCR_REQUEST" - CGR_AUTH_REPLY = "CGR_AUTH_REPLY" - CGR_LCR_REPLY = "CGR_LCR_REPLY" - CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT" - CGR_CALL_START = "CGR_CALL_START" - CGR_CALL_END = "CGR_CALL_END" - CGR_RL_REQUEST = "CGR_RL_REQUEST" - CGR_RL_REPLY = "CGR_RL_REPLY" - CGR_SETUPTIME = "cgr_setuptime" - CGR_ANSWERTIME = "cgr_answertime" - CGR_STOPTIME = "cgr_stoptime" - CGR_DURATION = "cgr_duration" - CGR_PDD = "cgr_pdd" - - KAM_TR_INDEX = "tr_index" - KAM_TR_LABEL = "tr_label" - HASH_ENTRY = "h_entry" - HASH_ID = "h_id" -) - -var primaryFields = []string{EVENT, CALLID, FROM_TAG, HASH_ENTRY, HASH_ID, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, - CGR_CATEGORY, CGR_TENANT, CGR_REQTYPE, CGR_ANSWERTIME, CGR_SETUPTIME, CGR_STOPTIME, CGR_DURATION, CGR_PDD, utils.CGR_SUPPLIER, utils.CGR_DISCONNECT_CAUSE} - -type KamAuthReply struct { - Event string // Kamailio will use this to differentiate between requests and replies - TransactionIndex int // Original transaction index - TransactionLabel int // Original transaction label - MaxSessionTime int // Maximum session time in case of success, -1 for unlimited - Suppliers string // List of suppliers, comma separated - ResourceAllocated bool - AllocationMessage string - Error string // Reply in case of error -} - -func (self *KamAuthReply) String() string { - mrsh, _ := json.Marshal(self) - return string(mrsh) -} - -type KamLcrReply struct { - Event string - Suppliers string - Error error -} - -func (self *KamLcrReply) String() string { - self.Event = CGR_LCR_REPLY - mrsh, _ := json.Marshal(self) - return string(mrsh) -} - -type KamSessionDisconnect struct { - Event string - HashEntry string - HashId string - Reason string -} - -func (self *KamSessionDisconnect) String() string { - mrsh, _ := json.Marshal(self) - return string(mrsh) -} - -func NewKamEvent(kamEvData []byte) (KamEvent, error) { - kev := make(map[string]string) - if err := json.Unmarshal(kamEvData, &kev); err != nil { - return nil, err - } - return kev, nil -} - -// Hold events received from Kamailio -type KamEvent map[string]string - -// Backwards compatibility, should be AsEvent -func (kev KamEvent) AsEvent(ignored string) engine.Event { - return engine.Event(kev) -} - -func (kev KamEvent) GetName() string { - return kev[EVENT] -} -func (kev KamEvent) GetCgrId(timezone string) string { - setupTime, _ := kev.GetSetupTime(utils.META_DEFAULT, timezone) - return utils.Sha1(kev.GetUUID(), setupTime.UTC().String()) -} -func (kev KamEvent) GetUUID() string { - return kev[CALLID] + ";" + kev[FROM_TAG] // ToTag not available in callStart event -} -func (kev KamEvent) GetSessionIds() []string { - return []string{kev[HASH_ENTRY], kev[HASH_ID]} -} -func (kev KamEvent) GetDirection(fieldName string) string { - return utils.OUT -} -func (kev KamEvent) GetAccount(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[CGR_ACCOUNT]) -} - -func (kev KamEvent) GetSubject(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[CGR_SUBJECT], kev.GetAccount(fieldName)) -} -func (kev KamEvent) GetDestination(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[CGR_DESTINATION]) -} -func (kev KamEvent) GetCallDestNr(fieldName string) string { - return kev.GetDestination(utils.META_DEFAULT) -} -func (kev KamEvent) GetCategory(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[CGR_CATEGORY], config.CgrConfig().DefaultCategory) -} -func (kev KamEvent) GetTenant(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[CGR_TENANT], config.CgrConfig().DefaultTenant) -} -func (kev KamEvent) GetReqType(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[CGR_REQTYPE], config.CgrConfig().DefaultReqType) -} -func (kev KamEvent) GetAnswerTime(fieldName, timezone string) (time.Time, error) { - aTimeStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_ANSWERTIME]) - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - aTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.ParseTimeDetectLayout(aTimeStr, timezone) -} -func (kev KamEvent) GetSetupTime(fieldName, timezone string) (time.Time, error) { - sTimeStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_SETUPTIME], kev[CGR_ANSWERTIME]) - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.ParseTimeDetectLayout(sTimeStr, timezone) -} -func (kev KamEvent) GetEndTime(fieldName, timezone string) (time.Time, error) { - return utils.ParseTimeDetectLayout(kev[CGR_STOPTIME], timezone) -} -func (kev KamEvent) GetDuration(fieldName string) (time.Duration, error) { - durStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_DURATION]) - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - durStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.ParseDurationWithSecs(durStr) -} -func (kev KamEvent) GetPdd(fieldName string) (time.Duration, error) { - var pddStr string - if utils.IsSliceMember([]string{utils.PDD, utils.META_DEFAULT}, fieldName) { - pddStr = kev[CGR_PDD] - } else if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - pddStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] - } else { - pddStr = kev[fieldName] - } - return utils.ParseDurationWithSecs(pddStr) -} -func (kev KamEvent) GetSupplier(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[utils.CGR_SUPPLIER]) -} - -func (kev KamEvent) GetDisconnectCause(fieldName string) string { - if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value - return fieldName[len(utils.STATIC_VALUE_PREFIX):] - } - return utils.FirstNonEmpty(kev[fieldName], kev[utils.CGR_DISCONNECT_CAUSE]) -} - -//ToDo: extract the IP of the kamailio server generating the event -func (kev KamEvent) GetOriginatorIP(string) string { - return "127.0.0.1" -} -func (kev KamEvent) GetExtraFields() map[string]string { - extraFields := make(map[string]string) - for field, val := range kev { - if !utils.IsSliceMember(primaryFields, field) { - extraFields[field] = val - } - } - return extraFields -} -func (kev KamEvent) GetCdrSource() string { - return "KAMAILIO_" + kev.GetName() -} - -func (kev KamEvent) MissingParameter(timezone string) bool { - var nullTime time.Time - switch kev.GetName() { - case CGR_AUTH_REQUEST: - if setupTime, err := kev.GetSetupTime(utils.META_DEFAULT, timezone); err != nil || setupTime == nullTime { - return true - } - return len(kev.GetAccount(utils.META_DEFAULT)) == 0 || - len(kev.GetDestination(utils.META_DEFAULT)) == 0 || - len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0 - case CGR_LCR_REQUEST: - return len(kev.GetAccount(utils.META_DEFAULT)) == 0 || - len(kev.GetDestination(utils.META_DEFAULT)) == 0 || - len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0 - case CGR_CALL_START: - if aTime, err := kev.GetAnswerTime(utils.META_DEFAULT, timezone); err != nil || aTime == nullTime { - return true - } - return len(kev.GetUUID()) == 0 || - len(kev.GetAccount(utils.META_DEFAULT)) == 0 || - len(kev.GetDestination(utils.META_DEFAULT)) == 0 || - len(kev[HASH_ENTRY]) == 0 || len(kev[HASH_ID]) == 0 - case CGR_CALL_END: - return len(kev.GetUUID()) == 0 || - len(kev.GetAccount(utils.META_DEFAULT)) == 0 || - len(kev.GetDestination(utils.META_DEFAULT)) == 0 || - len(kev[CGR_DURATION]) == 0 - default: - return true - } - -} - -// Useful for CDR generation -func (kev KamEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) string { - sTime, _ := kev.GetSetupTime(utils.META_DEFAULT, config.CgrConfig().DefaultTimezone) - aTime, _ := kev.GetAnswerTime(utils.META_DEFAULT, config.CgrConfig().DefaultTimezone) - duration, _ := kev.GetDuration(utils.META_DEFAULT) - switch rsrFld.Id { - case utils.CGRID: - return rsrFld.ParseValue(kev.GetCgrId(timezone)) - case utils.TOR: - return rsrFld.ParseValue(utils.VOICE) - case utils.OriginID: - return rsrFld.ParseValue(kev.GetUUID()) - case utils.OriginHost: - return rsrFld.ParseValue(kev.GetOriginatorIP(utils.META_DEFAULT)) - case utils.Source: - return rsrFld.ParseValue(kev.GetCdrSource()) - case utils.RequestType: - return rsrFld.ParseValue(kev.GetReqType(utils.META_DEFAULT)) - case utils.Direction: - return rsrFld.ParseValue(kev.GetDirection(utils.META_DEFAULT)) - case utils.Tenant: - return rsrFld.ParseValue(kev.GetTenant(utils.META_DEFAULT)) - case utils.Category: - return rsrFld.ParseValue(kev.GetCategory(utils.META_DEFAULT)) - case utils.Account: - return rsrFld.ParseValue(kev.GetAccount(utils.META_DEFAULT)) - case utils.Subject: - return rsrFld.ParseValue(kev.GetSubject(utils.META_DEFAULT)) - case utils.Destination: - return rsrFld.ParseValue(kev.GetDestination(utils.META_DEFAULT)) - case utils.SetupTime: - return rsrFld.ParseValue(sTime.String()) - case utils.AnswerTime: - return rsrFld.ParseValue(aTime.String()) - case utils.Usage: - return rsrFld.ParseValue(strconv.FormatFloat(utils.Round(duration.Seconds(), 0, utils.ROUNDING_MIDDLE), 'f', -1, 64)) - case utils.PDD: - return rsrFld.ParseValue(strconv.FormatFloat(utils.Round(duration.Seconds(), 0, utils.ROUNDING_MIDDLE), 'f', -1, 64)) - case utils.SUPPLIER: - return rsrFld.ParseValue(kev.GetSupplier(utils.META_DEFAULT)) - case utils.DISCONNECT_CAUSE: - return rsrFld.ParseValue(kev.GetDisconnectCause(utils.META_DEFAULT)) - case utils.MEDI_RUNID: - return rsrFld.ParseValue(utils.META_DEFAULT) - case utils.COST: - return rsrFld.ParseValue("-1.0") - default: - return rsrFld.ParseValue(kev.GetExtraFields()[rsrFld.Id]) - } -} -func (kev KamEvent) PassesFieldFilter(*utils.RSRField) (bool, string) { - return false, "" -} - -func (kev KamEvent) AsCDR(timezone string) *engine.CDR { - storCdr := new(engine.CDR) - storCdr.CGRID = kev.GetCgrId(timezone) - storCdr.ToR = utils.VOICE - storCdr.OriginID = kev.GetUUID() - storCdr.OriginHost = kev.GetOriginatorIP(utils.META_DEFAULT) - storCdr.Source = kev.GetCdrSource() - storCdr.RequestType = kev.GetReqType(utils.META_DEFAULT) - storCdr.Tenant = kev.GetTenant(utils.META_DEFAULT) - storCdr.Category = kev.GetCategory(utils.META_DEFAULT) - storCdr.Account = kev.GetAccount(utils.META_DEFAULT) - storCdr.Subject = kev.GetSubject(utils.META_DEFAULT) - storCdr.Destination = kev.GetDestination(utils.META_DEFAULT) - storCdr.SetupTime, _ = kev.GetSetupTime(utils.META_DEFAULT, timezone) - storCdr.AnswerTime, _ = kev.GetAnswerTime(utils.META_DEFAULT, timezone) - storCdr.Usage, _ = kev.GetDuration(utils.META_DEFAULT) - storCdr.ExtraFields = kev.GetExtraFields() - storCdr.Cost = -1 - - return storCdr -} - -func (kev KamEvent) String() string { - mrsh, _ := json.Marshal(kev) - return string(mrsh) -} - -func (kev KamEvent) AsKamAuthReply(maxSessionTime float64, suppliers string, - resAllocated bool, allocationMessage string, rplyErr error) (kar *KamAuthReply, err error) { - kar = &KamAuthReply{Event: CGR_AUTH_REPLY, Suppliers: suppliers, - ResourceAllocated: resAllocated, AllocationMessage: allocationMessage} - if rplyErr != nil { - kar.Error = rplyErr.Error() - } - if _, hasIt := kev[KAM_TR_INDEX]; !hasIt { - return nil, utils.NewErrMandatoryIeMissing(KAM_TR_INDEX, "") - } - if kar.TransactionIndex, err = strconv.Atoi(kev[KAM_TR_INDEX]); err != nil { - return nil, err - } - if _, hasIt := kev[KAM_TR_LABEL]; !hasIt { - return nil, utils.NewErrMandatoryIeMissing(KAM_TR_LABEL, "") - } - if kar.TransactionLabel, err = strconv.Atoi(kev[KAM_TR_LABEL]); err != nil { - return nil, err - } - if maxSessionTime != -1 { // Convert maxSessionTime from nanoseconds into seconds - maxSessionDur := time.Duration(maxSessionTime) - maxSessionTime = maxSessionDur.Seconds() - } - kar.MaxSessionTime = int(utils.Round(maxSessionTime, 0, utils.ROUNDING_MIDDLE)) - - return kar, nil -} - -// Converts into CallDescriptor due to responder interface needs -func (kev KamEvent) AsCallDescriptor() (*engine.CallDescriptor, error) { - lcrReq := &engine.LcrRequest{ - Direction: kev.GetDirection(utils.META_DEFAULT), - Tenant: kev.GetTenant(utils.META_DEFAULT), - Category: kev.GetCategory(utils.META_DEFAULT), - Account: kev.GetAccount(utils.META_DEFAULT), - Subject: kev.GetSubject(utils.META_DEFAULT), - Destination: kev.GetDestination(utils.META_DEFAULT), - SetupTime: utils.FirstNonEmpty(kev[CGR_SETUPTIME], kev[CGR_ANSWERTIME]), - Duration: kev[CGR_DURATION], - } - return lcrReq.AsCallDescriptor(config.CgrConfig().DefaultTimezone) -} - -func (kev KamEvent) ComputeLcr() bool { - if computeLcr, err := strconv.ParseBool(kev[utils.CGR_COMPUTELCR]); err != nil { - return false - } else { - return computeLcr - } -} - -func (kev KamEvent) AsMapStringIface() (mp map[string]interface{}, err error) { - mp = make(map[string]interface{}, len(kev)) - for k, v := range kev { - mp[k] = v - } - return -} diff --git a/sessionmanager/kamevent_test.go b/sessionmanager/kamevent_test.go deleted file mode 100644 index 7a068c94a..000000000 --- a/sessionmanager/kamevent_test.go +++ /dev/null @@ -1,127 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package sessionmanager - -import ( - "reflect" - "testing" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -var kamEv = KamEvent{KAM_TR_INDEX: "29223", KAM_TR_LABEL: "698469260", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "from_tag": "eb082607", "to_tag": "4ea9687f", "cgr_account": "dan", - "cgr_reqtype": utils.META_PREPAID, "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com", - "cgr_duration": "20", utils.CGR_SUPPLIER: "suppl2", utils.CGR_DISCONNECT_CAUSE: "200", "extra1": "val1", "extra2": "val2"} - -func TestKamailioEventInterface(t *testing.T) { - var _ engine.Event = engine.Event(kamEv) -} - -func TestNewKamEvent(t *testing.T) { - evStr := `{"event":"CGR_CALL_END", - "callid":"46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", - "from_tag":"bf71ad59", - "to_tag":"7351fecf", - "cgr_reqtype":"*postpaid", - "cgr_account":"1001", - "cgr_destination":"1002", - "cgr_answertime":"1419839310", - "cgr_duration":"3", - "cgr_supplier":"supplier2", - "cgr_disconnectcause": "200", - "cgr_pdd": "4"}` - eKamEv := KamEvent{"event": "CGR_CALL_END", "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", "from_tag": "bf71ad59", "to_tag": "7351fecf", - "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", "cgr_destination": "1002", "cgr_answertime": "1419839310", "cgr_duration": "3", CGR_PDD: "4", - utils.CGR_SUPPLIER: "supplier2", - utils.CGR_DISCONNECT_CAUSE: "200"} - if kamEv, err := NewKamEvent([]byte(evStr)); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eKamEv, kamEv) { - t.Error("Received: ", kamEv) - } -} - -func TestKevAsKamAuthReply(t *testing.T) { - expectedKar := &KamAuthReply{Event: CGR_AUTH_REPLY, TransactionIndex: 29223, TransactionLabel: 698469260, - MaxSessionTime: 1200, ResourceAllocated: true, Suppliers: "supplier1,supplier2"} - if rcvKar, err := kamEv.AsKamAuthReply(1200000000000.0, "supplier1,supplier2", true, "", nil); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(expectedKar, rcvKar) { - t.Error("Received KAR: ", rcvKar) - } -} - -func TestKevMissingParameter(t *testing.T) { - kamEv := KamEvent{"event": "CGR_AUTH_REQUEST", "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": utils.META_POSTPAID, - "cgr_account": "1001", "cgr_destination": "1002"} - if !kamEv.MissingParameter("") { - t.Error("Failed detecting missing parameters") - } - kamEv["cgr_setuptime"] = "1419962256" - if kamEv.MissingParameter("") { - t.Error("False detecting missing parameters") - } - kamEv = KamEvent{"event": "UNKNOWN"} - if !kamEv.MissingParameter("") { - t.Error("Failed detecting missing parameters") - } - kamEv = KamEvent{"event": CGR_LCR_REQUEST, "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": utils.META_POSTPAID, - "cgr_account": "1001"} - if !kamEv.MissingParameter("") { - t.Error("Failed detecting missing parameters") - } - kamEv = KamEvent{"event": CGR_LCR_REQUEST, CGR_ACCOUNT: "1001", CGR_DESTINATION: "1002", "tr_index": "36045", "tr_label": "612369399"} - if kamEv.MissingParameter("") { - t.Error("False detecting missing parameters") - } - kamEv = KamEvent{"event": "CGR_CALL_START", "callid": "9d28ec3ee068babdfe036623f42c0969@0:0:0:0:0:0:0:0", "from_tag": "3131b566", - "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", "cgr_destination": "1002"} - if !kamEv.MissingParameter("") { - t.Error("Failed detecting missing parameters") - } - kamEv["h_entry"] = "463" - kamEv["h_id"] = "2605" - kamEv["cgr_answertime"] = "1419964961" - if kamEv.MissingParameter("") { - t.Error("False detecting missing parameters") - } -} - -func TestKevAsCallDescriptor(t *testing.T) { - sTime := time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) - kamEv := KamEvent{"event": CGR_LCR_REQUEST, CGR_ACCOUNT: "1001", CGR_DESTINATION: "1002", CGR_SETUPTIME: sTime.String()} - eCd := &engine.CallDescriptor{ - Direction: utils.OUT, - Tenant: config.CgrConfig().DefaultTenant, - Category: config.CgrConfig().DefaultCategory, - Account: kamEv[CGR_ACCOUNT], - Subject: kamEv[CGR_ACCOUNT], - Destination: kamEv[CGR_DESTINATION], - TimeStart: sTime, - TimeEnd: sTime.Add(time.Duration(1) * time.Minute), - } - - if cd, err := kamEv.AsCallDescriptor(); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eCd, cd) { - t.Errorf("Expecting: %+v, received: %+v", eCd, cd) - } -} diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index d246d2ee3..152769f87 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -53,6 +53,11 @@ const ( OSIPS_INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS" OSIPS_DIALOG_ID = "dialog_id" OSIPS_SIPCODE = "sip_code" + CGR_SETUPTIME = "cgr_setuptime" + CGR_ANSWERTIME = "cgr_answertime" + CGR_STOPTIME = "cgr_stoptime" + CGR_DURATION = "cgr_duration" + CGR_PDD = "cgr_pdd" ) func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) { diff --git a/sessionmanager/smg_event_test.go b/sessionmanager/smg_event_test.go index e67f668e0..6252713cf 100644 --- a/sessionmanager/smg_event_test.go +++ b/sessionmanager/smg_event_test.go @@ -35,7 +35,6 @@ func TestSMGenericEventParseFields(t *testing.T) { smGev[utils.EVENT_NAME] = "TEST_EVENT" smGev[utils.TOR] = "*voice" smGev[utils.OriginID] = "12345" - smGev[utils.Direction] = "*out" smGev[utils.Account] = "account1" smGev[utils.Subject] = "subject1" smGev[utils.Destination] = "+4986517174963" @@ -46,9 +45,6 @@ func TestSMGenericEventParseFields(t *testing.T) { smGev[utils.AnswerTime] = "2015-11-09 14:22:02" smGev[utils.Usage] = "1m23s" smGev[utils.LastUsed] = "21s" - smGev[utils.PDD] = "300ms" - smGev[utils.SUPPLIER] = "supplier1" - smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT" smGev[utils.OriginHost] = "127.0.0.1" smGev["Extra1"] = "Value1" smGev["Extra2"] = 5 @@ -64,9 +60,6 @@ func TestSMGenericEventParseFields(t *testing.T) { if !reflect.DeepEqual(smGev.GetSessionIds(), []string{"12345"}) { t.Error("Unexpected: ", smGev.GetSessionIds()) } - if smGev.GetDirection(utils.META_DEFAULT) != "*out" { - t.Error("Unexpected: ", smGev.GetDirection(utils.META_DEFAULT)) - } if smGev.GetTOR(utils.META_DEFAULT) != "*voice" { t.Error("Unexpected: ", smGev.GetTOR(utils.META_DEFAULT)) } @@ -113,21 +106,11 @@ func TestSMGenericEventParseFields(t *testing.T) { } else if lastUsed != time.Duration(21)*time.Second { t.Error("Unexpected: ", lastUsed) } - if pdd, err := smGev.GetPdd(utils.META_DEFAULT); err != nil { - t.Error(err) - } else if pdd != time.Duration(300)*time.Millisecond { - t.Error("Unexpected: ", pdd) - } - if smGev.GetSupplier(utils.META_DEFAULT) != "supplier1" { - t.Error("Unexpected: ", smGev.GetSupplier(utils.META_DEFAULT)) - } - if smGev.GetDisconnectCause(utils.META_DEFAULT) != "NORMAL_DISCONNECT" { - t.Error("Unexpected: ", smGev.GetDisconnectCause(utils.META_DEFAULT)) - } if smGev.GetOriginatorIP(utils.META_DEFAULT) != "127.0.0.1" { t.Error("Unexpected: ", smGev.GetOriginatorIP(utils.META_DEFAULT)) } - if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds, map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) { + if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds, + map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) { t.Error("Unexpected: ", extrFlds) } } @@ -155,7 +138,6 @@ func TestSMGenericEventAsCDR(t *testing.T) { smGev[utils.EVENT_NAME] = "TEST_EVENT" smGev[utils.TOR] = utils.SMS smGev[utils.OriginID] = "12345" - smGev[utils.Direction] = utils.OUT smGev[utils.Account] = "account1" smGev[utils.Subject] = "subject1" smGev[utils.Destination] = "+4986517174963" @@ -165,9 +147,6 @@ func TestSMGenericEventAsCDR(t *testing.T) { smGev[utils.SetupTime] = "2015-11-09 14:21:24" smGev[utils.AnswerTime] = "2015-11-09 14:22:02" smGev[utils.Usage] = "1m23s" - smGev[utils.PDD] = "300ms" - smGev[utils.SUPPLIER] = "supplier1" - smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT" smGev[utils.OriginHost] = "10.0.3.15" smGev["Extra1"] = "Value1" smGev["Extra2"] = 5 diff --git a/utils/consts.go b/utils/consts.go index e03c65932..f969dfc49 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -20,8 +20,8 @@ package utils var ( CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} - PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, TOR, RequestType, Direction, Tenant, Category, Account, Subject, Destination, SetupTime, PDD, AnswerTime, Usage, - SUPPLIER, DISCONNECT_CAUSE, COST, RATED, PartialField, MEDI_RUNID} + PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, TOR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage, + COST, RATED, PartialField, MEDI_RUNID} GitLastLog string // If set, it will be processed as part of versioning PosterTransportContentTypes = map[string]string{ MetaHTTPjsonCDR: CONTENT_JSON, @@ -377,6 +377,8 @@ const ( MetaThresholds = "*thresholds" MetaSuppliers = "*suppliers" MetaAttributes = "*attributes" + MetaResources = "*resources" + MetaCDRs = "*cdrs" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB" @@ -499,6 +501,7 @@ const ( MetaSessionS = "*sessions" FreeSWITCHAgent = "FreeSWITCHAgent" MetaDefault = "*default" + KamailioAgent = "KamailioAgent" ) //MetaMetrics