MaxUsageReq to be used in callsetup APIs, OpenSIPS-SM modifications for auth, opensips.cfg changes in tutorial, adding *now in ParseTimeDetectLayout function

This commit is contained in:
DanB
2015-05-07 11:30:02 +02:00
parent dbfa50d8ea
commit a826604eca
9 changed files with 183 additions and 558 deletions

View File

@@ -19,14 +19,48 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"strconv"
"time"
)
// Returns MaxSessionTime in seconds, -1 for no limit
func (self *ApierV1) GetMaxSessionTime(cdr engine.StoredCdr, maxSessionTime *float64) error {
func (self *ApierV1) GetMaxSessionTime(auth engine.MaxUsageReq, maxSessionTime *float64) error {
if auth.TOR == "" {
auth.TOR = utils.VOICE
}
if auth.ReqType == "" {
auth.ReqType = self.Config.DefaultReqType
}
if auth.Direction == "" {
auth.Direction = utils.OUT
}
if auth.Tenant == "" {
auth.Tenant = self.Config.DefaultTenant
}
if auth.Category == "" {
auth.Category = self.Config.DefaultCategory
}
if auth.Subject == "" {
auth.Subject = auth.Account
}
if auth.Subject == "" {
auth.Subject = auth.Account
}
if auth.SetupTime == "" {
auth.SetupTime = utils.META_NOW
}
if auth.Usage == "" {
auth.Usage = strconv.FormatFloat(self.Config.MaxCallDuration.Seconds(), 'f', -1, 64)
}
storedCdr, err := auth.AsStoredCdr()
if err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
var maxDur float64
if err := self.Responder.GetDerivedMaxSessionTime(cdr, &maxDur); err != nil {
if err := self.Responder.GetDerivedMaxSessionTime(*storedCdr, &maxDur); err != nil {
return err
}
if maxDur == -1.0 {

View File

@@ -1,362 +0,0 @@
#
# $Id$
#
# OpenSIPS residential configuration script
# by OpenSIPS Solutions <team@opensips-solutions.com>
#
# This script was generated via "make menuconfig", from
# the "Residential" scenario.
# You can enable / disable more features / functionalities by
# re-generating the scenario with different options.#
#
# Please refer to the Core CookBook at:
# http://www.opensips.org/Resources/DocsCookbooks
# for a explanation of possible statements, functions and parameters.
#
####### Global Parameters #########
debug=3
log_stderror=no
log_facility=LOG_LOCAL0
fork=yes
children=4
listen=udp:lo:5060
listen=udp:eth0:5060
auto_aliases=no
#disable_tcp=yes
#disable_tls=yes
####### Modules Section ########
#set module path
mpath="/usr/lib/opensips/modules"
#### SIGNALING module
loadmodule "signaling.so"
#### StateLess module
loadmodule "sl.so"
#### Transaction Module
loadmodule "tm.so"
modparam("tm", "fr_timeout", 5)
modparam("tm", "fr_inv_timeout", 30)
modparam("tm", "restart_fr_on_each_reply", 0)
modparam("tm", "onreply_avp_mode", 1)
#### Record Route Module
loadmodule "rr.so"
/* do not append from tag to the RR (no need for this script) */
modparam("rr", "append_fromtag", 0)
#### MAX ForWarD module
loadmodule "maxfwd.so"
#### SIP MSG OPerationS module
loadmodule "sipmsgops.so"
#### FIFO Management Interface
loadmodule "mi_fifo.so"
modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo")
modparam("mi_fifo", "fifo_mode", 0666)
loadmodule "mi_datagram.so"
modparam("mi_datagram", "socket_name", "udp:127.0.0.1:8020")
#### Eventdatagram module
loadmodule "event_datagram.so"
#### URI module
loadmodule "uri.so"
modparam("uri", "use_uri_table", 0)
#### USeR LOCation module
loadmodule "usrloc.so"
modparam("usrloc", "nat_bflag", "NAT")
modparam("usrloc", "db_mode", 0)
#### REGISTRAR module
loadmodule "registrar.so"
modparam("registrar", "tcp_persistent_flag", "TCP_PERSISTENT")
/* uncomment the next line not to allow more than 10 contacts per AOR */
#modparam("registrar", "max_contacts", 10)
#### DIALOG module
loadmodule "dialog.so"
modparam("dialog", "dlg_match_mode", 1)
modparam("dialog", "default_timeout", 21600) # 6 hours timeout
modparam("dialog", "db_mode", 0)
#### ACCounting module
loadmodule "acc.so"
/* what special events should be accounted ? */
modparam("acc", "early_media", 0)
modparam("acc", "report_cancels", 0)
modparam("acc", "cdr_flag", "CDR")
modparam("acc", "evi_flag", "CDR")
modparam("acc", "evi_missed_flag", "CDR")
modparam("acc", "evi_extra",
"cgr_reqtype=$avp(cgr_reqtype);
cgr_account=$avp(cgr_account);
cgr_subject=$avp(cgr_subject);
cgr_destination=$avp(cgr_destination);
originalUri=$ou")
#### CfgUtils module
loadmodule "cfgutils.so"
#### CacheDB Local
loadmodule "cachedb_local.so"
#### UDP protocol
loadmodule "proto_udp.so"
####### Routing Logic ########
startup_route {
subscribe_event("E_OPENSIPS_START", "udp:127.0.0.1:2020");
raise_event("E_OPENSIPS_START");
}
# main request routing logic
route{
if (!mf_process_maxfwd_header("10")) {
sl_send_reply("483","Too Many Hops");
exit;
}
if (has_totag()) {
# sequential request withing a dialog should
# take the path determined by record-routing
if (loose_route()) {
if (is_method("BYE")) {
#setflag(CDR); # do accounting ...
} else if (is_method("INVITE")) {
# even if in most of the cases is useless, do RR for
# re-INVITEs alos, as some buggy clients do change route set
# during the dialog.
record_route();
}
# route it out to whatever destination was set by loose_route()
# in $du (destination URI).
route(relay);
} else {
if ( is_method("ACK") ) {
if ( t_check_trans() ) {
# non loose-route, but stateful ACK; must be an ACK after
# a 487 or e.g. 404 from upstream server
t_relay();
exit;
} else {
# ACK without matching transaction ->
# ignore and discard
exit;
}
}
sl_send_reply("404","Not here");
}
exit;
}
# CANCEL processing
if (is_method("CANCEL"))
{
if (t_check_trans())
t_relay();
exit;
}
t_check_trans();
if ( !(is_method("REGISTER") ) ) {
if (from_uri==myself)
{
} else {
# if caller is not local, then called number must be local
if (!uri==myself) {
send_reply("403","Relay forbidden");
exit;
}
}
}
# preloaded route checking
if (loose_route()) {
xlog("L_ERR",
"Attempt to route with preloaded Route's [$fu/$tu/$ru/$ci]");
if (!is_method("ACK"))
sl_send_reply("403","Preload Route denied");
exit;
}
# record routing
if (!is_method("REGISTER|MESSAGE"))
record_route();
# account only INVITEs
if (is_method("INVITE")) {
# create dialog with timeout
if ( !create_dialog("B") ) {
send_reply("500","Internal Server Error");
exit;
}
setflag(CDR);
$avp(cgr_reqtype)="*pseudoprepaid";
$avp(cgr_account)=$fU;
$avp(cgr_subject)=$fU;
$avp(cgr_destination)=$rU;
if $avp(cgr_reqtype)=="*pseudoprepaid" || $avp(cgr_reqtype)=="*prepaid" { #Make sure we got enough balance for the call
$avp(auth_keys) = "cgr_reqtype";
$avp(auth_vals) = $avp(cgr_reqtype);
$avp(auth_keys) = "callid";
$avp(auth_vals) = $ci;
$avp(auth_keys) = "from_tag";
$avp(auth_vals) = $ft;
$avp(auth_keys) = "cgr_account";
$avp(auth_vals) = $avp(cgr_account);
$avp(auth_keys) = "cgr_subject";
$avp(auth_vals) = $avp(cgr_subject);
$avp(auth_keys) = "cgr_destination";
$avp(auth_vals) = $avp(cgr_destination);
$avp(auth_keys) = "created";
$avp(auth_vals) = $Ts;
raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals));
$var(accid) = $ci+";"+$ft+";";
$var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify
$var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur
$var(ms) = 0;
while($var(ms) < 1000) { # Check for values set every 10 ms for maximum 1 second
if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out
$var(ms) = $var(ms) + 10;
usleep("10");
}
if $avp(cgr_notify) == NULL { # Cannot check it in switch
xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)");
sl_send_reply("503","Prepaid controller error");
exit;
}
switch ($avp(cgr_notify)) {
case "SERVER_ERROR":
sl_send_reply("503","Prepaid controller error");
exit;
case "INSUFFICENT_FUNDS":
sl_send_reply("403", "Payment required");
exit;
}
if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) {
sl_send_reply("503","Prepaid controller error on maxdur");
exit;
}
$DLG_timeout=$avp(cgr_maxdur);
}
}
if (!uri==myself) {
append_hf("P-hint: outbound\r\n");
route(relay);
}
# requests for my domain
if (is_method("PUBLISH|SUBSCRIBE"))
{
sl_send_reply("503", "Service Unavailable");
exit;
}
if (is_method("REGISTER"))
{
if ( 0 ) setflag(TCP_PERSISTENT);
if (!save("location"))
sl_reply_error();
exit;
}
if ($rU==NULL) {
# request with no Username in RURI
sl_send_reply("484","Address Incomplete");
exit;
}
# do lookup with method filtering
if (!lookup("location","m")) {
t_newtran();
t_reply("404", "Not Found");
exit;
}
route(relay);
}
route[relay] {
# for INVITEs enable some additional helper routes
if (is_method("INVITE")) {
#t_on_branch("per_branch_ops");
#t_on_reply("handle_nat");
t_on_failure("missed_call");
}
if (!t_relay()) {
send_reply("500","Internal Error");
};
exit;
}
failure_route[missed_call] {
if (t_was_cancelled()) {
exit;
}
# uncomment the following lines if you want to block client
# redirect based on 3xx replies.
##if (t_check_status("3[0-9][0-9]")) {
##t_reply("404","Not found");
## exit;
##}
}

View File

@@ -70,9 +70,6 @@
"listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"debit_interval": "5s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
"events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
"mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects
},

