diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9995826c7..673135689 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -140,14 +140,13 @@ func startCdrc(cdrsChan chan struct{}, cdrsAddress, cdrType, cdrInDir, cdrOutDir } func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) { - var connector engine.Connector + var raterConn engine.Connector + var client *rpcclient.RpcClient if cfg.SMRater == utils.INTERNAL { <-cacheChan // Wait for the cache to init before start doing queries - connector = responder + raterConn = responder } else { - var client *rpcclient.RpcClient var err error - for i := 0; i < cfg.SMReconnects; i++ { client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, cfg.SMReconnects, utils.GOB) if err == nil { //Connected so no need to reiterate @@ -159,14 +158,34 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage engine.Logger.Crit(fmt.Sprintf(" Could not connect to engine: %v", err)) exitChan <- true } - connector = &engine.RPCClientConnector{Client: client} + raterConn = &engine.RPCClientConnector{Client: client} } switch cfg.SMSwitchType { case FS: dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)) - sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, connector, dp) + sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, dp) case OSIPS: - sm, _ = sessionmanager.NewOSipsSessionManager(cfg, connector) + var cdrsConn engine.Connector + if cfg.OsipCDRS == cfg.SMRater { + cdrsConn = raterConn + } else if cfg.OsipCDRS == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + cdrsConn = responder + } else { + for i := 0; i < cfg.OsipsReconnects; i++ { + client, err = rpcclient.NewRpcClient("tcp", cfg.OsipCDRS, 0, cfg.SMReconnects, utils.GOB) + if err == nil { //Connected so no need to reiterate + break + } + time.Sleep(time.Duration(i+1) * time.Second) + } + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + } + cdrsConn = &engine.RPCClientConnector{Client: client} + } + sm, _ = sessionmanager.NewOSipsSessionManager(cfg, raterConn, cdrsConn) default: engine.Logger.Err(fmt.Sprintf(" Unsupported session manger type: %s!", cfg.SMSwitchType)) } diff --git a/data/opensips/etc/opensips/opensips.cfg b/data/opensips/etc/opensips/opensips.cfg new file mode 100644 index 000000000..8f02158de --- /dev/null +++ b/data/opensips/etc/opensips/opensips.cfg @@ -0,0 +1,359 @@ + +# +# $Id$ +# +# OpenSIPS residential configuration script +# by OpenSIPS Solutions +# +# This script was generated via "make menuconfig", from +# the "Residential" scenario. +# You can enable / disable more features / functionalities by +# re-generating the scenario with different options.# +# +# Please refer to the Core CookBook at: +# http://www.opensips.org/Resources/DocsCookbooks +# for a explanation of possible statements, functions and parameters. +# + + +####### Global Parameters ######### + +debug=3 +log_stderror=no +log_facility=LOG_LOCAL0 + +fork=yes +children=4 + +auto_aliases=no + +disable_tcp=yes + +disable_tls=yes + + +####### Modules Section ######## + +#set module path +mpath="/usr/lib/opensips/modules" + +#### SIGNALING module +loadmodule "signaling.so" + +#### StateLess module +loadmodule "sl.so" + +#### Transaction Module +loadmodule "tm.so" +modparam("tm", "fr_timer", 5) +modparam("tm", "fr_inv_timer", 30) +modparam("tm", "restart_fr_on_each_reply", 0) +modparam("tm", "onreply_avp_mode", 1) + +#### Record Route Module +loadmodule "rr.so" +/* do not append from tag to the RR (no need for this script) */ +modparam("rr", "append_fromtag", 0) + +#### MAX ForWarD module +loadmodule "maxfwd.so" + +#### SIP MSG OPerationS module +loadmodule "sipmsgops.so" + +#### FIFO Management Interface +loadmodule "mi_fifo.so" +modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo") +modparam("mi_fifo", "fifo_mode", 0666) + +loadmodule "mi_datagram.so" +modparam("mi_datagram", "socket_name", "udp:127.0.0.1:8020") + + +#### Eventdatagram module +loadmodule "event_datagram.so" + + +#### URI module +loadmodule "uri.so" +modparam("uri", "use_uri_table", 0) + + +#### USeR LOCation module +loadmodule "usrloc.so" +modparam("usrloc", "nat_bflag", "NAT") +modparam("usrloc", "db_mode", 0) + +#### REGISTRAR module +loadmodule "registrar.so" +modparam("registrar", "tcp_persistent_flag", "TCP_PERSISTENT") + +/* uncomment the next line not to allow more than 10 contacts per AOR */ +#modparam("registrar", "max_contacts", 10) + +#### DIALOG module +loadmodule "dialog.so" +modparam("dialog", "dlg_match_mode", 1) +modparam("dialog", "default_timeout", 21600) # 6 hours timeout +modparam("dialog", "db_mode", 0) + + +#### ACCounting module +loadmodule "acc.so" +/* what special events should be accounted ? */ +modparam("acc", "early_media", 0) +modparam("acc", "report_cancels", 0) +modparam("acc", "cdr_flag", "CDR") +modparam("acc", "evi_flag", "CDR") +modparam("acc", "evi_missed_flag", "CDR") +modparam("acc", "evi_extra", + "cgr_reqtype=$avp(cgr_reqtype); + cgr_account=$avp(cgr_account); + cgr_subject=$avp(cgr_subject); + cgr_destination=$avp(cgr_destination); + originalUri=$ou") + +#### CfgUtils module +loadmodule "cfgutils.so" + +#### CacheDB Local +loadmodule "cachedb_local.so" + + +####### Routing Logic ######## + +startup_route { + subscribe_event("E_OPENSIPS_START", "udp:127.0.0.1:2020"); + raise_event("E_OPENSIPS_START"); +} + +# main request routing logic + +route{ + + + if (!mf_process_maxfwd_header("10")) { + sl_send_reply("483","Too Many Hops"); + exit; + } + + if (has_totag()) { + # sequential request withing a dialog should + # take the path determined by record-routing + if (loose_route()) { + if (is_method("BYE")) { + #setflag(CDR); # do accounting ... + } else if (is_method("INVITE")) { + # even if in most of the cases is useless, do RR for + # re-INVITEs alos, as some buggy clients do change route set + # during the dialog. + record_route(); + } + + + + # route it out to whatever destination was set by loose_route() + # in $du (destination URI). + route(relay); + } else { + + if ( is_method("ACK") ) { + if ( t_check_trans() ) { + # non loose-route, but stateful ACK; must be an ACK after + # a 487 or e.g. 404 from upstream server + t_relay(); + exit; + } else { + # ACK without matching transaction -> + # ignore and discard + exit; + } + } + sl_send_reply("404","Not here"); + } + exit; + } + + # CANCEL processing + if (is_method("CANCEL")) + { + if (t_check_trans()) + t_relay(); + exit; + } + + t_check_trans(); + + if ( !(is_method("REGISTER") ) ) { + + if (from_uri==myself) + + { + + } else { + # if caller is not local, then called number must be local + + if (!uri==myself) { + send_reply("403","Rely forbidden"); + exit; + } + } + + } + + # preloaded route checking + if (loose_route()) { + xlog("L_ERR", + "Attempt to route with preloaded Route's [$fu/$tu/$ru/$ci]"); + if (!is_method("ACK")) + sl_send_reply("403","Preload Route denied"); + exit; + } + + # record routing + if (!is_method("REGISTER|MESSAGE")) + record_route(); + + # account only INVITEs + if (is_method("INVITE")) { + # create dialog with timeout + if ( !create_dialog("B") ) { + send_reply("500","Internal Server Error"); + exit; + } + setflag(CDR); + $avp(cgr_reqtype)="pseudoprepaid"; + $avp(cgr_account)=$fU; + $avp(cgr_subject)=$fU; + $avp(cgr_destination)="+4986517174963"; + if $avp(cgr_reqtype)=="pseudoprepaid" || $avp(cgr_reqtype)=="prepaid" { #Make sure we got enough balance for the call + $avp(auth_keys) = "cgr_reqtype"; + $avp(auth_vals) = $avp(cgr_reqtype); + $avp(auth_keys) = "callid"; + $avp(auth_vals) = $ci; + $avp(auth_keys) = "from_tag"; + $avp(auth_vals) = $ft; + $avp(auth_keys) = "cgr_account"; + $avp(auth_vals) = $avp(cgr_account); + $avp(auth_keys) = "cgr_subject"; + $avp(auth_vals) = $avp(cgr_subject); + $avp(auth_keys) = "cgr_destination"; + $avp(auth_vals) = $avp(cgr_destination); + $avp(auth_keys) = "created"; + $avp(auth_vals) = $Ts; + raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals)); + $var(accid) = $ci+";"+$ft+";"; + $var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify + $var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur + $var(ms) = 0; + while($var(ms) < 1000) { # Check for values set every 2 ms for maximum 1 second + if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out + xlog("Check ms: $var(ms), cgr_notify: $avp(cgr_notify)"); + $var(ms) = $var(ms) + 10; + usleep("10"); + + } + if $avp(cgr_notify) == NULL { # Cannot check it in switch + xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)"); + sl_send_reply("503","Prepaid controller error"); + exit; + } + switch ($avp(cgr_notify)) { + case "SERVER_ERROR": + sl_send_reply("503","Prepaid controller error"); + exit; + case "INSUFFICENT_FUNDS": + sl_send_reply("403", "Payment required"); + exit; + } + if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) { + sl_send_reply("503","Prepaid controller error on maxdur"); + exit; + } + $DLG_timeout=$avp(cgr_maxdur); + } + } + + if (!uri==myself) { + append_hf("P-hint: outbound\r\n"); + + route(relay); + } + + # requests for my domain + + if (is_method("PUBLISH|SUBSCRIBE")) + { + sl_send_reply("503", "Service Unavailable"); + exit; + } + + if (is_method("REGISTER")) + { + + + if ( 0 ) setflag(TCP_PERSISTENT); + + if (!save("location")) + sl_reply_error(); + + exit; + } + + if ($rU==NULL) { + # request with no Username in RURI + sl_send_reply("484","Address Incomplete"); + exit; + } + + + # do lookup with method filtering + if (!lookup("location","m")) { + + + t_newtran(); + t_reply("404", "Not Found"); + exit; + } + + route(relay); +} + + +route[relay] { + # for INVITEs enable some additional helper routes + if (is_method("INVITE")) { + + + + #t_on_branch("per_branch_ops"); + #t_on_reply("handle_nat"); + t_on_failure("missed_call"); + } + + + + if (!t_relay()) { + send_reply("500","Internal Error"); + }; + exit; +} + + + + +failure_route[missed_call] { + if (t_was_cancelled()) { + exit; + } + + # uncomment the following lines if you want to block client + # redirect based on 3xx replies. + ##if (t_check_status("3[0-9][0-9]")) { + ##t_reply("404","Not found"); + ## exit; + ##} + + +} + diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 6b924cee6..8632068b0 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -187,10 +187,6 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { engine.Logger.Err("Error parsing answer event start time, using time.Now!") startTime = time.Now() } - // if there is no account configured leave the call alone - if dc.RunId == utils.DEFAULT_RUNID && !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, ev.GetReqType(utils.META_DEFAULT)) { - return // we unpark only prepaid and pseudoprepaid calls - } if ev.MissingParameter() { sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER) engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index 37431f991..1ff94ab8e 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -44,6 +44,7 @@ const ( SETUP_DURATION = "setuptime" OSIPS__SETUP_TIME = "created" OSIPS_DURATION = "duration" + OSIPS_AUTH_OK = "AUTH_OK" ) func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) { @@ -160,16 +161,10 @@ func (osipsev *OsipsEvent) GetDuration(fieldName string) (time.Duration, error) return utils.ParseDurationWithSecs(durStr) } func (osipsev *OsipsEvent) MissingParameter() bool { - var nilTime time.Time - var nilDur time.Duration - aTime, _ := osipsev.GetAnswerTime(utils.META_DEFAULT) - dur, _ := osipsev.GetDuration(utils.META_DEFAULT) return len(osipsev.GetUUID()) == 0 || len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 || len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 || - len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 || - aTime == nilTime || - dur == nilDur + len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 } func (osipsev *OsipsEvent) ParseEventValue(*utils.RSRField) string { return "" diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 20db4cac8..88d529abd 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -24,22 +24,25 @@ import ( "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" "github.com/cgrates/osipsdagram" "strings" "time" ) -func NewOSipsSessionManager(cfg *config.CGRConfig, cdrsrv engine.Connector) (*OsipsSessionManager, error) { - osm := &OsipsSessionManager{cgrCfg: cfg, cdrsrv: cdrsrv} +func NewOSipsSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) { + osm := &OsipsSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv} osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.OnOpensipsStart}, "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.OnCdr}, + "E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.OnAuthorize}, } return osm, nil } type OsipsSessionManager struct { cgrCfg *config.CGRConfig + rater engine.Connector cdrsrv engine.Connector eventHandlers map[string][]func(*osipsdagram.OsipsEvent) evSubscribeStop *chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it @@ -144,3 +147,90 @@ func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) { engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error())) } } + +// Process Authorize request from OpenSIPS and communicate back maxdur +func (osm *OsipsSessionManager) OnAuthorize(osipsDagram *osipsdagram.OsipsEvent) { + ev, _ := NewOsipsEvent(osipsDagram) + if ev.MissingParameter() { + cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING) + if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { + engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) + } + return + } + var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last + var durInitialized bool + attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), + Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} + var dcs utils.DerivedChargers + if err := osm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { + engine.Logger.Err(fmt.Sprintf(" OnAuthorize: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) + cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR) + if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { + engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) + } + return + } + dcs, _ = dcs.AppendDefaultRun() + for _, dc := range dcs { + runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP) + matchingAllFilters := true + for _, dcRunFilter := range runFilters { + if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass { + matchingAllFilters = false + break + } + } + if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched + continue + } + startTime, err := ev.GetSetupTime(utils.META_DEFAULT) + if err != nil { + engine.Logger.Err("Error parsing answer event start time, using time.Now!") + startTime = time.Now() + } + cd := engine.CallDescriptor{ + Direction: ev.GetDirection(dc.DirectionField), + Tenant: ev.GetTenant(dc.TenantField), + Category: ev.GetCategory(dc.CategoryField), + Subject: ev.GetSubject(dc.SubjectField), + Account: ev.GetAccount(dc.AccountField), + Destination: ev.GetDestination(dc.DestinationField), + TimeStart: startTime, + TimeEnd: startTime.Add(osm.cgrCfg.SMMaxCallDuration), + } + var remainingDurationFloat float64 + err = osm.rater.GetMaxSessionTime(cd, &remainingDurationFloat) + if err != nil { + engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) + cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR) + if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { + engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) + } + return + } + remainingDuration := time.Duration(remainingDurationFloat) + // Set maxCallDuration, smallest out of all forked sessions + if !durInitialized { // first time we set it /not initialized yet + maxCallDuration = remainingDuration + durInitialized = true + } else if maxCallDuration > remainingDuration { + maxCallDuration = remainingDuration + } + } + if maxCallDuration <= osm.cgrCfg.SMMinCallDuration { + cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), INSUFFICIENT_FUNDS) + if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { + engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) + } + return + } + cmdMaxDur := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_maxdur\n%d\n\n", ev.GetUUID(), int(maxCallDuration.Seconds())) + if reply, err := osm.miConn.SendCommand([]byte(cmdMaxDur)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { + engine.Logger.Err(fmt.Sprintf("Failed setting cgr_maxdur variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) + } + cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n", ev.GetUUID(), OSIPS_AUTH_OK) + if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { + engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) + } +}