Merge branch 'master' into stats

This commit is contained in:
Radu Ioan Fericean
2014-07-30 18:42:54 +03:00
5 changed files with 479 additions and 20 deletions

View File

@@ -141,14 +141,13 @@ func startCdrc(cdrsChan chan struct{}, cdrsAddress, cdrType, cdrInDir, cdrOutDir
}
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var connector engine.Connector
var raterConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SMRater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
connector = responder
raterConn = responder
} else {
var client *rpcclient.RpcClient
var err error
for i := 0; i < cfg.SMReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, cfg.SMReconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
@@ -160,14 +159,34 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to engine: %v", err))
exitChan <- true
}
connector = &engine.RPCClientConnector{Client: client}
raterConn = &engine.RPCClientConnector{Client: client}
}
switch cfg.SMSwitchType {
case FS:
dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval))
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, connector, dp)
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, dp)
case OSIPS:
sm, _ = sessionmanager.NewOSipsSessionManager(cfg, connector)
var cdrsConn engine.Connector
if cfg.OsipCDRS == cfg.SMRater {
cdrsConn = raterConn
} else if cfg.OsipCDRS == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else {
for i := 0; i < cfg.OsipsReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.OsipCDRS, 0, cfg.SMReconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ = sessionmanager.NewOSipsSessionManager(cfg, raterConn, cdrsConn)
default:
engine.Logger.Err(fmt.Sprintf("<SessionManager> Unsupported session manger type: %s!", cfg.SMSwitchType))
}

View File

