diff --git a/apier/v1/callsetup.go b/apier/v1/callsetup.go
index e1e8c3c97..0e8f96202 100644
--- a/apier/v1/callsetup.go
+++ b/apier/v1/callsetup.go
@@ -19,14 +19,48 @@ along with this program. If not, see
package v1
import (
+ "fmt"
"github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "strconv"
"time"
)
// Returns MaxSessionTime in seconds, -1 for no limit
-func (self *ApierV1) GetMaxSessionTime(cdr engine.StoredCdr, maxSessionTime *float64) error {
+func (self *ApierV1) GetMaxSessionTime(auth engine.MaxUsageReq, maxSessionTime *float64) error {
+ if auth.TOR == "" {
+ auth.TOR = utils.VOICE
+ }
+ if auth.ReqType == "" {
+ auth.ReqType = self.Config.DefaultReqType
+ }
+ if auth.Direction == "" {
+ auth.Direction = utils.OUT
+ }
+ if auth.Tenant == "" {
+ auth.Tenant = self.Config.DefaultTenant
+ }
+ if auth.Category == "" {
+ auth.Category = self.Config.DefaultCategory
+ }
+ if auth.Subject == "" {
+ auth.Subject = auth.Account
+ }
+ if auth.Subject == "" {
+ auth.Subject = auth.Account
+ }
+ if auth.SetupTime == "" {
+ auth.SetupTime = utils.META_NOW
+ }
+ if auth.Usage == "" {
+ auth.Usage = strconv.FormatFloat(self.Config.MaxCallDuration.Seconds(), 'f', -1, 64)
+ }
+ storedCdr, err := auth.AsStoredCdr()
+ if err != nil {
+ return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
+ }
var maxDur float64
- if err := self.Responder.GetDerivedMaxSessionTime(cdr, &maxDur); err != nil {
+ if err := self.Responder.GetDerivedMaxSessionTime(*storedCdr, &maxDur); err != nil {
return err
}
if maxDur == -1.0 {
diff --git a/data/opensips/etc/opensips/opensips.cfg b/data/opensips/etc/opensips/opensips.cfg
deleted file mode 100644
index 559945c8a..000000000
--- a/data/opensips/etc/opensips/opensips.cfg
+++ /dev/null
@@ -1,362 +0,0 @@
-
-#
-# $Id$
-#
-# OpenSIPS residential configuration script
-# by OpenSIPS Solutions
-#
-# This script was generated via "make menuconfig", from
-# the "Residential" scenario.
-# You can enable / disable more features / functionalities by
-# re-generating the scenario with different options.#
-#
-# Please refer to the Core CookBook at:
-# http://www.opensips.org/Resources/DocsCookbooks
-# for a explanation of possible statements, functions and parameters.
-#
-
-
-####### Global Parameters #########
-
-debug=3
-log_stderror=no
-log_facility=LOG_LOCAL0
-
-fork=yes
-children=4
-listen=udp:lo:5060
-listen=udp:eth0:5060
-
-auto_aliases=no
-
-#disable_tcp=yes
-
-#disable_tls=yes
-
-
-####### Modules Section ########
-
-#set module path
-mpath="/usr/lib/opensips/modules"
-
-#### SIGNALING module
-loadmodule "signaling.so"
-
-#### StateLess module
-loadmodule "sl.so"
-
-#### Transaction Module
-loadmodule "tm.so"
-modparam("tm", "fr_timeout", 5)
-modparam("tm", "fr_inv_timeout", 30)
-modparam("tm", "restart_fr_on_each_reply", 0)
-modparam("tm", "onreply_avp_mode", 1)
-
-#### Record Route Module
-loadmodule "rr.so"
-/* do not append from tag to the RR (no need for this script) */
-modparam("rr", "append_fromtag", 0)
-
-#### MAX ForWarD module
-loadmodule "maxfwd.so"
-
-#### SIP MSG OPerationS module
-loadmodule "sipmsgops.so"
-
-#### FIFO Management Interface
-loadmodule "mi_fifo.so"
-modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo")
-modparam("mi_fifo", "fifo_mode", 0666)
-
-loadmodule "mi_datagram.so"
-modparam("mi_datagram", "socket_name", "udp:127.0.0.1:8020")
-
-
-#### Eventdatagram module
-loadmodule "event_datagram.so"
-
-
-#### URI module
-loadmodule "uri.so"
-modparam("uri", "use_uri_table", 0)
-
-
-#### USeR LOCation module
-loadmodule "usrloc.so"
-modparam("usrloc", "nat_bflag", "NAT")
-modparam("usrloc", "db_mode", 0)
-
-#### REGISTRAR module
-loadmodule "registrar.so"
-modparam("registrar", "tcp_persistent_flag", "TCP_PERSISTENT")
-
-/* uncomment the next line not to allow more than 10 contacts per AOR */
-#modparam("registrar", "max_contacts", 10)
-
-#### DIALOG module
-loadmodule "dialog.so"
-modparam("dialog", "dlg_match_mode", 1)
-modparam("dialog", "default_timeout", 21600) # 6 hours timeout
-modparam("dialog", "db_mode", 0)
-
-
-#### ACCounting module
-loadmodule "acc.so"
-/* what special events should be accounted ? */
-modparam("acc", "early_media", 0)
-modparam("acc", "report_cancels", 0)
-modparam("acc", "cdr_flag", "CDR")
-modparam("acc", "evi_flag", "CDR")
-modparam("acc", "evi_missed_flag", "CDR")
-modparam("acc", "evi_extra",
- "cgr_reqtype=$avp(cgr_reqtype);
- cgr_account=$avp(cgr_account);
- cgr_subject=$avp(cgr_subject);
- cgr_destination=$avp(cgr_destination);
- originalUri=$ou")
-
-#### CfgUtils module
-loadmodule "cfgutils.so"
-
-#### CacheDB Local
-loadmodule "cachedb_local.so"
-
-#### UDP protocol
-loadmodule "proto_udp.so"
-
-
-####### Routing Logic ########
-
-startup_route {
- subscribe_event("E_OPENSIPS_START", "udp:127.0.0.1:2020");
- raise_event("E_OPENSIPS_START");
-}
-
-# main request routing logic
-
-route{
-
-
- if (!mf_process_maxfwd_header("10")) {
- sl_send_reply("483","Too Many Hops");
- exit;
- }
-
- if (has_totag()) {
- # sequential request withing a dialog should
- # take the path determined by record-routing
- if (loose_route()) {
- if (is_method("BYE")) {
- #setflag(CDR); # do accounting ...
- } else if (is_method("INVITE")) {
- # even if in most of the cases is useless, do RR for
- # re-INVITEs alos, as some buggy clients do change route set
- # during the dialog.
- record_route();
- }
-
-
-
- # route it out to whatever destination was set by loose_route()
- # in $du (destination URI).
- route(relay);
- } else {
-
- if ( is_method("ACK") ) {
- if ( t_check_trans() ) {
- # non loose-route, but stateful ACK; must be an ACK after
- # a 487 or e.g. 404 from upstream server
- t_relay();
- exit;
- } else {
- # ACK without matching transaction ->
- # ignore and discard
- exit;
- }
- }
- sl_send_reply("404","Not here");
- }
- exit;
- }
-
- # CANCEL processing
- if (is_method("CANCEL"))
- {
- if (t_check_trans())
- t_relay();
- exit;
- }
-
- t_check_trans();
-
- if ( !(is_method("REGISTER") ) ) {
-
- if (from_uri==myself)
-
- {
-
- } else {
- # if caller is not local, then called number must be local
-
- if (!uri==myself) {
- send_reply("403","Relay forbidden");
- exit;
- }
- }
-
- }
-
- # preloaded route checking
- if (loose_route()) {
- xlog("L_ERR",
- "Attempt to route with preloaded Route's [$fu/$tu/$ru/$ci]");
- if (!is_method("ACK"))
- sl_send_reply("403","Preload Route denied");
- exit;
- }
-
- # record routing
- if (!is_method("REGISTER|MESSAGE"))
- record_route();
-
- # account only INVITEs
- if (is_method("INVITE")) {
- # create dialog with timeout
- if ( !create_dialog("B") ) {
- send_reply("500","Internal Server Error");
- exit;
- }
- setflag(CDR);
- $avp(cgr_reqtype)="*pseudoprepaid";
- $avp(cgr_account)=$fU;
- $avp(cgr_subject)=$fU;
- $avp(cgr_destination)=$rU;
- if $avp(cgr_reqtype)=="*pseudoprepaid" || $avp(cgr_reqtype)=="*prepaid" { #Make sure we got enough balance for the call
- $avp(auth_keys) = "cgr_reqtype";
- $avp(auth_vals) = $avp(cgr_reqtype);
- $avp(auth_keys) = "callid";
- $avp(auth_vals) = $ci;
- $avp(auth_keys) = "from_tag";
- $avp(auth_vals) = $ft;
- $avp(auth_keys) = "cgr_account";
- $avp(auth_vals) = $avp(cgr_account);
- $avp(auth_keys) = "cgr_subject";
- $avp(auth_vals) = $avp(cgr_subject);
- $avp(auth_keys) = "cgr_destination";
- $avp(auth_vals) = $avp(cgr_destination);
- $avp(auth_keys) = "created";
- $avp(auth_vals) = $Ts;
- raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals));
- $var(accid) = $ci+";"+$ft+";";
- $var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify
- $var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur
- $var(ms) = 0;
- while($var(ms) < 1000) { # Check for values set every 10 ms for maximum 1 second
- if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out
- $var(ms) = $var(ms) + 10;
- usleep("10");
- }
- if $avp(cgr_notify) == NULL { # Cannot check it in switch
- xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)");
- sl_send_reply("503","Prepaid controller error");
- exit;
- }
- switch ($avp(cgr_notify)) {
- case "SERVER_ERROR":
- sl_send_reply("503","Prepaid controller error");
- exit;
- case "INSUFFICENT_FUNDS":
- sl_send_reply("403", "Payment required");
- exit;
- }
- if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) {
- sl_send_reply("503","Prepaid controller error on maxdur");
- exit;
- }
- $DLG_timeout=$avp(cgr_maxdur);
- }
- }
-
- if (!uri==myself) {
- append_hf("P-hint: outbound\r\n");
-
- route(relay);
- }
-
- # requests for my domain
-
- if (is_method("PUBLISH|SUBSCRIBE"))
- {
- sl_send_reply("503", "Service Unavailable");
- exit;
- }
-
- if (is_method("REGISTER"))
- {
-
-
- if ( 0 ) setflag(TCP_PERSISTENT);
-
- if (!save("location"))
- sl_reply_error();
-
- exit;
- }
-
- if ($rU==NULL) {
- # request with no Username in RURI
- sl_send_reply("484","Address Incomplete");
- exit;
- }
-
-
- # do lookup with method filtering
- if (!lookup("location","m")) {
-
-
- t_newtran();
- t_reply("404", "Not Found");
- exit;
- }
-
- route(relay);
-}
-
-
-route[relay] {
- # for INVITEs enable some additional helper routes
- if (is_method("INVITE")) {
-
-
-
- #t_on_branch("per_branch_ops");
- #t_on_reply("handle_nat");
- t_on_failure("missed_call");
- }
-
-
-
- if (!t_relay()) {
- send_reply("500","Internal Error");
- };
- exit;
-}
-
-
-
-
-failure_route[missed_call] {
- if (t_was_cancelled()) {
- exit;
- }
-
- # uncomment the following lines if you want to block client
- # redirect based on 3xx replies.
- ##if (t_check_status("3[0-9][0-9]")) {
- ##t_reply("404","Not found");
- ## exit;
- ##}
-
-
-}
-
diff --git a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json
index 83b03e55e..9b9b4987c 100644
--- a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json
+++ b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json
@@ -70,9 +70,6 @@
"listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
- "debit_interval": "5s", // interval to perform debits on.
- "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
- "max_call_duration": "3h", // maximum call duration a prepaid call can last
"events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
"mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects
},
diff --git a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg
index a7961c6d2..c0ec842a1 100644
--- a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg
+++ b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg
@@ -103,7 +103,7 @@ modparam("acc", "report_cancels", 0)
modparam("acc", "cdr_flag", "CDR")
modparam("acc", "evi_flag", "CDR")
modparam("acc", "evi_missed_flag", "CDR")
-#modparam("acc", "acc_created_avp_name", "created_avp")
+modparam("acc", "acc_created_avp_name", "created_avp")
modparam("acc", "evi_extra",
"cgr_reqtype=$avp(cgr_reqtype);
cgr_account=$avp(cgr_account);
@@ -120,6 +120,12 @@ loadmodule "cachedb_local.so"
#### UDP protocol
loadmodule "proto_udp.so"
+#### Rest client
+loadmodule "rest_client.so"
+
+#### JSON parser
+loadmodule "json.so"
+
####### Routing Logic ########
@@ -223,12 +229,7 @@ route{
exit;
}
setflag(CDR);
- #acc_evi_request("DIALOG_START");
- $avp(cgr_reqtype)="*pseudoprepaid";
- $avp(cgr_account)=$fU;
- $avp(cgr_subject)=$fU;
- $avp(cgr_destination)=$rU;
- route(CGR_AUTH);
+ route(CGR_AUTH_REQ);
}
if (!uri==myself) {
@@ -264,62 +265,48 @@ route{
t_newtran();
t_reply("404", "Not Found");
exit;
- }
+ }
route(relay);
}
-route[CGR_AUTH] {
- if $avp(cgr_reqtype)=="*pseudoprepaid" || $avp(cgr_reqtype)=="*prepaid" { #Make sure we got enough balance for the call
- $avp(auth_keys) = "cgr_reqtype";
- $avp(auth_vals) = $avp(cgr_reqtype);
- $avp(auth_keys) = "callid";
- $avp(auth_vals) = $ci;
- $avp(auth_keys) = "from_tag";
- $avp(auth_vals) = $ft;
- $avp(auth_keys) = "cgr_account";
- $avp(auth_vals) = $avp(cgr_account);
- $avp(auth_keys) = "cgr_subject";
- $avp(auth_vals) = $avp(cgr_subject);
- $avp(auth_keys) = "cgr_destination";
- $avp(auth_vals) = $avp(cgr_destination);
- $avp(auth_keys) = "created";
- $avp(auth_vals) = $Ts;
- raise_event("E_CGR_AUTHORIZE", $avp(auth_keys), $avp(auth_vals));
- $var(accid) = $ci+";"+$ft+";";
- $var(rply_cgr_notify) = $var(accid)+"/"+"cgr_notify"; #Key in localcache for cgr_notify
- $var(rply_cgr_maxdur) = $var(accid)+"/"+"cgr_maxdur"; #Key in localcache for cgr_maxdur
- $var(ms) = 0;
- while($var(ms) < 1000) { # Check for values set every 10 ms for maximum 1 second
- if cache_fetch("local", "$var(rply_cgr_notify)", $avp(cgr_notify) ) $var(ms) = 1000; # Break out
- $var(ms) = $var(ms) + 10;
- usleep("10");
- }
- if $avp(cgr_notify) == NULL { # Cannot check it in switch
- xlog("Checked cgr_notify variable $var(rply_cgr_notify) got value: $avp(cgr_notify)");
- sl_send_reply("503","Prepaid controller error");
- exit;
- }
- switch ($avp(cgr_notify)) {
- case "SERVER_ERROR":
- sl_send_reply("503","Prepaid controller error");
- exit;
- case "INSUFFICENT_FUNDS":
- sl_send_reply("403", "Payment required");
- exit;
- }
- if !cache_fetch("local", "$var(rply_cgr_maxdur)", $avp(cgr_maxdur) ) {
- sl_send_reply("503","Prepaid controller error on maxdur");
- exit;
- }
- $DLG_timeout=$avp(cgr_maxdur);
- }
+## Send AUTH request to CGRateS engine
+route[CGR_AUTH_REQ] {
+ $avp(cgr_reqtype)="*pseudoprepaid";
+ $avp(cgr_account)=$fU;
+ $avp(cgr_destination)=$rU;
+
+ # Code to produce the json
+ $json(cgr_auth) := "{}";
+ $json(cgr_auth/id) = "1";
+ $json(cgr_auth/method) = "ApierV1.GetMaxSessionTime";
+ $json(cgr_auth/params) := "[{}]";
+ $json(cgr_auth/params[0]/ReqType) = $avp(cgr_reqtype);
+ $json(cgr_auth/params[0]/Account) = $avp(cgr_account);
+ $json(cgr_auth/params[0]/Destination) = $avp(cgr_destination);
+ async(rest_post("http://127.0.0.1:2080/jsonrpc", "$json(cgr_auth)", "application/json", "$avp(cgr_auth_reply)", "$var(ct)", "$var(rcode)"), CGR_AUTH_REPLY);
+}
+
+## Process answer received from CGRateS engine
+route[CGR_AUTH_REPLY] {
+ $json(cgr_auth_reply) := $avp(cgr_auth_reply);
+ xlog("In CGR_AUTH_REPLY, reply received: $json(cgr_auth_reply)");
+ if $json(cgr_auth_reply/error) != NULL {
+ xlog("Error received from CGRateS: $json(cgr_auth_reply/error)");
+ sl_send_reply("503","Charging controller error");
+ exit;
+ }
+ if $json(cgr_auth_reply/result) > -1 {
+ $DLG_timeout=$json(cgr_auth_reply/result);
+ }
+ route(relay);
}
route[relay] {
# for INVITEs enable some additional helper routes
- if (is_method("INVITE")) {
+ if (is_method("INVITE") && !has_totag()) {
+ t_on_reply("invite_reply");
t_on_failure("missed_call");
}
if (!t_relay()) {
@@ -328,7 +315,10 @@ route[relay] {
exit;
}
-
+onreply_route[invite_reply] {
+ xlog("incoming reply\n");
+ #acc_evi_request("DIALOG_START");
+}
failure_route[missed_call] {
diff --git a/engine/storedcdr.go b/engine/storedcdr.go
index 30e6669cf..6ffe9eda3 100644
--- a/engine/storedcdr.go
+++ b/engine/storedcdr.go
@@ -567,3 +567,34 @@ type ExternalCdr struct {
CostDetails string
Rated bool // Mark the CDR as rated so we do not process it during mediation
}
+
+// Used when authorizing requests from outside, eg ApierV1.GetMaxSessionTime
+type MaxUsageReq struct {
+ TOR string
+ ReqType string
+ Direction string
+ Tenant string
+ Category string
+ Account string
+ Subject string
+ Destination string
+ SetupTime string
+ AnswerTime string
+ Usage string
+}
+
+func (self *MaxUsageReq) AsStoredCdr() (*StoredCdr, error) {
+ var err error
+ storedCdr := &StoredCdr{TOR: self.TOR, ReqType: self.ReqType, Direction: self.Direction, Tenant: self.Tenant, Category: self.Category,
+ Account: self.Account, Subject: self.Subject, Destination: self.Destination}
+ if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(self.SetupTime); err != nil {
+ return nil, err
+ }
+ if storedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(self.AnswerTime); err != nil {
+ return nil, err
+ }
+ if storedCdr.Usage, err = utils.ParseDurationWithSecs(self.Usage); err != nil {
+ return nil, err
+ }
+ return storedCdr, nil
+}
diff --git a/engine/storedcdr_test.go b/engine/storedcdr_test.go
index acabc9e3c..b06668d24 100644
--- a/engine/storedcdr_test.go
+++ b/engine/storedcdr_test.go
@@ -512,3 +512,18 @@ func TestStoredCdrEventFields(t *testing.T) {
t.Error("Received: ", extraFlds)
}
}
+
+func TestMaxUsageReqAsStoredCdr(t *testing.T) {
+ setupReq := &MaxUsageReq{TOR: utils.VOICE, ReqType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call",
+ Account: "1001", Subject: "1001", Destination: "1002",
+ SetupTime: "2013-11-07T08:42:20Z", AnswerTime: "2013-11-07T08:42:26Z", Usage: "0.00000001",
+ }
+ eStorCdr := &StoredCdr{TOR: utils.VOICE, ReqType: utils.META_RATED, Direction: "*out",
+ Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
+ SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Usage: time.Duration(10)}
+ if storedCdr, err := setupReq.AsStoredCdr(); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(eStorCdr, storedCdr) {
+ t.Errorf("Expected: %+v, received: %+v", eStorCdr, storedCdr)
+ }
+}
diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go
index 01c1d3fd9..7adabc3ea 100644
--- a/sessionmanager/osipssm.go
+++ b/sessionmanager/osipssm.go
@@ -22,13 +22,11 @@ import (
"bytes"
"errors"
"fmt"
- "strings"
- "time"
-
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
"github.com/cgrates/osipsdagram"
+ "strings"
+ "time"
)
/*
@@ -85,7 +83,6 @@ func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engi
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart},
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr},
- "E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.onAuthorize},
"E_ACC_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr},
}
@@ -97,7 +94,7 @@ type OsipsSessionManager struct {
rater engine.Connector
cdrsrv engine.Connector
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
- evSubscribeStop *chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
+ evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
stopServing chan struct{} // Stop serving datagrams
miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote
}
@@ -107,10 +104,9 @@ func (osm *OsipsSessionManager) Connect() (err error) {
if osm.miConn, err = osipsdagram.NewOsipsMiDatagramConnector(osm.cfg.MiAddr, osm.cfg.Reconnects); err != nil {
return fmt.Errorf("Cannot connect to OpenSIPS at %s, error: %s", osm.cfg.MiAddr, err.Error())
}
- evSubscribeStop := make(chan struct{})
- osm.evSubscribeStop = &evSubscribeStop
- defer close(*osm.evSubscribeStop) // Stop subscribing on disconnect
- go osm.SubscribeEvents(evSubscribeStop)
+ osm.evSubscribeStop = make(chan struct{})
+ defer func() { osm.evSubscribeStop <- struct{}{} }() // Stop subscribing on disconnect
+ go osm.SubscribeEvents(osm.evSubscribeStop)
evsrv, err := osipsdagram.NewEventServer(osm.cfg.ListenUdp, osm.eventHandlers)
if err != nil {
engine.Logger.Err(fmt.Sprintf(" Cannot initialize datagram server, error: <%s>", err.Error()))
@@ -147,54 +143,61 @@ func (osm *OsipsSessionManager) Shutdown() error {
return nil
}
-// Event Handlers
-
// Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart
func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error {
+ if err := osm.subscribeEvents(); err != nil { // Init subscribe
+ return err
+ }
for {
select {
case <-evStop: // Break this loop from outside
return nil
- default:
- subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry
- listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":")
- portListen := listenAddrSplt[1]
- addrListen := listenAddrSplt[0]
- if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection
- if localAddr := osm.miConn.LocallAddr(); localAddr != nil {
- addrListen = strings.Split(localAddr.String(), ":")[0]
- }
+ case <-time.After(osm.cfg.EventsSubscribeInterval): // Subscribe on interval
+ if err := osm.subscribeEvents(); err != nil {
+ return err
}
- for eventName := range osm.eventHandlers {
- if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded
- continue
- }
- cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds()))
- success := false
- for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ {
- if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) {
- success = true
- break
- }
- time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors
- continue // Try again
- }
- if !success {
- engine.Logger.Err(fmt.Sprintf(" Shutting down, failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr))
- close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
- return errors.New("Failed subscribing to OpenSIPS events")
- }
- }
- time.Sleep(osm.cfg.EventsSubscribeInterval)
}
}
}
+// One subscribe attempt to OpenSIPS
+func (osm *OsipsSessionManager) subscribeEvents() error {
+ subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry
+ listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":")
+ portListen := listenAddrSplt[1]
+ addrListen := listenAddrSplt[0]
+ if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection
+ if localAddr := osm.miConn.LocallAddr(); localAddr != nil {
+ addrListen = strings.Split(localAddr.String(), ":")[0]
+ }
+ }
+ for eventName := range osm.eventHandlers {
+ if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded
+ continue
+ }
+ cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds()))
+ success := false
+ for attempts := 0; attempts < osm.cfg.Reconnects; attempts++ {
+ if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) {
+ success = true
+ break
+ }
+ time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors
+ continue // Try again
+ }
+ if !success {
+ engine.Logger.Err(fmt.Sprintf(" Shutting down, failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr))
+ close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
+ return errors.New("Failed subscribing to OpenSIPS events")
+ }
+ }
+ return nil
+}
+
func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEvent) {
- close(*osm.evSubscribeStop) // Cancel previous subscribes
- evStop := make(chan struct{})
- osm.evSubscribeStop = &evStop
- go osm.SubscribeEvents(evStop)
+ osm.evSubscribeStop <- struct{}{} // Cancel previous subscribes
+ osm.evSubscribeStop = make(chan struct{}) // Create a fresh communication channel
+ go osm.SubscribeEvents(osm.evSubscribeStop)
}
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
@@ -209,93 +212,3 @@ func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
-
-// Process Authorize request from OpenSIPS and communicate back maxdur
-func (osm *OsipsSessionManager) onAuthorize(osipsDagram *osipsdagram.OsipsEvent) {
- ev, _ := NewOsipsEvent(osipsDagram)
- if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
- return
- }
- if ev.MissingParameter() {
- cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING)
- if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
- engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
- }
- return
- }
- var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last
- var durInitialized bool
- attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
- Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
- var dcs utils.DerivedChargers
- if err := osm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
- engine.Logger.Err(fmt.Sprintf(" onAuthorize: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
- cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR)
- if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
- engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
- }
- return
- }
- dcs, _ = dcs.AppendDefaultRun()
- for _, dc := range dcs {
- runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
- matchingAllFilters := true
- for _, dcRunFilter := range runFilters {
- if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass {
- matchingAllFilters = false
- break
- }
- }
- if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
- continue
- }
- startTime, err := ev.GetSetupTime(utils.META_DEFAULT)
- if err != nil {
- engine.Logger.Err("Error parsing answer event start time, using time.Now!")
- startTime = time.Now()
- }
- cd := engine.CallDescriptor{
- Direction: ev.GetDirection(dc.DirectionField),
- Tenant: ev.GetTenant(dc.TenantField),
- Category: ev.GetCategory(dc.CategoryField),
- Subject: ev.GetSubject(dc.SubjectField),
- Account: ev.GetAccount(dc.AccountField),
- Destination: ev.GetDestination(dc.DestinationField),
- TimeStart: startTime,
- TimeEnd: startTime.Add(osm.cfg.MaxCallDuration),
- }
- var remainingDurationFloat float64
- err = osm.rater.GetMaxSessionTime(cd, &remainingDurationFloat)
- if err != nil {
- engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
- cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_SERVER_ERROR)
- if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
- engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
- }
- return
- }
- remainingDuration := time.Duration(remainingDurationFloat)
- // Set maxCallDuration, smallest out of all forked sessions
- if !durInitialized { // first time we set it /not initialized yet
- maxCallDuration = remainingDuration
- durInitialized = true
- } else if maxCallDuration > remainingDuration {
- maxCallDuration = remainingDuration
- }
- }
- if maxCallDuration <= osm.cfg.MinCallDuration {
- cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), OSIPS_INSUFFICIENT_FUNDS)
- if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
- engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
- }
- return
- }
- cmdMaxDur := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_maxdur\n%d\n\n", ev.GetUUID(), int(maxCallDuration.Seconds()))
- if reply, err := osm.miConn.SendCommand([]byte(cmdMaxDur)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
- engine.Logger.Err(fmt.Sprintf("Failed setting cgr_maxdur variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
- }
- cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n", ev.GetUUID(), OSIPS_AUTH_OK)
- if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
- engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))
- }
-}
diff --git a/utils/coreutils.go b/utils/coreutils.go
index 10e329701..e1896aaee 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -162,6 +162,8 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) {
return time.Parse("20060102150405", tmStr)
case oneSpaceTimestampRule.MatchString(tmStr):
return time.Parse("02.01.2006 15:04:05", tmStr)
+ case tmStr == "*now":
+ return time.Now(), nil
}
return nilTime, errors.New("Unsupported time format")
}
diff --git a/utils/utils_test.go b/utils/utils_test.go
index 8fdabdb36..4f2b2a580 100644
--- a/utils/utils_test.go
+++ b/utils/utils_test.go
@@ -219,6 +219,11 @@ func TestParseTimeDetectLayout(t *testing.T) {
} else if !tsTm.Equal(expectedTime) {
t.Errorf("Unexpected time parsed: %v, expecting: %v", tsTm, expectedTime)
}
+ if nowTm, err := ParseTimeDetectLayout(META_NOW); err != nil {
+ t.Error(err)
+ } else if time.Now().Sub(nowTm) > time.Duration(1)*time.Millisecond {
+ t.Errorf("Unexpected time parsed: %v", nowTm)
+ }
}
func TestParseDateUnix(t *testing.T) {