View File

@@ -103,7 +103,7 @@ modparam("acc", "report_cancels", 0)
modparam("acc", "cdr_flag", "CDR")
modparam("acc", "evi_flag", "CDR")
modparam("acc", "evi_missed_flag", "CDR")
#modparam("acc", "acc_created_avp_name", "created_avp")
modparam("acc", "acc_created_avp_name", "created_avp")
modparam("acc", "evi_extra",
"cgr_reqtype=$avp(cgr_reqtype);
cgr_account=$avp(cgr_account);
@@ -120,6 +120,12 @@ loadmodule "cachedb_local.so"
#### UDP protocol
loadmodule "proto_udp.so"
#### Rest client
loadmodule "rest_client.so"
#### JSON parser
loadmodule "json.so"
####### Routing Logic ########
@@ -223,12 +229,7 @@ route{
exit;
}
setflag(CDR);
#acc_evi_request("DIALOG_START");
$avp(cgr_reqtype)="*pseudoprepaid";
$avp(cgr_account)=$fU;
$avp(cgr_subject)=$fU;
$avp(cgr_destination)=$rU;
route(CGR_AUTH);
route(CGR_AUTH_REQ);
}
if (!uri==myself) {
@@ -264,62 +265,48 @@ route{
t_newtran();
t_reply("404", "Not Found");
exit;
}
}
route(relay);
}
route[CGR_AUTH] {
if $avp(cgr_reqtype)=="*pseudoprepaid" || $avp(cgr_reqtype)=="*prepaid" { #Make sure we got enough balance for the call
$avp(auth_keys) = "cgr_reqtype";
$avp(auth_vals) = $avp(cgr_reqtype);
$avp(auth_keys) = "callid";
$avp(auth_vals) = $ci;
$avp(auth_keys) = "from_tag";
$avp(auth_vals) = $ft;
$avp(auth_keys) = "cgr_account";
$avp(auth_vals) = $avp(cgr_account);
$avp(auth_keys) = "cgr_subject";
$avp(auth_vals) = $avp(cgr_subject);
$avp(auth_keys) = "cgr_destination";
$avp(auth_vals) = $avp(cgr_destination);
$avp(auth_keys) = "created";
$avp(auth_vals) = $Ts;
raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals));
$var(accid) = $ci+";"+$ft+";";
$var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify
$var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur
$var(ms) = 0;
while($var(ms) < 1000) { # Check for values set every 10 ms for maximum 1 second
if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out
$var(ms) = $var(ms) + 10;
usleep("10");
}
if $avp(cgr_notify) == NULL { # Cannot check it in switch
xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)");
sl_send_reply("503","Prepaid controller error");
exit;
}
switch ($avp(cgr_notify)) {
case "SERVER_ERROR":
sl_send_reply("503","Prepaid controller error");
exit;
case "INSUFFICENT_FUNDS":
sl_send_reply("403", "Payment required");
exit;
}
if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) {
sl_send_reply("503","Prepaid controller error on maxdur");
exit;
}
$DLG_timeout=$avp(cgr_maxdur);
}
## Send AUTH request to CGRateS engine
route[CGR_AUTH_REQ] {
$avp(cgr_reqtype)="*pseudoprepaid";
$avp(cgr_account)=$fU;
$avp(cgr_destination)=$rU;
# Code to produce the json
$json(cgr_auth) := "{}";
$json(cgr_auth/id) = "1";
$json(cgr_auth/method) = "ApierV1.GetMaxSessionTime";
$json(cgr_auth/params) := "[{}]";
$json(cgr_auth/params[0]/ReqType) = $avp(cgr_reqtype);
$json(cgr_auth/params[0]/Account) = $avp(cgr_account);
$json(cgr_auth/params[0]/Destination) = $avp(cgr_destination);
async(rest_post("http://127.0.0.1:2080/jsonrpc", "$json(cgr_auth)", "application/json", "$avp(cgr_auth_reply)", "$var(ct)", "$var(rcode)"), CGR_AUTH_REPLY);
}
## Process answer received from CGRateS engine
route[CGR_AUTH_REPLY] {
$json(cgr_auth_reply) := $avp(cgr_auth_reply);
xlog("In CGR_AUTH_REPLY, reply received: $json(cgr_auth_reply)");
if $json(cgr_auth_reply/error) != NULL {
xlog("Error received from CGRateS: $json(cgr_auth_reply/error)");
sl_send_reply("503","Charging controller error");
exit;
}
if $json(cgr_auth_reply/result) > -1 {
$DLG_timeout=$json(cgr_auth_reply/result);
}
route(relay);
}
route[relay] {
# for INVITEs enable some additional helper routes
if (is_method("INVITE")) {
if (is_method("INVITE") && !has_totag()) {
t_on_reply("invite_reply");
t_on_failure("missed_call");
}
if (!t_relay()) {
@@ -328,7 +315,10 @@ route[relay] {
exit;
}
onreply_route[invite_reply] {
xlog("incoming reply\n");
#acc_evi_request("DIALOG_START");
}
failure_route[missed_call] {

View File

@@ -567,3 +567,34 @@ type ExternalCdr struct {
CostDetails string
Rated bool // Mark the CDR as rated so we do not process it during mediation
}
// Used when authorizing requests from outside, eg ApierV1.GetMaxSessionTime
type MaxUsageReq struct {
TOR string
ReqType string
Direction string
Tenant string
Category string
Account string
Subject string
Destination string
SetupTime string
AnswerTime string
Usage string
}
func (self *MaxUsageReq) AsStoredCdr() (*StoredCdr, error) {
var err error
storedCdr := &StoredCdr{TOR: self.TOR, ReqType: self.ReqType, Direction: self.Direction, Tenant: self.Tenant, Category: self.Category,
Account: self.Account, Subject: self.Subject, Destination: self.Destination}
if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(self.SetupTime); err != nil {
return nil, err
}
if storedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(self.AnswerTime); err != nil {
return nil, err
}
if storedCdr.Usage, err = utils.ParseDurationWithSecs(self.Usage); err != nil {
return nil, err
}
return storedCdr, nil
}

View File

@@ -512,3 +512,18 @@ func TestStoredCdrEventFields(t *testing.T) {
t.Error("Received: ", extraFlds)
}
}
func TestMaxUsageReqAsStoredCdr(t *testing.T) {
setupReq := &MaxUsageReq{TOR: utils.VOICE, ReqType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "1002",
SetupTime: "2013-11-07T08:42:20Z", AnswerTime: "2013-11-07T08:42:26Z", Usage: "0.00000001",
}
eStorCdr := &StoredCdr{TOR: utils.VOICE, ReqType: utils.META_RATED, Direction: "*out",
Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Usage: time.Duration(10)}
if storedCdr, err := setupReq.AsStoredCdr(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStorCdr, storedCdr) {
t.Errorf("Expected: %+v, received: %+v", eStorCdr, storedCdr)
}
}

View File

@@ -22,13 +22,11 @@ import (
"bytes"
"errors"
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/osipsdagram"
"strings"
"time"
)
/*
@@ -85,7 +83,6 @@ func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engi
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart},
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.onAuthorize},
"E_ACC_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr},
}
@@ -97,7 +94,7 @@ type OsipsSessionManager struct {
rater engine.Connector
cdrsrv engine.Connector
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
evSubscribeStop *chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
stopServing chan struct{} // Stop serving datagrams
miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote
}
@@ -107,10 +104,9 @@ func (osm *OsipsSessionManager) Connect() (err error) {
if osm.miConn, err = osipsdagram.NewOsipsMiDatagramConnector(osm.cfg.MiAddr, osm.cfg.Reconnects); err != nil {
return fmt.Errorf("Cannot connect to OpenSIPS at %s, error: %s", osm.cfg.MiAddr, err.Error())
}
evSubscribeStop := make(chan struct{})
osm.evSubscribeStop = &evSubscribeStop
defer close(*osm.evSubscribeStop) // Stop subscribing on disconnect
go osm.SubscribeEvents(evSubscribeStop)
osm.evSubscribeStop = make(chan struct{})
defer func() { osm.evSubscribeStop <- struct{}{} }() // Stop subscribing on disconnect
go osm.SubscribeEvents(osm.evSubscribeStop)
evsrv, err := osipsdagram.NewEventServer(osm.cfg.ListenUdp, osm.eventHandlers)
if err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Cannot initialize datagram server, error: <%s>", err.Error()))
@@ -147,54 +143,61 @@ func (osm *OsipsSessionManager) Shutdown() error {
return nil
}
// Event Handlers
// Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart
func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error {
if err := osm.subscribeEvents(); err != nil { // Init subscribe
return err
}
for {
select {
case <-evStop: // Break this loop from outside
return nil
default:
subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry
listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":")
portListen := listenAddrSplt[1]
addrListen := listenAddrSplt[0]
if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection
if localAddr := osm.miConn.LocallAddr(); localAddr != nil {
addrListen = strings.Split(localAddr.String(), ":")[0]
}
case <-time.After(osm.cfg.EventsSubscribeInterval): // Subscribe on interval
if err := osm.subscribeEvents(); err != nil {
return err
}
for eventName := range osm.eventHandlers {
if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded
continue
}
cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds()))
success := false
for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ {
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) {
success = true
break
}
time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors
continue // Try again
}
if !success {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Shutting down, failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr))
close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
return errors.New("Failed subscribing to OpenSIPS events")
}
}
time.Sleep(osm.cfg.EventsSubscribeInterval)
}
}
}
// One subscribe attempt to OpenSIPS
func (osm *OsipsSessionManager) subscribeEvents() error {
subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry
listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":")
portListen := listenAddrSplt[1]
addrListen := listenAddrSplt[0]
if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection
if localAddr := osm.miConn.LocallAddr(); localAddr != nil {
addrListen = strings.Split(localAddr.String(), ":")[0]
}
}
for eventName := range osm.eventHandlers {
if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded
continue
}
cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds()))
success := false
for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ {
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) {
success = true
break
}
time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors
continue // Try again
}
if !success {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Shutting down, failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr))
close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
return errors.New("Failed subscribing to OpenSIPS events")
}
}
return nil
}
func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEvent) {
close(*osm.evSubscribeStop) // Cancel previous subscribes
evStop := make(chan struct{})
osm.evSubscribeStop = &evStop
go osm.SubscribeEvents(evStop)
osm.evSubscribeStop <- struct{}{} // Cancel previous subscribes
osm.evSubscribeStop = make(chan struct{}) // Create a fresh communication channel
go osm.SubscribeEvents(osm.evSubscribeStop)
}
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
@@ -209,93 +212,3 @@ func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
// Process Authorize request from OpenSIPS and communicate back maxdur
func (osm *OsipsSessionManager) onAuthorize(osipsDagram *osipsdagram.OsipsEvent) {
ev, _ := NewOsipsEvent(osipsDagram)
if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if ev.MissingParameter() {
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last
var durInitialized bool
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := osm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> onAuthorize: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
dcs, _ = dcs.AppendDefaultRun()
for _, dc := range dcs {
runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
matchingAllFilters := true
for _, dcRunFilter := range runFilters {
if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass {
matchingAllFilters = false
break
}
}
if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
continue
}
startTime, err := ev.GetSetupTime(utils.META_DEFAULT)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
Tenant: ev.GetTenant(dc.TenantField),
Category: ev.GetCategory(dc.CategoryField),
Subject: ev.GetSubject(dc.SubjectField),
Account: ev.GetAccount(dc.AccountField),
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime,
TimeEnd: startTime.Add(osm.cfg.MaxCallDuration),
}
var remainingDurationFloat float64
err = osm.rater.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
remainingDuration := time.Duration(remainingDurationFloat)
// Set maxCallDuration, smallest out of all forked sessions
if !durInitialized { // first time we set it /not initialized yet
maxCallDuration = remainingDuration
durInitialized = true
} else if maxCallDuration > remainingDuration {
maxCallDuration = remainingDuration
}
}
if maxCallDuration <= osm.cfg.MinCallDuration {
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), OSIPS_INSUFFICIENT_FUNDS)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
cmdMaxDur := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_maxdur\n%d\n\n", ev.GetUUID(), int(maxCallDuration.Seconds()))
if reply, err := osm.miConn.SendCommand([]byte(cmdMaxDur)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_maxdur variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n", ev.GetUUID(), OSIPS_AUTH_OK)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
}

View File

@@ -162,6 +162,8 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) {
return time.Parse("20060102150405", tmStr)
case oneSpaceTimestampRule.MatchString(tmStr):
return time.Parse("02.01.2006 15:04:05", tmStr)
case tmStr == "*now":
return time.Now(), nil
}
return nilTime, errors.New("Unsupported time format")
}

View File

@@ -219,6 +219,11 @@ func TestParseTimeDetectLayout(t *testing.T) {
} else if !tsTm.Equal(expectedTime) {
t.Errorf("Unexpected time parsed: %v, expecting: %v", tsTm, expectedTime)
}
if nowTm, err := ParseTimeDetectLayout(META_NOW); err != nil {
t.Error(err)
} else if time.Now().Sub(nowTm) > time.Duration(1)*time.Millisecond {
t.Errorf("Unexpected time parsed: %v", nowTm)
}
}
func TestParseDateUnix(t *testing.T) {