@@ -0,0 +1,359 @@
#
# $Id$
#
# OpenSIPS residential configuration script
# by OpenSIPS Solutions <team@opensips-solutions.com>
#
# This script was generated via "make menuconfig", from
# the "Residential" scenario.
# You can enable / disable more features / functionalities by
# re-generating the scenario with different options.#
#
# Please refer to the Core CookBook at:
# http://www.opensips.org/Resources/DocsCookbooks
# for a explanation of possible statements, functions and parameters.
#
####### Global Parameters #########
debug=3
log_stderror=no
log_facility=LOG_LOCAL0
fork=yes
children=4
auto_aliases=no
disable_tcp=yes
disable_tls=yes
####### Modules Section ########
#set module path
mpath="/usr/lib/opensips/modules"
#### SIGNALING module
loadmodule "signaling.so"
#### StateLess module
loadmodule "sl.so"
#### Transaction Module
loadmodule "tm.so"
modparam("tm", "fr_timer", 5)
modparam("tm", "fr_inv_timer", 30)
modparam("tm", "restart_fr_on_each_reply", 0)
modparam("tm", "onreply_avp_mode", 1)
#### Record Route Module
loadmodule "rr.so"
/* do not append from tag to the RR (no need for this script) */
modparam("rr", "append_fromtag", 0)
#### MAX ForWarD module
loadmodule "maxfwd.so"
#### SIP MSG OPerationS module
loadmodule "sipmsgops.so"
#### FIFO Management Interface
loadmodule "mi_fifo.so"
modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo")
modparam("mi_fifo", "fifo_mode", 0666)
loadmodule "mi_datagram.so"
modparam("mi_datagram", "socket_name", "udp:127.0.0.1:8020")
#### Eventdatagram module
loadmodule "event_datagram.so"
#### URI module
loadmodule "uri.so"
modparam("uri", "use_uri_table", 0)
#### USeR LOCation module
loadmodule "usrloc.so"
modparam("usrloc", "nat_bflag", "NAT")
modparam("usrloc", "db_mode", 0)
#### REGISTRAR module
loadmodule "registrar.so"
modparam("registrar", "tcp_persistent_flag", "TCP_PERSISTENT")
/* uncomment the next line not to allow more than 10 contacts per AOR */
#modparam("registrar", "max_contacts", 10)
#### DIALOG module
loadmodule "dialog.so"
modparam("dialog", "dlg_match_mode", 1)
modparam("dialog", "default_timeout", 21600) # 6 hours timeout
modparam("dialog", "db_mode", 0)
#### ACCounting module
loadmodule "acc.so"
/* what special events should be accounted ? */
modparam("acc", "early_media", 0)
modparam("acc", "report_cancels", 0)
modparam("acc", "cdr_flag", "CDR")
modparam("acc", "evi_flag", "CDR")
modparam("acc", "evi_missed_flag", "CDR")
modparam("acc", "evi_extra",
"cgr_reqtype=$avp(cgr_reqtype);
cgr_account=$avp(cgr_account);
cgr_subject=$avp(cgr_subject);
cgr_destination=$avp(cgr_destination);
originalUri=$ou")
#### CfgUtils module
loadmodule "cfgutils.so"
#### CacheDB Local
loadmodule "cachedb_local.so"
####### Routing Logic ########
startup_route {
subscribe_event("E_OPENSIPS_START", "udp:127.0.0.1:2020");
raise_event("E_OPENSIPS_START");
}
# main request routing logic
route{
if (!mf_process_maxfwd_header("10")) {
sl_send_reply("483","Too Many Hops");
exit;
}
if (has_totag()) {
# sequential request withing a dialog should
# take the path determined by record-routing
if (loose_route()) {
if (is_method("BYE")) {
#setflag(CDR); # do accounting ...
} else if (is_method("INVITE")) {
# even if in most of the cases is useless, do RR for
# re-INVITEs alos, as some buggy clients do change route set
# during the dialog.
record_route();
}
# route it out to whatever destination was set by loose_route()
# in $du (destination URI).
route(relay);
} else {
if ( is_method("ACK") ) {
if ( t_check_trans() ) {
# non loose-route, but stateful ACK; must be an ACK after
# a 487 or e.g. 404 from upstream server
t_relay();
exit;
} else {
# ACK without matching transaction ->
# ignore and discard
exit;
}
}
sl_send_reply("404","Not here");
}
exit;
}
# CANCEL processing
if (is_method("CANCEL"))
{
if (t_check_trans())
t_relay();
exit;
}
t_check_trans();
if ( !(is_method("REGISTER") ) ) {
if (from_uri==myself)
{
} else {
# if caller is not local, then called number must be local
if (!uri==myself) {
send_reply("403","Rely forbidden");
exit;
}
}
}
# preloaded route checking
if (loose_route()) {
xlog("L_ERR",
"Attempt to route with preloaded Route's [$fu/$tu/$ru/$ci]");
if (!is_method("ACK"))
sl_send_reply("403","Preload Route denied");
exit;
}
# record routing
if (!is_method("REGISTER|MESSAGE"))
record_route();
# account only INVITEs
if (is_method("INVITE")) {
# create dialog with timeout
if ( !create_dialog("B") ) {
send_reply("500","Internal Server Error");
exit;
}
setflag(CDR);
$avp(cgr_reqtype)="pseudoprepaid";
$avp(cgr_account)=$fU;
$avp(cgr_subject)=$fU;
$avp(cgr_destination)=$rU;
if $avp(cgr_reqtype)=="pseudoprepaid" || $avp(cgr_reqtype)=="prepaid" { #Make sure we got enough balance for the call
$avp(auth_keys) = "cgr_reqtype";
$avp(auth_vals) = $avp(cgr_reqtype);
$avp(auth_keys) = "callid";
$avp(auth_vals) = $ci;
$avp(auth_keys) = "from_tag";
$avp(auth_vals) = $ft;
$avp(auth_keys) = "cgr_account";
$avp(auth_vals) = $avp(cgr_account);
$avp(auth_keys) = "cgr_subject";
$avp(auth_vals) = $avp(cgr_subject);
$avp(auth_keys) = "cgr_destination";
$avp(auth_vals) = $avp(cgr_destination);
$avp(auth_keys) = "created";
$avp(auth_vals) = $Ts;
raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals));
$var(accid) = $ci+";"+$ft+";";
$var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify
$var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur
$var(ms) = 0;
while($var(ms) < 1000) { # Check for values set every 2 ms for maximum 1 second
if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out
xlog("Check ms: $var(ms), cgr_notify: $avp(cgr_notify)");
$var(ms) = $var(ms) + 10;
usleep("10");
}
if $avp(cgr_notify) == NULL { # Cannot check it in switch
xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)");
sl_send_reply("503","Prepaid controller error");
exit;
}
switch ($avp(cgr_notify)) {
case "SERVER_ERROR":
sl_send_reply("503","Prepaid controller error");
exit;
case "INSUFFICENT_FUNDS":
sl_send_reply("403", "Payment required");
exit;
}
if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) {
sl_send_reply("503","Prepaid controller error on maxdur");
exit;
}
$DLG_timeout=$avp(cgr_maxdur);
}
}
if (!uri==myself) {
append_hf("P-hint: outbound\r\n");
route(relay);
}
# requests for my domain
if (is_method("PUBLISH|SUBSCRIBE"))
{
sl_send_reply("503", "Service Unavailable");
exit;
}
if (is_method("REGISTER"))
{
if ( 0 ) setflag(TCP_PERSISTENT);
if (!save("location"))
sl_reply_error();
exit;
}
if ($rU==NULL) {
# request with no Username in RURI
sl_send_reply("484","Address Incomplete");
exit;
}
# do lookup with method filtering
if (!lookup("location","m")) {
t_newtran();
t_reply("404", "Not Found");
exit;
}
route(relay);
}
route[relay] {
# for INVITEs enable some additional helper routes
if (is_method("INVITE")) {
#t_on_branch("per_branch_ops");
#t_on_reply("handle_nat");
t_on_failure("missed_call");
}
if (!t_relay()) {
send_reply("500","Internal Error");
};
exit;
}
failure_route[missed_call] {
if (t_was_cancelled()) {
exit;
}
# uncomment the following lines if you want to block client
# redirect based on 3xx replies.
##if (t_check_status("3[0-9][0-9]")) {
##t_reply("404","Not found");
## exit;
##}
}

