Partial session manager implementation for opensips

This commit is contained in:
DanB
2015-05-09 19:56:45 +02:00
parent 6b3c5430a3
commit df65bebd6a
5 changed files with 145 additions and 60 deletions

View File

@@ -86,12 +86,14 @@ modparam("dialog", "db_mode", 0)
#### ACCounting module
loadmodule "acc.so"
modparam("acc", "cdr_flag", "CDR")
#modparam("acc", "cdr_flag", "CDR")
modparam("acc", "evi_flag", "CDR")
modparam("acc", "evi_missed_flag", "CDR")
modparam("acc", "evi_extra", "cgr_reqtype=$avp(cgr_reqtype);
cgr_account=$avp(cgr_account);
cgr_subject=$avp(cgr_subject);
cgr_destination=$avp(cgr_destination)")
cgr_destination=$avp(cgr_destination);
cgr_supplier=$avp(cgr_supplier);
dialog_id=$DLG_did")
#### CfgUtils module
loadmodule "cfgutils.so"
@@ -119,26 +121,10 @@ startup_route {
raise_event("E_OPENSIPS_START");
}
event_route[E_DLG_STATE_CHANGED] {
fetch_event_params("new_state=$var(new_state)");
if $var(new_state) == 3 {
$avp(dialog_start) = "cgr_reqtype";
$avp(dialog_start) = $avp(cgr_reqtype);
$avp(dialog_start) = "callid";
$avp(dialog_start) = $ci;
$avp(dialog_start) = "from_tag";
$avp(dialog_start) = $ft;
$avp(dialog_start) = "cgr_account";
$avp(dialog_start) = $avp(cgr_account);
$avp(dialog_start) = "cgr_subject";
$avp(dialog_start) = $avp(cgr_subject);
$avp(dialog_start) = "cgr_destination";
$avp(dialog_start) = $avp(cgr_destination);
$avp(dialog_start) = "created";
$avp(dialog_start) = $Ts;
$avp(dialog_start) = "dialog_id";
$avp(dialog_start) = $DLG_did;
raise_event("E_CGR_DIALOG_START", $avp(auth_keys), $avp(auth_vals));
local_route {
if (is_method("BYE") ) {
#setflag(CDR);
acc_evi_request("LOCAL_DISCONNECT"); #FixMe
}
}
@@ -157,7 +143,7 @@ route{
# take the path determined by record-routing
if (loose_route()) {
if (is_method("BYE")) {
#setflag(CDR); # do accounting ...
setflag(CDR); # do accounting ...
} else if (is_method("INVITE")) {
# even if in most of the cases is useless, do RR for
# re-INVITEs alos, as some buggy clients do change route set
@@ -236,6 +222,10 @@ route{
send_reply("500","Internal Server Error");
exit;
}
$avp(cgr_reqtype)="*pseudoprepaid";
#$avp(cgr_account)=$fU;
$avp(cgr_destination)=$rU;
$avp(cgr_supplier)="supplier1";
setflag(CDR);
#route(CGR_AUTH_REQ);
}
@@ -274,10 +264,6 @@ route{
## Send AUTH request to CGRateS engine
route[CGR_AUTH_REQ] {
$avp(cgr_reqtype)="*pseudoprepaid";
$avp(cgr_account)=$fU;
$avp(cgr_destination)=$rU;
# Code to produce the json
$json(cgr_auth) := "{}";
$json(cgr_auth/id) = "1";
@@ -312,7 +298,6 @@ route[CGR_AUTH_REPLY] {
route[relay] {
# for INVITEs enable some additional helper routes
if (is_method("INVITE") && !has_totag()) {
t_on_reply("invite_reply");
t_on_failure("missed_call");
}
if (!t_relay()) {
@@ -330,12 +315,6 @@ route[location] {
}
}
onreply_route[invite_reply] {
xlog("incoming reply\n");
#acc_evi_request("DIALOG_START");
}
failure_route[missed_call] {
if (t_was_cancelled()) {
exit;

View File

@@ -140,7 +140,6 @@ func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, n
if err := self.conns[connId].Send(disconnectEv.String()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId))
}
return
}
func (self *KamailioSessionManager) RemoveSession(uuid string) {
for i, ss := range self.sessions {

View File

@@ -46,9 +46,11 @@ const (
TIME = "time"
SETUP_DURATION = "setuptime"
OSIPS_SETUP_TIME = "created"
OSIPS_EVENT_TIME = "time"
OSIPS_DURATION = "duration"
OSIPS_AUTH_OK = "AUTH_OK"
OSIPS_INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
OSIPS_DIALOG_ID = "dialog_id"
)
func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) {
@@ -81,8 +83,9 @@ func (osipsev *OsipsEvent) GetUUID() string {
return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG]
}
// Returns the dialog identifier which opensips needs to disconnect a dialog
func (osipsev *OsipsEvent) GetSessionIds() []string {
return []string{osipsev.GetUUID()}
return strings.Split(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID], ":")
}
func (osipsev *OsipsEvent) GetDirection(fieldName string) string {
@@ -96,7 +99,7 @@ func (osipsev *OsipsEvent) GetSubject(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_SUBJECT])
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_SUBJECT], osipsev.GetAccount(fieldName))
}
func (osipsev *OsipsEvent) GetAccount(fieldName string) string {
@@ -137,11 +140,9 @@ func (osipsev *OsipsEvent) GetReqType(fieldName string) string {
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_REQTYPE], config.CgrConfig().DefaultReqType)
}
func (osipsev *OsipsEvent) GetSetupTime(fieldName string) (time.Time, error) {
sTimeStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[OSIPS_SETUP_TIME])
sTimeStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[OSIPS_SETUP_TIME], osipsev.osipsEvent.AttrValues[OSIPS_EVENT_TIME])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
} else if fieldName == utils.META_DEFAULT {
sTimeStr = osipsev.osipsEvent.AttrValues[OSIPS_SETUP_TIME]
}
return utils.ParseTimeDetectLayout(sTimeStr)
}
@@ -186,10 +187,13 @@ func (osipsEv *OsipsEvent) GetOriginatorIP(fieldName string) string {
return osipsEv.osipsEvent.OriginatorAddress.IP.String()
}
func (osipsev *OsipsEvent) MissingParameter() bool {
return len(osipsev.GetUUID()) == 0 ||
len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetDestination(utils.META_DEFAULT)) == 0
if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "INVITE" {
return len(osipsev.GetUUID()) == 0 ||
len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]) == 0
}
return true
}
func (osipsev *OsipsEvent) ParseEventValue(*utils.RSRField) string {
return ""
@@ -198,8 +202,8 @@ func (osipsev *OsipsEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
return false, ""
}
func (osipsev *OsipsEvent) GetExtraFields() map[string]string {
primaryFields := []string{"to_tag", "setuptime", "created", "method", "callid", "sip_reason", "time", "sip_code", "duration", "from_tag",
"cgr_tenant", "cgr_category", "cgr_reqtype", "cgr_account", "cgr_subject", "cgr_destination", utils.CGR_SUPPLIER}
primaryFields := []string{TO_TAG, SETUP_DURATION, OSIPS_SETUP_TIME, "method", "callid", "sip_reason", OSIPS_EVENT_TIME, "sip_code", "duration", "from_tag", "dialog_id",
CGR_TENANT, CGR_CATEGORY, CGR_REQTYPE, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, utils.CGR_SUPPLIER}
extraFields := make(map[string]string)
for field, val := range osipsev.osipsEvent.AttrValues {
if !utils.IsSliceMember(primaryFields, field) {
@@ -231,3 +235,14 @@ func (osipsEv *OsipsEvent) AsStoredCdr() *engine.StoredCdr {
storCdr.Cost = -1
return storCdr
}
// Computes duration out of setup time of the callEnd
func (osipsEv *OsipsEvent) updateDurationFromEvent(updatedOsipsEv *OsipsEvent) error {
endTime, err := updatedOsipsEv.GetSetupTime(TIME)
if err != nil {
return err
}
answerTime, err := osipsEv.GetAnswerTime(utils.META_DEFAULT)
osipsEv.osipsEvent.AttrValues[OSIPS_DURATION] = endTime.Sub(answerTime).String()
return nil
}

View File

@@ -142,3 +142,48 @@ func TestOsipsEventAsStoredCdr(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr)
}
}
func TestOsipsAccMissedToStoredCdr(t *testing.T) {
setupTime, _ := utils.ParseTimeDetectLayout("1431182699")
osipsEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_MISSED_EVENT",
AttrValues: map[string]string{"method": "INVITE", "from_tag": "5cb81eaa", "to_tag": "", "callid": "27b1e6679ad0109b5d756e42bb4c9c28@0:0:0:0:0:0:0:0",
"sip_code": "404", "sip_reason": "Not Found", "time": "1431182699", "cgr_reqtype": utils.META_PSEUDOPREPAID,
"cgr_account": "1001", "cgr_destination": "1002", utils.CGR_SUPPLIER: "supplier1",
"duration": "", "dialog_id": "3547:277000822", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr,
}}
eStoredCdr := &engine.StoredCdr{CgrId: utils.Sha1("27b1e6679ad0109b5d756e42bb4c9c28@0:0:0:0:0:0:0:0;5cb81eaa;", setupTime.UTC().String()),
TOR: utils.VOICE, AccId: "27b1e6679ad0109b5d756e42bb4c9c28@0:0:0:0:0:0:0:0;5cb81eaa;", CdrHost: "172.16.254.77", CdrSource: "OSIPS_E_ACC_MISSED_EVENT",
ReqType: utils.META_PSEUDOPREPAID, Direction: utils.OUT, Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Supplier: "supplier1",
Destination: "1002", SetupTime: setupTime, AnswerTime: setupTime,
Usage: time.Duration(0), ExtraFields: map[string]string{"extra1": "val1", "extra2": "val2"}, Cost: -1}
if storedCdr := osipsEv.AsStoredCdr(); !reflect.DeepEqual(eStoredCdr, storedCdr) {
t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr)
}
}
func TestOsipsUpdateDurationFromEvent(t *testing.T) {
osipsEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_EVENT",
AttrValues: map[string]string{"method": "INVITE", "from_tag": "87d02470", "to_tag": "a671a98", "callid": "05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0",
"sip_code": "200", "sip_reason": "OK", "time": "1430579770", "cgr_reqtype": utils.META_PREPAID,
"cgr_account": "1001", "cgr_destination": "1002", utils.CGR_SUPPLIER: "supplier1",
"duration": "", "dialog_id": "3547:277000822", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr,
}}
updatedEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_EVENT",
AttrValues: map[string]string{"method": "BYE", "from_tag": "a671a98", "to_tag": "87d02470", "callid": "05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0",
"sip_code": "200", "sip_reason": "OK", "time": "1430579797", "cgr_reqtype": "",
"cgr_account": "", "cgr_destination": "", utils.CGR_SUPPLIER: "",
"duration": "", "dialog_id": "3547:277000822"}, OriginatorAddress: addr,
}}
eOsipsEv := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_EVENT",
AttrValues: map[string]string{"method": "INVITE", "from_tag": "87d02470", "to_tag": "a671a98", "callid": "05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0",
"sip_code": "200", "sip_reason": "OK", "time": "1430579770", "cgr_reqtype": utils.META_PREPAID,
"cgr_account": "1001", "cgr_destination": "1002", utils.CGR_SUPPLIER: "supplier1",
"duration": "27s", "dialog_id": "3547:277000822", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr,
}}
if err := osipsEv.updateDurationFromEvent(updatedEv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eOsipsEv, osipsEv) {
t.Errorf("Expecting: %+v, received: %+v", eOsipsEv.osipsEvent, osipsEv.osipsEvent)
}
}

