From df65bebd6a3b3a22b9494b4f402fabcd66cf3b4e Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 9 May 2015 19:56:45 +0200 Subject: [PATCH] Partial session manager implementation for opensips --- .../opensips/etc/opensips/opensips.cfg | 49 ++++--------- sessionmanager/kamailiosm.go | 1 - sessionmanager/osipsevent.go | 37 +++++++--- sessionmanager/osipsevent_test.go | 45 ++++++++++++ sessionmanager/osipssm.go | 73 +++++++++++++++---- 5 files changed, 145 insertions(+), 60 deletions(-) diff --git a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg index 1f7291b83..9230be6f6 100644 --- a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg +++ b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg @@ -86,12 +86,14 @@ modparam("dialog", "db_mode", 0) #### ACCounting module loadmodule "acc.so" -modparam("acc", "cdr_flag", "CDR") +#modparam("acc", "cdr_flag", "CDR") +modparam("acc", "evi_flag", "CDR") modparam("acc", "evi_missed_flag", "CDR") modparam("acc", "evi_extra", "cgr_reqtype=$avp(cgr_reqtype); cgr_account=$avp(cgr_account); - cgr_subject=$avp(cgr_subject); - cgr_destination=$avp(cgr_destination)") + cgr_destination=$avp(cgr_destination); + cgr_supplier=$avp(cgr_supplier); + dialog_id=$DLG_did") #### CfgUtils module loadmodule "cfgutils.so" @@ -119,26 +121,10 @@ startup_route { raise_event("E_OPENSIPS_START"); } -event_route[E_DLG_STATE_CHANGED] { - fetch_event_params("new_state=$var(new_state)"); - if $var(new_state) == 3 { - $avp(dialog_start) = "cgr_reqtype"; - $avp(dialog_start) = $avp(cgr_reqtype); - $avp(dialog_start) = "callid"; - $avp(dialog_start) = $ci; - $avp(dialog_start) = "from_tag"; - $avp(dialog_start) = $ft; - $avp(dialog_start) = "cgr_account"; - $avp(dialog_start) = $avp(cgr_account); - $avp(dialog_start) = "cgr_subject"; - $avp(dialog_start) = $avp(cgr_subject); - $avp(dialog_start) = "cgr_destination"; - $avp(dialog_start) = $avp(cgr_destination); - $avp(dialog_start) = "created"; - $avp(dialog_start) = $Ts; - $avp(dialog_start) = "dialog_id"; - $avp(dialog_start) = $DLG_did; - raise_event("E_CGR_DIALOG_START", $avp(auth_keys), $avp(auth_vals)); +local_route { + if (is_method("BYE") ) { + #setflag(CDR); + acc_evi_request("LOCAL_DISCONNECT"); #FixMe } } @@ -157,7 +143,7 @@ route{ # take the path determined by record-routing if (loose_route()) { if (is_method("BYE")) { - #setflag(CDR); # do accounting ... + setflag(CDR); # do accounting ... } else if (is_method("INVITE")) { # even if in most of the cases is useless, do RR for # re-INVITEs alos, as some buggy clients do change route set @@ -236,6 +222,10 @@ route{ send_reply("500","Internal Server Error"); exit; } + $avp(cgr_reqtype)="*pseudoprepaid"; + #$avp(cgr_account)=$fU; + $avp(cgr_destination)=$rU; + $avp(cgr_supplier)="supplier1"; setflag(CDR); #route(CGR_AUTH_REQ); } @@ -274,10 +264,6 @@ route{ ## Send AUTH request to CGRateS engine route[CGR_AUTH_REQ] { - $avp(cgr_reqtype)="*pseudoprepaid"; - $avp(cgr_account)=$fU; - $avp(cgr_destination)=$rU; - # Code to produce the json $json(cgr_auth) := "{}"; $json(cgr_auth/id) = "1"; @@ -312,7 +298,6 @@ route[CGR_AUTH_REPLY] { route[relay] { # for INVITEs enable some additional helper routes if (is_method("INVITE") && !has_totag()) { - t_on_reply("invite_reply"); t_on_failure("missed_call"); } if (!t_relay()) { @@ -330,12 +315,6 @@ route[location] { } } -onreply_route[invite_reply] { - xlog("incoming reply\n"); - #acc_evi_request("DIALOG_START"); -} - - failure_route[missed_call] { if (t_was_cancelled()) { exit; diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index d2a36654c..af510e6bb 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -140,7 +140,6 @@ func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, n if err := self.conns[connId].Send(disconnectEv.String()); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId)) } - return } func (self *KamailioSessionManager) RemoveSession(uuid string) { for i, ss := range self.sessions { diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index 249e4d3c8..29b6b4a77 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -46,9 +46,11 @@ const ( TIME = "time" SETUP_DURATION = "setuptime" OSIPS_SETUP_TIME = "created" + OSIPS_EVENT_TIME = "time" OSIPS_DURATION = "duration" OSIPS_AUTH_OK = "AUTH_OK" OSIPS_INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS" + OSIPS_DIALOG_ID = "dialog_id" ) func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) { @@ -81,8 +83,9 @@ func (osipsev *OsipsEvent) GetUUID() string { return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG] } +// Returns the dialog identifier which opensips needs to disconnect a dialog func (osipsev *OsipsEvent) GetSessionIds() []string { - return []string{osipsev.GetUUID()} + return strings.Split(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID], ":") } func (osipsev *OsipsEvent) GetDirection(fieldName string) string { @@ -96,7 +99,7 @@ func (osipsev *OsipsEvent) GetSubject(fieldName string) string { if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value return fieldName[len(utils.STATIC_VALUE_PREFIX):] } - return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_SUBJECT]) + return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_SUBJECT], osipsev.GetAccount(fieldName)) } func (osipsev *OsipsEvent) GetAccount(fieldName string) string { @@ -137,11 +140,9 @@ func (osipsev *OsipsEvent) GetReqType(fieldName string) string { return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_REQTYPE], config.CgrConfig().DefaultReqType) } func (osipsev *OsipsEvent) GetSetupTime(fieldName string) (time.Time, error) { - sTimeStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[OSIPS_SETUP_TIME]) + sTimeStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[OSIPS_SETUP_TIME], osipsev.osipsEvent.AttrValues[OSIPS_EVENT_TIME]) if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] - } else if fieldName == utils.META_DEFAULT { - sTimeStr = osipsev.osipsEvent.AttrValues[OSIPS_SETUP_TIME] } return utils.ParseTimeDetectLayout(sTimeStr) } @@ -186,10 +187,13 @@ func (osipsEv *OsipsEvent) GetOriginatorIP(fieldName string) string { return osipsEv.osipsEvent.OriginatorAddress.IP.String() } func (osipsev *OsipsEvent) MissingParameter() bool { - return len(osipsev.GetUUID()) == 0 || - len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 || - len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 || - len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 + if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "INVITE" { + return len(osipsev.GetUUID()) == 0 || + len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 || + len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 || + len(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]) == 0 + } + return true } func (osipsev *OsipsEvent) ParseEventValue(*utils.RSRField) string { return "" @@ -198,8 +202,8 @@ func (osipsev *OsipsEvent) PassesFieldFilter(*utils.RSRField) (bool, string) { return false, "" } func (osipsev *OsipsEvent) GetExtraFields() map[string]string { - primaryFields := []string{"to_tag", "setuptime", "created", "method", "callid", "sip_reason", "time", "sip_code", "duration", "from_tag", - "cgr_tenant", "cgr_category", "cgr_reqtype", "cgr_account", "cgr_subject", "cgr_destination", utils.CGR_SUPPLIER} + primaryFields := []string{TO_TAG, SETUP_DURATION, OSIPS_SETUP_TIME, "method", "callid", "sip_reason", OSIPS_EVENT_TIME, "sip_code", "duration", "from_tag", "dialog_id", + CGR_TENANT, CGR_CATEGORY, CGR_REQTYPE, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, utils.CGR_SUPPLIER} extraFields := make(map[string]string) for field, val := range osipsev.osipsEvent.AttrValues { if !utils.IsSliceMember(primaryFields, field) { @@ -231,3 +235,14 @@ func (osipsEv *OsipsEvent) AsStoredCdr() *engine.StoredCdr { storCdr.Cost = -1 return storCdr } + +// Computes duration out of setup time of the callEnd +func (osipsEv *OsipsEvent) updateDurationFromEvent(updatedOsipsEv *OsipsEvent) error { + endTime, err := updatedOsipsEv.GetSetupTime(TIME) + if err != nil { + return err + } + answerTime, err := osipsEv.GetAnswerTime(utils.META_DEFAULT) + osipsEv.osipsEvent.AttrValues[OSIPS_DURATION] = endTime.Sub(answerTime).String() + return nil +} diff --git a/sessionmanager/osipsevent_test.go b/sessionmanager/osipsevent_test.go index 127275a95..ae75c6bab 100644 --- a/sessionmanager/osipsevent_test.go +++ b/sessionmanager/osipsevent_test.go @@ -142,3 +142,48 @@ func TestOsipsEventAsStoredCdr(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr) } } + +func TestOsipsAccMissedToStoredCdr(t *testing.T) { + setupTime, _ := utils.ParseTimeDetectLayout("1431182699") + osipsEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_MISSED_EVENT", + AttrValues: map[string]string{"method": "INVITE", "from_tag": "5cb81eaa", "to_tag": "", "callid": "27b1e6679ad0109b5d756e42bb4c9c28@0:0:0:0:0:0:0:0", + "sip_code": "404", "sip_reason": "Not Found", "time": "1431182699", "cgr_reqtype": utils.META_PSEUDOPREPAID, + "cgr_account": "1001", "cgr_destination": "1002", utils.CGR_SUPPLIER: "supplier1", + "duration": "", "dialog_id": "3547:277000822", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr, + }} + eStoredCdr := &engine.StoredCdr{CgrId: utils.Sha1("27b1e6679ad0109b5d756e42bb4c9c28@0:0:0:0:0:0:0:0;5cb81eaa;", setupTime.UTC().String()), + TOR: utils.VOICE, AccId: "27b1e6679ad0109b5d756e42bb4c9c28@0:0:0:0:0:0:0:0;5cb81eaa;", CdrHost: "172.16.254.77", CdrSource: "OSIPS_E_ACC_MISSED_EVENT", + ReqType: utils.META_PSEUDOPREPAID, Direction: utils.OUT, Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Supplier: "supplier1", + Destination: "1002", SetupTime: setupTime, AnswerTime: setupTime, + Usage: time.Duration(0), ExtraFields: map[string]string{"extra1": "val1", "extra2": "val2"}, Cost: -1} + if storedCdr := osipsEv.AsStoredCdr(); !reflect.DeepEqual(eStoredCdr, storedCdr) { + t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr) + } + +} + +func TestOsipsUpdateDurationFromEvent(t *testing.T) { + osipsEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_EVENT", + AttrValues: map[string]string{"method": "INVITE", "from_tag": "87d02470", "to_tag": "a671a98", "callid": "05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0", + "sip_code": "200", "sip_reason": "OK", "time": "1430579770", "cgr_reqtype": utils.META_PREPAID, + "cgr_account": "1001", "cgr_destination": "1002", utils.CGR_SUPPLIER: "supplier1", + "duration": "", "dialog_id": "3547:277000822", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr, + }} + updatedEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_EVENT", + AttrValues: map[string]string{"method": "BYE", "from_tag": "a671a98", "to_tag": "87d02470", "callid": "05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0", + "sip_code": "200", "sip_reason": "OK", "time": "1430579797", "cgr_reqtype": "", + "cgr_account": "", "cgr_destination": "", utils.CGR_SUPPLIER: "", + "duration": "", "dialog_id": "3547:277000822"}, OriginatorAddress: addr, + }} + eOsipsEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_EVENT", + AttrValues: map[string]string{"method": "INVITE", "from_tag": "87d02470", "to_tag": "a671a98", "callid": "05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0", + "sip_code": "200", "sip_reason": "OK", "time": "1430579770", "cgr_reqtype": utils.META_PREPAID, + "cgr_account": "1001", "cgr_destination": "1002", utils.CGR_SUPPLIER: "supplier1", + "duration": "27s", "dialog_id": "3547:277000822", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr, + }} + if err := osipsEv.updateDurationFromEvent(updatedEv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eOsipsEv, osipsEv) { + t.Errorf("Expecting: %+v, received: %+v", eOsipsEv.osipsEvent, osipsEv.osipsEvent) + } +} diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 1d3897970..f0bb9a357 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" "github.com/cgrates/osipsdagram" "strings" "time" @@ -81,10 +82,10 @@ duration:: func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) { osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv} osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ - "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, - "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, - "E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr}, - "E_CGR_DIALOG_START": []func(*osipsdagram.OsipsEvent){osm.onDialogStart}, + "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers + "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured + "E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if evi_missed_flag is configured + "E_ACC_EVENT": []func(*osipsdagram.OsipsEvent){osm.onAccEvent}, // Raised if evi_flag is configured and not cdr_flag containing start/stop events } return osm, nil } @@ -116,10 +117,6 @@ func (osm *OsipsSessionManager) Connect() (err error) { evsrv.ServeEvents(osm.stopServing) // Will break through stopServing on error in other places return errors.New(" Stopped reading events") } - -func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, cgrId, notify string) { - return -} func (osm *OsipsSessionManager) RemoveSession(uuid string) { return } @@ -200,19 +197,69 @@ func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEven go osm.SubscribeEvents(osm.evSubscribeStop) } +func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) { + osipsEv, _ := NewOsipsEvent(osipsDgram) + if osipsEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request + return + } + if osipsDgram.AttrValues["method"] == "INVITE" { // Call start + if err := osm.callStart(osipsEv); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CALL_START out of %+v, error: <%s>", osipsDgram, err.Error())) + } + } else if osipsDgram.AttrValues["method"] == "BYE" { + if err := osm.callEnd(osipsEv); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CALL_END out of %+v, error: <%s>", osipsDgram, err.Error())) + } + } +} + func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) { osipsEv, _ := NewOsipsEvent(cdrDagram) if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error())) } - -} - -func (osm *OsipsSessionManager) onDialogStart(osipsDgram *osipsdagram.OsipsEvent) { - engine.Logger.Debug(fmt.Sprintf("onDialogStart, event: %+v", osipsDgram)) } func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { var reply string return osm.cdrsrv.ProcessCdr(storedCdr, &reply) } + +func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { + sessionIds := ev.GetSessionIds() + if len(sessionIds) != 2 { + errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds) + engine.Logger.Err(fmt.Sprintf(" " + errMsg)) + return errors.New(errMsg) + } + cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1]) + success := false + for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ { + if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) { + success = true + break + } + time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors + continue // Try again + } + if !success { + errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds) + engine.Logger.Err(" " + errStr) + return errors.New(errStr) + } + return nil +} + +func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error { + engine.Logger.Debug(fmt.Sprintf("callStart, event: %+v", osipsEv.osipsEvent)) + if osipsEv.MissingParameter() { + osm.DisconnectSession(osipsEv, "", utils.ERR_MANDATORY_IE_MISSING) + return errors.New(utils.ERR_MANDATORY_IE_MISSING) + } + return nil +} + +func (osm *OsipsSessionManager) callEnd(osipsEv *OsipsEvent) error { + engine.Logger.Debug(fmt.Sprintf("callEnd, event: %+v", osipsEv.osipsEvent)) + return nil +}