From a826604eca3583aa15738706daeb1d1d53e99789 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 7 May 2015 11:30:02 +0200 Subject: [PATCH] MaxUsageReq to be used in callsetup APIs, OpenSIPS-SM modifications for auth, opensips.cfg changes in tutorial, adding *now in ParseTimeDetectLayout function --- apier/v1/callsetup.go | 38 +- data/opensips/etc/opensips/opensips.cfg | 362 ------------------ .../cgrates/etc/cgrates/cgrates.json | 3 - .../opensips/etc/opensips/opensips.cfg | 100 +++-- engine/storedcdr.go | 31 ++ engine/storedcdr_test.go | 15 + sessionmanager/osipssm.go | 185 +++------ utils/coreutils.go | 2 + utils/utils_test.go | 5 + 9 files changed, 183 insertions(+), 558 deletions(-) delete mode 100644 data/opensips/etc/opensips/opensips.cfg diff --git a/apier/v1/callsetup.go b/apier/v1/callsetup.go index e1e8c3c97..0e8f96202 100644 --- a/apier/v1/callsetup.go +++ b/apier/v1/callsetup.go @@ -19,14 +19,48 @@ along with this program. If not, see package v1 import ( + "fmt" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "strconv" "time" ) // Returns MaxSessionTime in seconds, -1 for no limit -func (self *ApierV1) GetMaxSessionTime(cdr engine.StoredCdr, maxSessionTime *float64) error { +func (self *ApierV1) GetMaxSessionTime(auth engine.MaxUsageReq, maxSessionTime *float64) error { + if auth.TOR == "" { + auth.TOR = utils.VOICE + } + if auth.ReqType == "" { + auth.ReqType = self.Config.DefaultReqType + } + if auth.Direction == "" { + auth.Direction = utils.OUT + } + if auth.Tenant == "" { + auth.Tenant = self.Config.DefaultTenant + } + if auth.Category == "" { + auth.Category = self.Config.DefaultCategory + } + if auth.Subject == "" { + auth.Subject = auth.Account + } + if auth.Subject == "" { + auth.Subject = auth.Account + } + if auth.SetupTime == "" { + auth.SetupTime = utils.META_NOW + } + if auth.Usage == "" { + auth.Usage = strconv.FormatFloat(self.Config.MaxCallDuration.Seconds(), 'f', -1, 64) + } + storedCdr, err := auth.AsStoredCdr() + if err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } var maxDur float64 - if err := self.Responder.GetDerivedMaxSessionTime(cdr, &maxDur); err != nil { + if err := self.Responder.GetDerivedMaxSessionTime(*storedCdr, &maxDur); err != nil { return err } if maxDur == -1.0 { diff --git a/data/opensips/etc/opensips/opensips.cfg b/data/opensips/etc/opensips/opensips.cfg deleted file mode 100644 index 559945c8a..000000000 --- a/data/opensips/etc/opensips/opensips.cfg +++ /dev/null @@ -1,362 +0,0 @@ - -# -# $Id$ -# -# OpenSIPS residential configuration script -# by OpenSIPS Solutions -# -# This script was generated via "make menuconfig", from -# the "Residential" scenario. -# You can enable / disable more features / functionalities by -# re-generating the scenario with different options.# -# -# Please refer to the Core CookBook at: -# http://www.opensips.org/Resources/DocsCookbooks -# for a explanation of possible statements, functions and parameters. -# - - -####### Global Parameters ######### - -debug=3 -log_stderror=no -log_facility=LOG_LOCAL0 - -fork=yes -children=4 -listen=udp:lo:5060 -listen=udp:eth0:5060 - -auto_aliases=no - -#disable_tcp=yes - -#disable_tls=yes - - -####### Modules Section ######## - -#set module path -mpath="/usr/lib/opensips/modules" - -#### SIGNALING module -loadmodule "signaling.so" - -#### StateLess module -loadmodule "sl.so" - -#### Transaction Module -loadmodule "tm.so" -modparam("tm", "fr_timeout", 5) -modparam("tm", "fr_inv_timeout", 30) -modparam("tm", "restart_fr_on_each_reply", 0) -modparam("tm", "onreply_avp_mode", 1) - -#### Record Route Module -loadmodule "rr.so" -/* do not append from tag to the RR (no need for this script) */ -modparam("rr", "append_fromtag", 0) - -#### MAX ForWarD module -loadmodule "maxfwd.so" - -#### SIP MSG OPerationS module -loadmodule "sipmsgops.so" - -#### FIFO Management Interface -loadmodule "mi_fifo.so" -modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo") -modparam("mi_fifo", "fifo_mode", 0666) - -loadmodule "mi_datagram.so" -modparam("mi_datagram", "socket_name", "udp:127.0.0.1:8020") - - -#### Eventdatagram module -loadmodule "event_datagram.so" - - -#### URI module -loadmodule "uri.so" -modparam("uri", "use_uri_table", 0) - - -#### USeR LOCation module -loadmodule "usrloc.so" -modparam("usrloc", "nat_bflag", "NAT") -modparam("usrloc", "db_mode", 0) - -#### REGISTRAR module -loadmodule "registrar.so" -modparam("registrar", "tcp_persistent_flag", "TCP_PERSISTENT") - -/* uncomment the next line not to allow more than 10 contacts per AOR */ -#modparam("registrar", "max_contacts", 10) - -#### DIALOG module -loadmodule "dialog.so" -modparam("dialog", "dlg_match_mode", 1) -modparam("dialog", "default_timeout", 21600) # 6 hours timeout -modparam("dialog", "db_mode", 0) - - -#### ACCounting module -loadmodule "acc.so" -/* what special events should be accounted ? */ -modparam("acc", "early_media", 0) -modparam("acc", "report_cancels", 0) -modparam("acc", "cdr_flag", "CDR") -modparam("acc", "evi_flag", "CDR") -modparam("acc", "evi_missed_flag", "CDR") -modparam("acc", "evi_extra", - "cgr_reqtype=$avp(cgr_reqtype); - cgr_account=$avp(cgr_account); - cgr_subject=$avp(cgr_subject); - cgr_destination=$avp(cgr_destination); - originalUri=$ou") - -#### CfgUtils module -loadmodule "cfgutils.so" - -#### CacheDB Local -loadmodule "cachedb_local.so" - -#### UDP protocol -loadmodule "proto_udp.so" - - -####### Routing Logic ######## - -startup_route { - subscribe_event("E_OPENSIPS_START", "udp:127.0.0.1:2020"); - raise_event("E_OPENSIPS_START"); -} - -# main request routing logic - -route{ - - - if (!mf_process_maxfwd_header("10")) { - sl_send_reply("483","Too Many Hops"); - exit; - } - - if (has_totag()) { - # sequential request withing a dialog should - # take the path determined by record-routing - if (loose_route()) { - if (is_method("BYE")) { - #setflag(CDR); # do accounting ... - } else if (is_method("INVITE")) { - # even if in most of the cases is useless, do RR for - # re-INVITEs alos, as some buggy clients do change route set - # during the dialog. - record_route(); - } - - - - # route it out to whatever destination was set by loose_route() - # in $du (destination URI). - route(relay); - } else { - - if ( is_method("ACK") ) { - if ( t_check_trans() ) { - # non loose-route, but stateful ACK; must be an ACK after - # a 487 or e.g. 404 from upstream server - t_relay(); - exit; - } else { - # ACK without matching transaction -> - # ignore and discard - exit; - } - } - sl_send_reply("404","Not here"); - } - exit; - } - - # CANCEL processing - if (is_method("CANCEL")) - { - if (t_check_trans()) - t_relay(); - exit; - } - - t_check_trans(); - - if ( !(is_method("REGISTER") ) ) { - - if (from_uri==myself) - - { - - } else { - # if caller is not local, then called number must be local - - if (!uri==myself) { - send_reply("403","Relay forbidden"); - exit; - } - } - - } - - # preloaded route checking - if (loose_route()) { - xlog("L_ERR", - "Attempt to route with preloaded Route's [$fu/$tu/$ru/$ci]"); - if (!is_method("ACK")) - sl_send_reply("403","Preload Route denied"); - exit; - } - - # record routing - if (!is_method("REGISTER|MESSAGE")) - record_route(); - - # account only INVITEs - if (is_method("INVITE")) { - # create dialog with timeout - if ( !create_dialog("B") ) { - send_reply("500","Internal Server Error"); - exit; - } - setflag(CDR); - $avp(cgr_reqtype)="*pseudoprepaid"; - $avp(cgr_account)=$fU; - $avp(cgr_subject)=$fU; - $avp(cgr_destination)=$rU; - if $avp(cgr_reqtype)=="*pseudoprepaid" || $avp(cgr_reqtype)=="*prepaid" { #Make sure we got enough balance for the call - $avp(auth_keys) = "cgr_reqtype"; - $avp(auth_vals) = $avp(cgr_reqtype); - $avp(auth_keys) = "callid"; - $avp(auth_vals) = $ci; - $avp(auth_keys) = "from_tag"; - $avp(auth_vals) = $ft; - $avp(auth_keys) = "cgr_account"; - $avp(auth_vals) = $avp(cgr_account); - $avp(auth_keys) = "cgr_subject"; - $avp(auth_vals) = $avp(cgr_subject); - $avp(auth_keys) = "cgr_destination"; - $avp(auth_vals) = $avp(cgr_destination); - $avp(auth_keys) = "created"; - $avp(auth_vals) = $Ts; - raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals)); - $var(accid) = $ci+";"+$ft+";"; - $var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify - $var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur - $var(ms) = 0; - while($var(ms) < 1000) { # Check for values set every 10 ms for maximum 1 second - if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out - $var(ms) = $var(ms) + 10; - usleep("10"); - } - if $avp(cgr_notify) == NULL { # Cannot check it in switch - xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)"); - sl_send_reply("503","Prepaid controller error"); - exit; - } - switch ($avp(cgr_notify)) { - case "SERVER_ERROR": - sl_send_reply("503","Prepaid controller error"); - exit; - case "INSUFFICENT_FUNDS": - sl_send_reply("403", "Payment required"); - exit; - } - if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) { - sl_send_reply("503","Prepaid controller error on maxdur"); - exit; - } - $DLG_timeout=$avp(cgr_maxdur); - } - } - - if (!uri==myself) { - append_hf("P-hint: outbound\r\n"); - - route(relay); - } - - # requests for my domain - - if (is_method("PUBLISH|SUBSCRIBE")) - { - sl_send_reply("503", "Service Unavailable"); - exit; - } - - if (is_method("REGISTER")) - { - - - if ( 0 ) setflag(TCP_PERSISTENT); - - if (!save("location")) - sl_reply_error(); - - exit; - } - - if ($rU==NULL) { - # request with no Username in RURI - sl_send_reply("484","Address Incomplete"); - exit; - } - - - # do lookup with method filtering - if (!lookup("location","m")) { - - - t_newtran(); - t_reply("404", "Not Found"); - exit; - } - - route(relay); -} - - -route[relay] { - # for INVITEs enable some additional helper routes - if (is_method("INVITE")) { - - - - #t_on_branch("per_branch_ops"); - #t_on_reply("handle_nat"); - t_on_failure("missed_call"); - } - - - - if (!t_relay()) { - send_reply("500","Internal Error"); - }; - exit; -} - - - - -failure_route[missed_call] { - if (t_was_cancelled()) { - exit; - } - - # uncomment the following lines if you want to block client - # redirect based on 3xx replies. - ##if (t_check_status("3[0-9][0-9]")) { - ##t_reply("404","Not found"); - ## exit; - ##} - - -} - diff --git a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json index 83b03e55e..9b9b4987c 100644 --- a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json @@ -70,9 +70,6 @@ "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> - "debit_interval": "5s", // interval to perform debits on. - "min_call_duration": "0s", // only authorize calls with allowed duration higher than this - "max_call_duration": "3h", // maximum call duration a prepaid call can last "events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it "mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects }, diff --git a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg index a7961c6d2..c0ec842a1 100644 --- a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg +++ b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg @@ -103,7 +103,7 @@ modparam("acc", "report_cancels", 0) modparam("acc", "cdr_flag", "CDR") modparam("acc", "evi_flag", "CDR") modparam("acc", "evi_missed_flag", "CDR") -#modparam("acc", "acc_created_avp_name", "created_avp") +modparam("acc", "acc_created_avp_name", "created_avp") modparam("acc", "evi_extra", "cgr_reqtype=$avp(cgr_reqtype); cgr_account=$avp(cgr_account); @@ -120,6 +120,12 @@ loadmodule "cachedb_local.so" #### UDP protocol loadmodule "proto_udp.so" +#### Rest client +loadmodule "rest_client.so" + +#### JSON parser +loadmodule "json.so" + ####### Routing Logic ######## @@ -223,12 +229,7 @@ route{ exit; } setflag(CDR); - #acc_evi_request("DIALOG_START"); - $avp(cgr_reqtype)="*pseudoprepaid"; - $avp(cgr_account)=$fU; - $avp(cgr_subject)=$fU; - $avp(cgr_destination)=$rU; - route(CGR_AUTH); + route(CGR_AUTH_REQ); } if (!uri==myself) { @@ -264,62 +265,48 @@ route{ t_newtran(); t_reply("404", "Not Found"); exit; - } + } route(relay); } -route[CGR_AUTH] { - if $avp(cgr_reqtype)=="*pseudoprepaid" || $avp(cgr_reqtype)=="*prepaid" { #Make sure we got enough balance for the call - $avp(auth_keys) = "cgr_reqtype"; - $avp(auth_vals) = $avp(cgr_reqtype); - $avp(auth_keys) = "callid"; - $avp(auth_vals) = $ci; - $avp(auth_keys) = "from_tag"; - $avp(auth_vals) = $ft; - $avp(auth_keys) = "cgr_account"; - $avp(auth_vals) = $avp(cgr_account); - $avp(auth_keys) = "cgr_subject"; - $avp(auth_vals) = $avp(cgr_subject); - $avp(auth_keys) = "cgr_destination"; - $avp(auth_vals) = $avp(cgr_destination); - $avp(auth_keys) = "created"; - $avp(auth_vals) = $Ts; - raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals)); - $var(accid) = $ci+";"+$ft+";"; - $var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify - $var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur - $var(ms) = 0; - while($var(ms) < 1000) { # Check for values set every 10 ms for maximum 1 second - if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out - $var(ms) = $var(ms) + 10; - usleep("10"); - } - if $avp(cgr_notify) == NULL { # Cannot check it in switch - xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)"); - sl_send_reply("503","Prepaid controller error"); - exit; - } - switch ($avp(cgr_notify)) { - case "SERVER_ERROR": - sl_send_reply("503","Prepaid controller error"); - exit; - case "INSUFFICENT_FUNDS": - sl_send_reply("403", "Payment required"); - exit; - } - if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) { - sl_send_reply("503","Prepaid controller error on maxdur"); - exit; - } - $DLG_timeout=$avp(cgr_maxdur); - } +## Send AUTH request to CGRateS engine +route[CGR_AUTH_REQ] { + $avp(cgr_reqtype)="*pseudoprepaid"; + $avp(cgr_account)=$fU; + $avp(cgr_destination)=$rU; + + # Code to produce the json + $json(cgr_auth) := "{}"; + $json(cgr_auth/id) = "1"; + $json(cgr_auth/method) = "ApierV1.GetMaxSessionTime"; + $json(cgr_auth/params) := "[{}]"; + $json(cgr_auth/params[0]/ReqType) = $avp(cgr_reqtype); + $json(cgr_auth/params[0]/Account) = $avp(cgr_account); + $json(cgr_auth/params[0]/Destination) = $avp(cgr_destination); + async(rest_post("http://127.0.0.1:2080/jsonrpc", "$json(cgr_auth)", "application/json", "$avp(cgr_auth_reply)", "$var(ct)", "$var(rcode)"), CGR_AUTH_REPLY); +} + +## Process answer received from CGRateS engine +route[CGR_AUTH_REPLY] { + $json(cgr_auth_reply) := $avp(cgr_auth_reply); + xlog("In CGR_AUTH_REPLY, reply received: $json(cgr_auth_reply)"); + if $json(cgr_auth_reply/error) != NULL { + xlog("Error received from CGRateS: $json(cgr_auth_reply/error)"); + sl_send_reply("503","Charging controller error"); + exit; + } + if $json(cgr_auth_reply/result) > -1 { + $DLG_timeout=$json(cgr_auth_reply/result); + } + route(relay); } route[relay] { # for INVITEs enable some additional helper routes - if (is_method("INVITE")) { + if (is_method("INVITE") && !has_totag()) { + t_on_reply("invite_reply"); t_on_failure("missed_call"); } if (!t_relay()) { @@ -328,7 +315,10 @@ route[relay] { exit; } - +onreply_route[invite_reply] { + xlog("incoming reply\n"); + #acc_evi_request("DIALOG_START"); +} failure_route[missed_call] { diff --git a/engine/storedcdr.go b/engine/storedcdr.go index 30e6669cf..6ffe9eda3 100644 --- a/engine/storedcdr.go +++ b/engine/storedcdr.go @@ -567,3 +567,34 @@ type ExternalCdr struct { CostDetails string Rated bool // Mark the CDR as rated so we do not process it during mediation } + +// Used when authorizing requests from outside, eg ApierV1.GetMaxSessionTime +type MaxUsageReq struct { + TOR string + ReqType string + Direction string + Tenant string + Category string + Account string + Subject string + Destination string + SetupTime string + AnswerTime string + Usage string +} + +func (self *MaxUsageReq) AsStoredCdr() (*StoredCdr, error) { + var err error + storedCdr := &StoredCdr{TOR: self.TOR, ReqType: self.ReqType, Direction: self.Direction, Tenant: self.Tenant, Category: self.Category, + Account: self.Account, Subject: self.Subject, Destination: self.Destination} + if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(self.SetupTime); err != nil { + return nil, err + } + if storedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(self.AnswerTime); err != nil { + return nil, err + } + if storedCdr.Usage, err = utils.ParseDurationWithSecs(self.Usage); err != nil { + return nil, err + } + return storedCdr, nil +} diff --git a/engine/storedcdr_test.go b/engine/storedcdr_test.go index acabc9e3c..b06668d24 100644 --- a/engine/storedcdr_test.go +++ b/engine/storedcdr_test.go @@ -512,3 +512,18 @@ func TestStoredCdrEventFields(t *testing.T) { t.Error("Received: ", extraFlds) } } + +func TestMaxUsageReqAsStoredCdr(t *testing.T) { + setupReq := &MaxUsageReq{TOR: utils.VOICE, ReqType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", + Account: "1001", Subject: "1001", Destination: "1002", + SetupTime: "2013-11-07T08:42:20Z", AnswerTime: "2013-11-07T08:42:26Z", Usage: "0.00000001", + } + eStorCdr := &StoredCdr{TOR: utils.VOICE, ReqType: utils.META_RATED, Direction: "*out", + Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", + SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Usage: time.Duration(10)} + if storedCdr, err := setupReq.AsStoredCdr(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eStorCdr, storedCdr) { + t.Errorf("Expected: %+v, received: %+v", eStorCdr, storedCdr) + } +} diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 01c1d3fd9..7adabc3ea 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -22,13 +22,11 @@ import ( "bytes" "errors" "fmt" - "strings" - "time" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/osipsdagram" + "strings" + "time" ) /* @@ -85,7 +83,6 @@ func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engi osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, - "E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.onAuthorize}, "E_ACC_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr}, "E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr}, } @@ -97,7 +94,7 @@ type OsipsSessionManager struct { rater engine.Connector cdrsrv engine.Connector eventHandlers map[string][]func(*osipsdagram.OsipsEvent) - evSubscribeStop *chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it + evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it stopServing chan struct{} // Stop serving datagrams miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote } @@ -107,10 +104,9 @@ func (osm *OsipsSessionManager) Connect() (err error) { if osm.miConn, err = osipsdagram.NewOsipsMiDatagramConnector(osm.cfg.MiAddr, osm.cfg.Reconnects); err != nil { return fmt.Errorf("Cannot connect to OpenSIPS at %s, error: %s", osm.cfg.MiAddr, err.Error()) } - evSubscribeStop := make(chan struct{}) - osm.evSubscribeStop = &evSubscribeStop - defer close(*osm.evSubscribeStop) // Stop subscribing on disconnect - go osm.SubscribeEvents(evSubscribeStop) + osm.evSubscribeStop = make(chan struct{}) + defer func() { osm.evSubscribeStop <- struct{}{} }() // Stop subscribing on disconnect + go osm.SubscribeEvents(osm.evSubscribeStop) evsrv, err := osipsdagram.NewEventServer(osm.cfg.ListenUdp, osm.eventHandlers) if err != nil { engine.Logger.Err(fmt.Sprintf(" Cannot initialize datagram server, error: <%s>", err.Error())) @@ -147,54 +143,61 @@ func (osm *OsipsSessionManager) Shutdown() error { return nil } -// Event Handlers - // Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error { + if err := osm.subscribeEvents(); err != nil { // Init subscribe + return err + } for { select { case <-evStop: // Break this loop from outside return nil - default: - subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry - listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":") - portListen := listenAddrSplt[1] - addrListen := listenAddrSplt[0] - if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection - if localAddr := osm.miConn.LocallAddr(); localAddr != nil { - addrListen = strings.Split(localAddr.String(), ":")[0] - } + case <-time.After(osm.cfg.EventsSubscribeInterval): // Subscribe on interval + if err := osm.subscribeEvents(); err != nil { + return err } - for eventName := range osm.eventHandlers { - if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded - continue - } - cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds())) - success := false - for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ { - if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) { - success = true - break - } - time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors - continue // Try again - } - if !success { - engine.Logger.Err(fmt.Sprintf(" Shutting down, failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr)) - close(osm.stopServing) // Do not serve anymore since we got errors on subscribing - return errors.New("Failed subscribing to OpenSIPS events") - } - } - time.Sleep(osm.cfg.EventsSubscribeInterval) } } } +// One subscribe attempt to OpenSIPS +func (osm *OsipsSessionManager) subscribeEvents() error { + subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry + listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":") + portListen := listenAddrSplt[1] + addrListen := listenAddrSplt[0] + if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection + if localAddr := osm.miConn.LocallAddr(); localAddr != nil { + addrListen = strings.Split(localAddr.String(), ":")[0] + } + } + for eventName := range osm.eventHandlers { + if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded + continue + } + cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds())) + success := false + for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ { + if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) { + success = true + break + } + time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors + continue // Try again + } + if !success { + engine.Logger.Err(fmt.Sprintf(" Shutting down, failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr)) + close(osm.stopServing) // Do not serve anymore since we got errors on subscribing + return errors.New("Failed subscribing to OpenSIPS events") + } + } + return nil +} + func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEvent) { - close(*osm.evSubscribeStop) // Cancel previous subscribes - evStop := make(chan struct{}) - osm.evSubscribeStop = &evStop - go osm.SubscribeEvents(evStop) + osm.evSubscribeStop <- struct{}{} // Cancel previous subscribes + osm.evSubscribeStop = make(chan struct{}) // Create a fresh communication channel + go osm.SubscribeEvents(osm.evSubscribeStop) } func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) { @@ -209,93 +212,3 @@ func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { var reply string return osm.cdrsrv.ProcessCdr(storedCdr, &reply) } - -// Process Authorize request from OpenSIPS and communicate back maxdur -func (osm *OsipsSessionManager) onAuthorize(osipsDagram *osipsdagram.OsipsEvent) { - ev, _ := NewOsipsEvent(osipsDagram) - if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request - return - } - if ev.MissingParameter() { - cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING) - if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { - engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) - } - return - } - var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last - var durInitialized bool - attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), - Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} - var dcs utils.DerivedChargers - if err := osm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { - engine.Logger.Err(fmt.Sprintf(" onAuthorize: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) - cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR) - if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { - engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) - } - return - } - dcs, _ = dcs.AppendDefaultRun() - for _, dc := range dcs { - runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP) - matchingAllFilters := true - for _, dcRunFilter := range runFilters { - if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass { - matchingAllFilters = false - break - } - } - if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched - continue - } - startTime, err := ev.GetSetupTime(utils.META_DEFAULT) - if err != nil { - engine.Logger.Err("Error parsing answer event start time, using time.Now!") - startTime = time.Now() - } - cd := engine.CallDescriptor{ - Direction: ev.GetDirection(dc.DirectionField), - Tenant: ev.GetTenant(dc.TenantField), - Category: ev.GetCategory(dc.CategoryField), - Subject: ev.GetSubject(dc.SubjectField), - Account: ev.GetAccount(dc.AccountField), - Destination: ev.GetDestination(dc.DestinationField), - TimeStart: startTime, - TimeEnd: startTime.Add(osm.cfg.MaxCallDuration), - } - var remainingDurationFloat float64 - err = osm.rater.GetMaxSessionTime(cd, &remainingDurationFloat) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) - cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR) - if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { - engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) - } - return - } - remainingDuration := time.Duration(remainingDurationFloat) - // Set maxCallDuration, smallest out of all forked sessions - if !durInitialized { // first time we set it /not initialized yet - maxCallDuration = remainingDuration - durInitialized = true - } else if maxCallDuration > remainingDuration { - maxCallDuration = remainingDuration - } - } - if maxCallDuration <= osm.cfg.MinCallDuration { - cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), OSIPS_INSUFFICIENT_FUNDS) - if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { - engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) - } - return - } - cmdMaxDur := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_maxdur\n%d\n\n", ev.GetUUID(), int(maxCallDuration.Seconds())) - if reply, err := osm.miConn.SendCommand([]byte(cmdMaxDur)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { - engine.Logger.Err(fmt.Sprintf("Failed setting cgr_maxdur variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) - } - cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n", ev.GetUUID(), OSIPS_AUTH_OK) - if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { - engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) - } -} diff --git a/utils/coreutils.go b/utils/coreutils.go index 10e329701..e1896aaee 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -162,6 +162,8 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) { return time.Parse("20060102150405", tmStr) case oneSpaceTimestampRule.MatchString(tmStr): return time.Parse("02.01.2006 15:04:05", tmStr) + case tmStr == "*now": + return time.Now(), nil } return nilTime, errors.New("Unsupported time format") } diff --git a/utils/utils_test.go b/utils/utils_test.go index 8fdabdb36..4f2b2a580 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -219,6 +219,11 @@ func TestParseTimeDetectLayout(t *testing.T) { } else if !tsTm.Equal(expectedTime) { t.Errorf("Unexpected time parsed: %v, expecting: %v", tsTm, expectedTime) } + if nowTm, err := ParseTimeDetectLayout(META_NOW); err != nil { + t.Error(err) + } else if time.Now().Sub(nowTm) > time.Duration(1)*time.Millisecond { + t.Errorf("Unexpected time parsed: %v", nowTm) + } } func TestParseDateUnix(t *testing.T) {