View File

@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/osipsdagram"
"strings"
"time"
@@ -81,10 +82,10 @@ duration::
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart},
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_CGR_DIALOG_START": []func(*osipsdagram.OsipsEvent){osm.onDialogStart},
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured
"E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if evi_missed_flag is configured
"E_ACC_EVENT": []func(*osipsdagram.OsipsEvent){osm.onAccEvent}, // Raised if evi_flag is configured and not cdr_flag containing start/stop events
}
return osm, nil
}
@@ -116,10 +117,6 @@ func (osm *OsipsSessionManager) Connect() (err error) {
evsrv.ServeEvents(osm.stopServing) // Will break through stopServing on error in other places
return errors.New("<SM-OpenSIPS> Stopped reading events")
}
func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, cgrId, notify string) {
return
}
func (osm *OsipsSessionManager) RemoveSession(uuid string) {
return
}
@@ -200,19 +197,69 @@ func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEven
go osm.SubscribeEvents(osm.evSubscribeStop)
}
func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(osipsDgram)
if osipsEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if osipsDgram.AttrValues["method"] == "INVITE" { // Call start
if err := osm.callStart(osipsEv); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CALL_START out of %+v, error: <%s>", osipsDgram, err.Error()))
}
} else if osipsDgram.AttrValues["method"] == "BYE" {
if err := osm.callEnd(osipsEv); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CALL_END out of %+v, error: <%s>", osipsDgram, err.Error()))
}
}
}
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(cdrDagram)
if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error()))
}
}
func (osm *OsipsSessionManager) onDialogStart(osipsDgram *osipsdagram.OsipsEvent) {
engine.Logger.Debug(fmt.Sprintf("onDialogStart, event: %+v", osipsDgram))
}
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
sessionIds := ev.GetSessionIds()
if len(sessionIds) != 2 {
errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> " + errMsg))
return errors.New(errMsg)
}
cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1])
success := false
for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ {
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) {
success = true
break
}
time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors
continue // Try again
}
if !success {
errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
engine.Logger.Err("<SM-OpenSIPS> " + errStr)
return errors.New(errStr)
}
return nil
}
func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error {
engine.Logger.Debug(fmt.Sprintf("callStart, event: %+v", osipsEv.osipsEvent))
if osipsEv.MissingParameter() {
osm.DisconnectSession(osipsEv, "", utils.ERR_MANDATORY_IE_MISSING)
return errors.New(utils.ERR_MANDATORY_IE_MISSING)
}
return nil
}
func (osm *OsipsSessionManager) callEnd(osipsEv *OsipsEvent) error {
engine.Logger.Debug(fmt.Sprintf("callEnd, event: %+v", osipsEv.osipsEvent))
return nil
}