View File

@@ -187,10 +187,6 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
// if there is no account configured leave the call alone
if dc.RunId == utils.DEFAULT_RUNID && !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, ev.GetReqType(utils.META_DEFAULT)) {
return // we unpark only prepaid and pseudoprepaid calls
}
if ev.MissingParameter() {
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER)
engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))

View File

@@ -44,6 +44,7 @@ const (
SETUP_DURATION = "setuptime"
OSIPS__SETUP_TIME = "created"
OSIPS_DURATION = "duration"
OSIPS_AUTH_OK = "AUTH_OK"
)
func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) {
@@ -160,16 +161,10 @@ func (osipsev *OsipsEvent) GetDuration(fieldName string) (time.Duration, error)
return utils.ParseDurationWithSecs(durStr)
}
func (osipsev *OsipsEvent) MissingParameter() bool {
var nilTime time.Time
var nilDur time.Duration
aTime, _ := osipsev.GetAnswerTime(utils.META_DEFAULT)
dur, _ := osipsev.GetDuration(utils.META_DEFAULT)
return len(osipsev.GetUUID()) == 0 ||
len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 ||
aTime == nilTime ||
dur == nilDur
len(osipsev.GetDestination(utils.META_DEFAULT)) == 0
}
func (osipsev *OsipsEvent) ParseEventValue(*utils.RSRField) string {
return ""

View File

@@ -24,22 +24,25 @@ import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/osipsdagram"
"strings"
"time"
)
func NewOSipsSessionManager(cfg *config.CGRConfig, cdrsrv engine.Connector) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cgrCfg: cfg, cdrsrv: cdrsrv}
func NewOSipsSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.OnOpensipsStart},
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.OnCdr},
"E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.OnAuthorize},
}
return osm, nil
}
type OsipsSessionManager struct {
cgrCfg *config.CGRConfig
rater engine.Connector
cdrsrv engine.Connector
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
evSubscribeStop *chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
@@ -144,3 +147,90 @@ func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
}
}
// Process Authorize request from OpenSIPS and communicate back maxdur
func (osm *OsipsSessionManager) OnAuthorize(osipsDagram *osipsdagram.OsipsEvent) {
ev, _ := NewOsipsEvent(osipsDagram)
if ev.MissingParameter() {
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last
var durInitialized bool
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := osm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> OnAuthorize: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
dcs, _ = dcs.AppendDefaultRun()
for _, dc := range dcs {
runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
matchingAllFilters := true
for _, dcRunFilter := range runFilters {
if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass {
matchingAllFilters = false
break
}
}
if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
continue
}
startTime, err := ev.GetSetupTime(utils.META_DEFAULT)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
Tenant: ev.GetTenant(dc.TenantField),
Category: ev.GetCategory(dc.CategoryField),
Subject: ev.GetSubject(dc.SubjectField),
Account: ev.GetAccount(dc.AccountField),
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime,
TimeEnd: startTime.Add(osm.cgrCfg.SMMaxCallDuration),
}
var remainingDurationFloat float64
err = osm.rater.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
remainingDuration := time.Duration(remainingDurationFloat)
// Set maxCallDuration, smallest out of all forked sessions
if !durInitialized { // first time we set it /not initialized yet
maxCallDuration = remainingDuration
durInitialized = true
} else if maxCallDuration > remainingDuration {
maxCallDuration = remainingDuration
}
}
if maxCallDuration <= osm.cgrCfg.SMMinCallDuration {
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), INSUFFICIENT_FUNDS)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
return
}
cmdMaxDur := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_maxdur\n%d\n\n", ev.GetUUID(), int(maxCallDuration.Seconds()))
if reply, err := osm.miConn.SendCommand([]byte(cmdMaxDur)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_maxdur variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n", ev.GetUUID(), OSIPS_AUTH_OK)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
}
}