mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Merge branch 'master' into shared_balances
Conflicts: apier/v1/apier_local_test.go apier/v1/tutfscsv_local_test.go cmd/cgr-engine/cgr-engine.go engine/storage_map.go engine/storage_mongo.go engine/storage_redis.go
This commit is contained in:
@@ -21,8 +21,8 @@ package apier
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -33,10 +33,10 @@ type AttrAcntAction struct {
|
||||
}
|
||||
|
||||
type AccountActionTiming struct {
|
||||
Id string // The id to reference this particular ActionTiming
|
||||
Id string // The id to reference this particular ActionTiming
|
||||
ActionPlanId string // The id of the ActionPlanId profile attached to the account
|
||||
ActionsId string // The id of actions which will be executed
|
||||
NextExecTime time.Time // Next execution time
|
||||
ActionsId string // The id of actions which will be executed
|
||||
NextExecTime time.Time // Next execution time
|
||||
}
|
||||
|
||||
func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*AccountActionTiming) error {
|
||||
@@ -60,12 +60,12 @@ func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*Accoun
|
||||
}
|
||||
|
||||
type AttrRemActionTiming struct {
|
||||
ActionPlanId string // Id identifying the ActionTimings profile
|
||||
ActionPlanId string // Id identifying the ActionTimings profile
|
||||
ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled
|
||||
Tenant string // Tenant he account belongs to
|
||||
Account string // Account name
|
||||
Direction string // Traffic direction
|
||||
ReloadScheduler bool // If set it will reload the scheduler after adding
|
||||
ReloadScheduler bool // If set it will reload the scheduler after adding
|
||||
}
|
||||
|
||||
// Removes an ActionTimings or parts of it depending on filters being set
|
||||
@@ -155,12 +155,11 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
type AttrSetAccount struct {
|
||||
Tenant string
|
||||
Direction string
|
||||
Account string
|
||||
Type string // <*prepaid|*postpaid>
|
||||
Tenant string
|
||||
Direction string
|
||||
Account string
|
||||
Type string // <*prepaid|*postpaid>
|
||||
ActionPlanId string
|
||||
}
|
||||
|
||||
@@ -186,8 +185,8 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error {
|
||||
Type: attr.Type,
|
||||
}
|
||||
}
|
||||
|
||||
if len(attr.ActionPlanId) != 0 {
|
||||
|
||||
if len(attr.ActionPlanId) != 0 {
|
||||
var err error
|
||||
ats, err = self.AccountDb.GetActionTimings(attr.ActionPlanId)
|
||||
if err != nil {
|
||||
|
||||
@@ -19,21 +19,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package apier
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
"net/url"
|
||||
"os/exec"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
"strings"
|
||||
"net/url"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
"flag"
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
)
|
||||
|
||||
// ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local
|
||||
@@ -1229,7 +1228,7 @@ func TestCdrServer(t *testing.T) {
|
||||
"answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}}
|
||||
for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} {
|
||||
cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL)
|
||||
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.CdrcCdrs), cdrForm); err != nil {
|
||||
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", "127.0.0.1:2080"), cdrForm); err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,8 +43,8 @@ func (self *ApierV1) SetTPActionPlan(attrs utils.TPActionPlan, reply *string) er
|
||||
}
|
||||
|
||||
type AttrGetTPActionPlan struct {
|
||||
TPid string // Tariff plan id
|
||||
Id string // ActionTimings id
|
||||
TPid string // Tariff plan id
|
||||
Id string // ActionTimings id
|
||||
}
|
||||
|
||||
// Queries specific ActionPlan profile on tariff plan
|
||||
|
||||
62
cdrc/cdrc.go
62
cdrc/cdrc.go
@@ -26,13 +26,13 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/cdrs"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -44,8 +44,8 @@ const (
|
||||
FS_CSV = "freeswitch_csv"
|
||||
)
|
||||
|
||||
func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
|
||||
cdrc := &Cdrc{cgrCfg: config}
|
||||
func NewCdrc(config *config.CGRConfig, cdrServer *cdrs.CDRS) (*Cdrc, error) {
|
||||
cdrc := &Cdrc{cgrCfg: config, cdrServer: cdrServer}
|
||||
// Before processing, make sure in and out folders exist
|
||||
for _, dir := range []string{cdrc.cgrCfg.CdrcCdrInDir, cdrc.cgrCfg.CdrcCdrOutDir} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
@@ -61,6 +61,7 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
|
||||
|
||||
type Cdrc struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
cdrServer *cdrs.CDRS
|
||||
cfgCdrFields map[string]string // Key is the name of the field
|
||||
httpClient *http.Client
|
||||
}
|
||||
@@ -116,10 +117,9 @@ func (self *Cdrc) parseFieldsConfig() error {
|
||||
}
|
||||
|
||||
// Takes the record out of csv and turns it into http form which can be posted
|
||||
func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
|
||||
// engine.Logger.Info(fmt.Sprintf("Processing record %v", record))
|
||||
v := url.Values{}
|
||||
v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId)
|
||||
func (self *Cdrc) recordAsRatedCdr(record []string) (*utils.RatedCDR, error) {
|
||||
ratedCdr := &utils.RatedCDR{CdrSource: self.cgrCfg.CdrcSourceId, ExtraFields: map[string]string{}, Cost: -1}
|
||||
var err error
|
||||
for cfgFieldName, cfgFieldVal := range self.cfgCdrFields {
|
||||
var fieldVal string
|
||||
if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) {
|
||||
@@ -135,9 +135,38 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
|
||||
} else { // Modify here when we add more supported cdr formats
|
||||
fieldVal = "UNKNOWN"
|
||||
}
|
||||
v.Set(cfgFieldName, fieldVal)
|
||||
switch cfgFieldName {
|
||||
case utils.ACCID:
|
||||
ratedCdr.CgrId = utils.FSCgrId(fieldVal)
|
||||
ratedCdr.AccId = fieldVal
|
||||
case utils.REQTYPE:
|
||||
ratedCdr.ReqType = fieldVal
|
||||
case utils.DIRECTION:
|
||||
ratedCdr.Direction = fieldVal
|
||||
case utils.TENANT:
|
||||
ratedCdr.Tenant = fieldVal
|
||||
case utils.TOR:
|
||||
ratedCdr.TOR = fieldVal
|
||||
case utils.ACCOUNT:
|
||||
ratedCdr.Account = fieldVal
|
||||
case utils.SUBJECT:
|
||||
ratedCdr.Subject = fieldVal
|
||||
case utils.DESTINATION:
|
||||
ratedCdr.Destination = fieldVal
|
||||
case utils.ANSWER_TIME:
|
||||
if ratedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(fieldVal); err != nil {
|
||||
return nil, fmt.Errorf("Cannot parse answer time field, err: %s", err.Error())
|
||||
}
|
||||
case utils.DURATION:
|
||||
if ratedCdr.Duration, err = utils.ParseDurationWithSecs(fieldVal); err != nil {
|
||||
return nil, fmt.Errorf("Cannot parse duration field, err: %s", err.Error())
|
||||
}
|
||||
default: // Extra fields will not match predefined so they all show up here
|
||||
ratedCdr.ExtraFields[cfgFieldName] = fieldVal
|
||||
}
|
||||
|
||||
}
|
||||
return v, nil
|
||||
return ratedCdr, nil
|
||||
}
|
||||
|
||||
// One run over the CDR folder
|
||||
@@ -199,14 +228,21 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Error in csv file: %s", err.Error()))
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
cdrAsForm, err := self.cdrAsHttpForm(record)
|
||||
rawCdr, err := self.recordAsRatedCdr(record)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Error in csv file: %s", err.Error()))
|
||||
continue
|
||||
}
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), cdrAsForm); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, error: %s", err.Error()))
|
||||
continue
|
||||
if self.cgrCfg.CdrcCdrs == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessRawCdr(rawCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, error: %s", err.Error()))
|
||||
continue
|
||||
}
|
||||
} else { // CDRs listening on IP
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), rawCdr.AsRawCdrHttpForm()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, error: %s", err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
// Finished with file, move it to processed folder
|
||||
|
||||
@@ -130,10 +130,13 @@ func TestProcessCdrDir(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if cfg.CdrcCdrs == utils.INTERNAL { // For now we only test over network
|
||||
return
|
||||
}
|
||||
if err := startEngine(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
cdrc, err := NewCdrc(cfg)
|
||||
cdrc, err := NewCdrc(cfg, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
@@ -21,7 +21,9 @@ package cdrc
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestParseFieldsConfig(t *testing.T) {
|
||||
@@ -50,33 +52,55 @@ func TestParseFieldsConfig(t *testing.T) {
|
||||
cgrConfig.CdrcExtraFields = []string{"supplier1:^top_supplier", "orig_ip:11"}
|
||||
cdrc = &Cdrc{cgrCfg: cgrConfig}
|
||||
if err := cdrc.parseFieldsConfig(); err != nil {
|
||||
t.Errorf("Failed to corectly parse extra fields %v",cdrc.cfgCdrFields)
|
||||
t.Errorf("Failed to corectly parse extra fields %v", cdrc.cfgCdrFields)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCdrAsHttpForm(t *testing.T) {
|
||||
func TestRecordAsRatedCdr(t *testing.T) {
|
||||
cgrConfig, _ := config.NewDefaultCGRConfig()
|
||||
cgrConfig.CdrcExtraFields = []string{"supplier:10"}
|
||||
cdrc := &Cdrc{cgrCfg: cgrConfig}
|
||||
if err := cdrc.parseFieldsConfig(); err != nil {
|
||||
t.Error("Failed parsing default fieldIndexesFromConfig", err)
|
||||
}
|
||||
cdrRow := []string{"firstField", "secondField"}
|
||||
_, err := cdrc.cdrAsHttpForm(cdrRow)
|
||||
_, err := cdrc.recordAsRatedCdr(cdrRow)
|
||||
if err == nil {
|
||||
t.Error("Failed to corectly detect missing fields from record")
|
||||
}
|
||||
cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
|
||||
cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow)
|
||||
rtCdr, err := cdrc.recordAsRatedCdr(cdrRow)
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in form", err)
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId {
|
||||
t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE))
|
||||
expectedCdr := &utils.RatedCDR{
|
||||
CgrId: utils.FSCgrId(cdrRow[0]),
|
||||
AccId: cdrRow[0],
|
||||
CdrSource: cgrConfig.CdrcSourceId,
|
||||
ReqType: cdrRow[1],
|
||||
Direction: cdrRow[2],
|
||||
Tenant: cdrRow[3],
|
||||
TOR: cdrRow[4],
|
||||
Account: cdrRow[5],
|
||||
Subject: cdrRow[6],
|
||||
Destination: cdrRow[7],
|
||||
AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC),
|
||||
Duration: time.Duration(62) * time.Second,
|
||||
ExtraFields: map[string]string{"supplier": "supplier1"},
|
||||
Cost: -1,
|
||||
}
|
||||
if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
|
||||
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
|
||||
if !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
//if cdrAsForm.Get("supplier") != "supplier1" {
|
||||
// t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier"))
|
||||
//}
|
||||
/*
|
||||
if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId {
|
||||
t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE))
|
||||
}
|
||||
if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
|
||||
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
|
||||
}
|
||||
if cdrAsForm.Get("supplier") != "supplier1" {
|
||||
t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier"))
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
55
cdrs/cdrs.go
55
cdrs/cdrs.go
@@ -35,36 +35,42 @@ var (
|
||||
medi *mediator.Mediator
|
||||
)
|
||||
|
||||
func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
if fsCdr, err := new(FSCdr).New(body); err == nil {
|
||||
storage.SetCdr(fsCdr)
|
||||
go func() { //FS will not send us hangup_complete until we have send back the answer to CDR, so we need to handle mediation async
|
||||
if cfg.CDRSMediator == "internal" {
|
||||
medi.MediateRawCDR(fsCdr)
|
||||
} else {
|
||||
//TODO: use the connection to mediator
|
||||
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
|
||||
func storeAndMediate(rawCdr utils.RawCDR) error {
|
||||
if err := storage.SetCdr(rawCdr); err != nil {
|
||||
return err
|
||||
}
|
||||
if cfg.CDRSMediator == utils.INTERNAL {
|
||||
go func() {
|
||||
if err := medi.MediateRawCDR(rawCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", err.Error()))
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handler for generic cgr cdr http
|
||||
func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
cgrCdr, err := utils.NewCgrCdrFromHttpReq(r)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error()))
|
||||
}
|
||||
if err := storeAndMediate(cgrCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if cgrCdr, err := utils.NewCgrCdrFromHttpReq(r); err == nil {
|
||||
storage.SetCdr(cgrCdr)
|
||||
if cfg.CDRSMediator == "internal" {
|
||||
errMedi := medi.MediateRawCDR(cgrCdr)
|
||||
if errMedi != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error()))
|
||||
}
|
||||
} else {
|
||||
//TODO: use the connection to mediator
|
||||
}
|
||||
} else {
|
||||
// Handler for fs http
|
||||
func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
fsCdr, err := new(FSCdr).New(body)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error()))
|
||||
}
|
||||
if err := storeAndMediate(fsCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
type CDRS struct{}
|
||||
@@ -80,3 +86,8 @@ func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) {
|
||||
server.RegisterHttpFunc("/cgr", cgrCdrHandler)
|
||||
server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler)
|
||||
}
|
||||
|
||||
// Used to internally process CDR
|
||||
func (cdrs *CDRS) ProcessRawCdr(rawCdr utils.RawCDR) error {
|
||||
return storeAndMediate(rawCdr)
|
||||
}
|
||||
|
||||
@@ -24,8 +24,8 @@ import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"strconv"
|
||||
"time"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -172,7 +172,7 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld
|
||||
rtCdr := new(utils.RatedCDR)
|
||||
rtCdr.MediationRunId = runId
|
||||
rtCdr.Cost = -1.0 // Default for non-rated CDR
|
||||
if rtCdr.AccId = fsCdr.GetAccId(); len(rtCdr.AccId)==0 {
|
||||
if rtCdr.AccId = fsCdr.GetAccId(); len(rtCdr.AccId) == 0 {
|
||||
if fieldsMandatory {
|
||||
return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.ACCID))
|
||||
} else { // Not mandatory, need to generate here CgrId
|
||||
@@ -181,10 +181,10 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld
|
||||
} else { // hasKey, use it to generate cgrid
|
||||
rtCdr.CgrId = utils.FSCgrId(rtCdr.AccId)
|
||||
}
|
||||
if rtCdr.CdrHost = fsCdr.GetCdrHost(); len(rtCdr.CdrHost)==0 && fieldsMandatory {
|
||||
if rtCdr.CdrHost = fsCdr.GetCdrHost(); len(rtCdr.CdrHost) == 0 && fieldsMandatory {
|
||||
return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.CDRHOST))
|
||||
}
|
||||
if rtCdr.CdrSource = fsCdr.GetCdrSource(); len(rtCdr.CdrSource)==0 && fieldsMandatory {
|
||||
if rtCdr.CdrSource = fsCdr.GetCdrSource(); len(rtCdr.CdrSource) == 0 && fieldsMandatory {
|
||||
return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.CDRSOURCE))
|
||||
}
|
||||
if strings.HasPrefix(reqTypeFld, utils.STATIC_VALUE_PREFIX) { // Values starting with prefix are not dynamically populated
|
||||
@@ -232,7 +232,7 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if durStr, hasKey = fsCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX){
|
||||
if durStr, hasKey = fsCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX) {
|
||||
return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, durationFld))
|
||||
} else {
|
||||
if strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX) {
|
||||
|
||||
@@ -21,8 +21,8 @@ package cdrs
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"testing"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -100,28 +100,28 @@ func TestFsCdrAsRatedCdr(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Error loading cdr: %v", err)
|
||||
}
|
||||
rtCdrOut, err := fsCdr.AsRatedCdr("wholesale_run", "^"+utils.RATED, "^*out", "cgr_tenant", "cgr_tor", "cgr_account", "cgr_subject", "cgr_destination",
|
||||
rtCdrOut, err := fsCdr.AsRatedCdr("wholesale_run", "^"+utils.RATED, "^*out", "cgr_tenant", "cgr_tor", "cgr_account", "cgr_subject", "cgr_destination",
|
||||
"answer_epoch", "billsec", []string{"effective_caller_id_number"}, true)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received", err)
|
||||
}
|
||||
expctRatedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f",
|
||||
expctRatedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f",
|
||||
CdrHost: "127.0.0.1", CdrSource: FS_CDR_SOURCE, ReqType: utils.RATED,
|
||||
Direction: "*out", Tenant: "ipbx.itsyscom.com", TOR: "call", Account: "dan", Subject: "dan", Destination: "+4986517174963",
|
||||
AnswerTime: time.Date(2013, 8, 4, 9, 50, 56, 0, time.UTC).Local(), Duration: time.Duration(4)*time.Second,
|
||||
Direction: "*out", Tenant: "ipbx.itsyscom.com", TOR: "call", Account: "dan", Subject: "dan", Destination: "+4986517174963",
|
||||
AnswerTime: time.Date(2013, 8, 4, 9, 50, 56, 0, time.UTC).Local(), Duration: time.Duration(4) * time.Second,
|
||||
ExtraFields: map[string]string{"effective_caller_id_number": "+4986517174960"}, MediationRunId: "wholesale_run", Cost: -1}
|
||||
if !reflect.DeepEqual(rtCdrOut, expctRatedCdr) {
|
||||
t.Errorf("Received: %v, expected: %v", rtCdrOut, expctRatedCdr)
|
||||
}
|
||||
rtCdrOut2, err := fsCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "cgr_destination",
|
||||
rtCdrOut2, err := fsCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "cgr_destination",
|
||||
"^2013-12-07T08:42:26Z", "^12s", []string{"effective_caller_id_number"}, true)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received", err)
|
||||
}
|
||||
expctRatedCdr2 := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1",
|
||||
expctRatedCdr2 := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1",
|
||||
CdrSource: FS_CDR_SOURCE, ReqType: "postpaid",
|
||||
Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "+4986517174963",
|
||||
AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12)*time.Second,
|
||||
Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "+4986517174963",
|
||||
AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12) * time.Second,
|
||||
ExtraFields: map[string]string{"effective_caller_id_number": "+4986517174960"}, MediationRunId: "wholesale_run", Cost: -1}
|
||||
if !reflect.DeepEqual(rtCdrOut2, expctRatedCdr2) {
|
||||
t.Errorf("Received: %v, expected: %v", rtCdrOut2, expctRatedCdr2)
|
||||
|
||||
@@ -43,7 +43,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
INTERNAL = "internal"
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
POSTGRES = "postgres"
|
||||
@@ -58,7 +57,7 @@ var (
|
||||
cfgPath = flag.String("config", "/etc/cgrates/cgrates.cfg", "Configuration file location.")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config")
|
||||
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config")
|
||||
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config")
|
||||
cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config")
|
||||
cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config")
|
||||
mediatorEnabled = flag.Bool("mediator", false, "Enforce starting of the mediator service overwriting config")
|
||||
@@ -71,11 +70,35 @@ var (
|
||||
medi *mediator.Mediator
|
||||
cfg *config.CGRConfig
|
||||
err error
|
||||
|
||||
/*
|
||||
scribeServer history.Scribe
|
||||
cdrServer *cdrs.CDRS
|
||||
sm sessionmanager.SessionManager
|
||||
medi *mediator.Mediator
|
||||
cfg *config.CGRConfig
|
||||
err error
|
||||
*/
|
||||
)
|
||||
|
||||
func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) {
|
||||
func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) {
|
||||
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
if err := accountDb.CacheAccounting(nil, nil); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
close(doneChan)
|
||||
}
|
||||
|
||||
func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, cacheChan, chanDone chan struct{}) {
|
||||
var connector engine.Connector
|
||||
if cfg.MediatorRater == INTERNAL {
|
||||
if cfg.MediatorRater == utils.INTERNAL {
|
||||
<-cacheChan // Cache needs to come up before we are ready
|
||||
connector = responder
|
||||
} else {
|
||||
var client *rpc.Client
|
||||
@@ -91,6 +114,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<Mediator> Could not connect to engine: %v", err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
connector = &engine.RPCClientConnector{Client: client}
|
||||
}
|
||||
@@ -99,11 +123,16 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
close(chanDone)
|
||||
}
|
||||
|
||||
func startCdrc() {
|
||||
cdrc, err := cdrc.NewCdrc(cfg)
|
||||
func startCdrc(cdrsChan chan struct{}) {
|
||||
if cfg.CdrcCdrs == utils.INTERNAL {
|
||||
<-cdrsChan // Wait for CDRServer to come up before start processing
|
||||
}
|
||||
cdrc, err := cdrc.NewCdrc(cfg, cdrServer)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
@@ -115,9 +144,10 @@ func startCdrc() {
|
||||
exitChan <- true // If run stopped, something is bad, stop the application
|
||||
}
|
||||
|
||||
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage) {
|
||||
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
|
||||
var connector engine.Connector
|
||||
if cfg.SMRater == INTERNAL {
|
||||
if cfg.SMRater == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
connector = responder
|
||||
} else {
|
||||
var client *rpc.Client
|
||||
@@ -146,31 +176,40 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
|
||||
}
|
||||
default:
|
||||
engine.Logger.Err(fmt.Sprintf("<SessionManager> Unsupported session manger type: %s!", cfg.SMSwitchType))
|
||||
exitChan <- true
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) {
|
||||
if cfg.CDRSMediator == INTERNAL {
|
||||
for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable
|
||||
time.Sleep(time.Duration(i+1) * time.Second)
|
||||
if medi != nil { // Got our mediator, no need to wait any longer
|
||||
break
|
||||
}
|
||||
}
|
||||
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) {
|
||||
if cfg.CDRSMediator == utils.INTERNAL {
|
||||
<-mediChan // Deadlock if mediator not started
|
||||
if medi == nil {
|
||||
engine.Logger.Crit("<CDRS> Could not connect to mediator, exiting.")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
cs := cdrs.New(cdrDb, medi, cfg)
|
||||
cs.RegisterHanlersToServer(server)
|
||||
cdrServer = cdrs.New(cdrDb, medi, cfg)
|
||||
cdrServer.RegisterHanlersToServer(server)
|
||||
close(doneChan)
|
||||
}
|
||||
|
||||
func startHistoryAgent(scribeServer history.Scribe) {
|
||||
if cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here
|
||||
engine.Logger.Info("Starting History Agent.")
|
||||
func startHistoryServer(chanDone chan struct{}) {
|
||||
if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
server.RpcRegisterName("Scribe", scribeServer)
|
||||
close(chanDone)
|
||||
}
|
||||
|
||||
// chanStartServer will report when server is up, useful for internal requests
|
||||
func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struct{}) {
|
||||
if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
|
||||
<-chanServerStarted // If server is not enabled, will have deadlock here
|
||||
} else { // Connect in iteration since there are chances of concurrency here
|
||||
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
|
||||
//engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Trying to connect, iteration: %d, time %s", i, time.Now()))
|
||||
if scribeServer, err = history.NewProxyScribe(cfg.HistoryServer); err == nil {
|
||||
@@ -180,13 +219,31 @@ func startHistoryAgent(scribeServer history.Scribe) {
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Duration(i+1) * time.Second)
|
||||
time.Sleep(time.Duration(i) * time.Second)
|
||||
}
|
||||
}
|
||||
engine.SetHistoryScribe(scribeServer)
|
||||
return
|
||||
}
|
||||
|
||||
// Starts the rpc server, waiting for the necessary components to finish their tasks
|
||||
func serveRpc(rpcWaitChans []chan struct{}) {
|
||||
for _, chn := range rpcWaitChans {
|
||||
<-chn
|
||||
}
|
||||
// Each of the serve blocks so need to start in their own goroutine
|
||||
go server.ServeJSON(cfg.RPCJSONListen)
|
||||
go server.ServeGOB(cfg.RPCGOBListen)
|
||||
}
|
||||
|
||||
// Starts the http server, waiting for the necessary components to finish their tasks
|
||||
func serveHttp(httpWaitChans []chan struct{}) {
|
||||
for _, chn := range httpWaitChans {
|
||||
<-chn
|
||||
}
|
||||
server.ServeHTTP(cfg.HTTPListen)
|
||||
}
|
||||
|
||||
func checkConfigSanity() error {
|
||||
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
|
||||
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
|
||||
@@ -196,11 +253,11 @@ func checkConfigSanity() error {
|
||||
engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change rater/balancer to disabled)!")
|
||||
return errors.New("Improperly configured balancer")
|
||||
}
|
||||
if cfg.CDRSEnabled && cfg.CDRSMediator == INTERNAL && !cfg.MediatorEnabled {
|
||||
if cfg.CDRSEnabled && cfg.CDRSMediator == utils.INTERNAL && !cfg.MediatorEnabled {
|
||||
engine.Logger.Crit("CDRS cannot connect to mediator, Mediator not enabled in configuration!")
|
||||
return errors.New("Internal Mediator required by CDRS")
|
||||
}
|
||||
if cfg.HistoryServerEnabled && cfg.HistoryServer == INTERNAL && !cfg.HistoryServerEnabled {
|
||||
if cfg.HistoryServerEnabled && cfg.HistoryServer == utils.INTERNAL && !cfg.HistoryServerEnabled {
|
||||
engine.Logger.Crit("The history agent is enabled and internal and history server is disabled!")
|
||||
return errors.New("Improperly configured history service")
|
||||
}
|
||||
@@ -263,14 +320,16 @@ func main() {
|
||||
var logDb engine.LogStorage
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding)
|
||||
ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort,
|
||||
cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer ratingDb.Close()
|
||||
engine.SetRatingStorage(ratingDb)
|
||||
accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding)
|
||||
accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort,
|
||||
cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
@@ -306,7 +365,8 @@ func main() {
|
||||
if cfg.StorDBType == SAME {
|
||||
logDb = ratingDb.(engine.LogStorage)
|
||||
} else {
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding)
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
@@ -327,18 +387,18 @@ func main() {
|
||||
|
||||
stopHandled := false
|
||||
|
||||
// Async starts here
|
||||
|
||||
rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed
|
||||
httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed
|
||||
|
||||
var cacheChan chan struct{}
|
||||
if cfg.RaterEnabled { // Cache rating if rater enabled
|
||||
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
if err := accountDb.CacheAccounting(nil, nil); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
cacheChan = make(chan struct{})
|
||||
rpcWait = append(rpcWait, cacheChan)
|
||||
go cacheData(ratingDb, accountDb, cacheChan)
|
||||
}
|
||||
|
||||
// Async starts here
|
||||
if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled {
|
||||
go registerToBalancer()
|
||||
go stopRaterSignalHandler()
|
||||
@@ -348,14 +408,14 @@ func main() {
|
||||
responder := &engine.Responder{ExitChan: exitChan}
|
||||
apier := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Config: cfg}
|
||||
|
||||
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != INTERNAL {
|
||||
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL {
|
||||
engine.Logger.Info("Registering CGRateS Rater service")
|
||||
server.RpcRegister(responder)
|
||||
server.RpcRegister(apier)
|
||||
}
|
||||
|
||||
if cfg.BalancerEnabled {
|
||||
engine.Logger.Info("Registering CGRateS Balancer service")
|
||||
engine.Logger.Info("Registering CGRateS Balancer service.")
|
||||
go stopBalancerSignalHandler()
|
||||
stopHandled = true
|
||||
responder.Bal = bal
|
||||
@@ -382,49 +442,51 @@ func main() {
|
||||
}()
|
||||
}
|
||||
|
||||
var scribeServer history.Scribe
|
||||
|
||||
var histServChan chan struct{} // Will be initialized only if the server starts
|
||||
if cfg.HistoryServerEnabled {
|
||||
engine.Logger.Info("Registering CGRates History service")
|
||||
if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
server.RpcRegisterName("Scribe", scribeServer)
|
||||
}
|
||||
go startHistoryAgent(scribeServer)
|
||||
|
||||
go server.ServeGOB(cfg.RPCGOBListen)
|
||||
go server.ServeJSON(cfg.RPCJSONListen)
|
||||
|
||||
go startHistoryAgent(scribeServer)
|
||||
|
||||
if cfg.CDRSEnabled {
|
||||
engine.Logger.Info("Registering CGRateS CDR service")
|
||||
go startCDRS(responder, cdrDb)
|
||||
histServChan = make(chan struct{})
|
||||
rpcWait = append(rpcWait, histServChan)
|
||||
go startHistoryServer(histServChan)
|
||||
}
|
||||
|
||||
go server.ServeHTTP(cfg.HTTPListen)
|
||||
if cfg.HistoryAgentEnabled {
|
||||
engine.Logger.Info("Starting CGRateS History Agent.")
|
||||
go startHistoryAgent(scribeServer, histServChan)
|
||||
}
|
||||
|
||||
var medChan chan struct{}
|
||||
if cfg.MediatorEnabled {
|
||||
engine.Logger.Info("Starting CGRateS Mediator service")
|
||||
go startMediator(responder, logDb, cdrDb)
|
||||
engine.Logger.Info("Starting CGRateS Mediator service.")
|
||||
medChan = make(chan struct{})
|
||||
go startMediator(responder, logDb, cdrDb, cacheChan, medChan)
|
||||
}
|
||||
|
||||
var cdrsChan chan struct{}
|
||||
if cfg.CDRSEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDRS service.")
|
||||
cdrsChan = make(chan struct{})
|
||||
httpWait = append(httpWait, cdrsChan)
|
||||
go startCDRS(responder, cdrDb, medChan, cdrsChan)
|
||||
}
|
||||
|
||||
if cfg.SMEnabled {
|
||||
engine.Logger.Info("Starting CGRateS SessionManager service")
|
||||
go startSessionManager(responder, logDb)
|
||||
engine.Logger.Info("Starting CGRateS SessionManager service.")
|
||||
go startSessionManager(responder, logDb, cacheChan)
|
||||
// close all sessions on shutdown
|
||||
go shutdownSessionmanagerSingnalHandler()
|
||||
}
|
||||
|
||||
if cfg.CdrcEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDR client")
|
||||
go startCdrc()
|
||||
engine.Logger.Info("Starting CGRateS CDR client.")
|
||||
go startCdrc(cdrsChan)
|
||||
}
|
||||
|
||||
// Start the servers
|
||||
go serveRpc(rpcWait)
|
||||
go serveHttp(httpWait)
|
||||
|
||||
<-exitChan
|
||||
|
||||
if *pidFile != "" {
|
||||
if err := os.Remove(*pidFile); err != nil {
|
||||
engine.Logger.Warning("Could not remove pid file: " + err.Error())
|
||||
|
||||
@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
@@ -67,8 +66,8 @@ var (
|
||||
stats = flag.Bool("stats", false, "Generates statsistics about given data.")
|
||||
fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb")
|
||||
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
|
||||
historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving")
|
||||
raterAddress = flag.String("rater_address", cgrConfig.MediatorRater, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
|
||||
historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving")
|
||||
raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
|
||||
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
|
||||
)
|
||||
|
||||
@@ -158,7 +157,6 @@ func main() {
|
||||
return
|
||||
} else {
|
||||
engine.SetHistoryScribe(scribeAgent)
|
||||
gob.Register(&engine.Destination{})
|
||||
defer scribeAgent.Client.Close()
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
|
||||
const (
|
||||
DISABLED = "disabled"
|
||||
INTERNAL = "internal"
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
POSTGRES = "postgres"
|
||||
@@ -137,10 +136,10 @@ type CGRConfig struct {
|
||||
HistoryServerEnabled bool // Starts History as server: <true|false>.
|
||||
HistoryDir string // Location on disk where to store history files.
|
||||
HistorySaveInterval time.Duration // The timout duration between history writes
|
||||
MailerServer string // The server to use when sending emails out
|
||||
MailerAuthUser string // Authenticate to email server using this user
|
||||
MailerAuthPass string // Authenticate to email server with this password
|
||||
MailerFromAddr string // From address used when sending emails out
|
||||
MailerServer string // The server to use when sending emails out
|
||||
MailerAuthUser string // Authenticate to email server using this user
|
||||
MailerAuthPass string // Authenticate to email server with this password
|
||||
MailerFromAddr string // From address used when sending emails out
|
||||
}
|
||||
|
||||
func (self *CGRConfig) setDefaults() error {
|
||||
@@ -183,7 +182,7 @@ func (self *CGRConfig) setDefaults() error {
|
||||
self.CdreExtraFields = []string{}
|
||||
self.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv"
|
||||
self.CdrcEnabled = false
|
||||
self.CdrcCdrs = "127.0.0.1:2080"
|
||||
self.CdrcCdrs = utils.INTERNAL
|
||||
self.CdrcCdrsMethod = "http_cgr"
|
||||
self.CdrcRunDelay = time.Duration(0)
|
||||
self.CdrcCdrType = "csv"
|
||||
|
||||
@@ -84,7 +84,7 @@ func TestDefaults(t *testing.T) {
|
||||
eCfg.CdreExtraFields = []string{}
|
||||
eCfg.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv"
|
||||
eCfg.CdrcEnabled = false
|
||||
eCfg.CdrcCdrs = "127.0.0.1:2080"
|
||||
eCfg.CdrcCdrs = utils.INTERNAL
|
||||
eCfg.CdrcCdrsMethod = "http_cgr"
|
||||
eCfg.CdrcRunDelay = time.Duration(0)
|
||||
eCfg.CdrcCdrType = "csv"
|
||||
|
||||
@@ -39,7 +39,7 @@
|
||||
|
||||
[rater]
|
||||
# enabled = false # Enable RaterCDRSExportPath service: <true|false>.
|
||||
# balancer = # Register to Balancer as worker: <""|127.0.0.1:2013>.
|
||||
# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>.
|
||||
|
||||
[scheduler]
|
||||
# enabled = false # Starts Scheduler service: <true|false>.
|
||||
@@ -56,7 +56,7 @@
|
||||
|
||||
[cdrc]
|
||||
# enabled = false # Enable CDR client functionality
|
||||
# cdrs = 127.0.0.1:2080 # Address where to reach CDR server
|
||||
# cdrs = internal # Address where to reach CDR server. <internal|127.0.0.1:2080>
|
||||
# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server <http_cgr>
|
||||
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
# cdr_type = csv # CDR file format <csv|freeswitch_csv>.
|
||||
|
||||
@@ -24,11 +24,13 @@
|
||||
# stordb_user = cgrates # Username to use when connecting to stordb.
|
||||
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
|
||||
# dbdata_encoding = msgpack # The encoding used to store object data in strings: <msgpack|json>
|
||||
# rpc_encoding = json # RPC encoding used on APIs: <gob|json>.
|
||||
# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address
|
||||
# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address
|
||||
# http_listen = 127.0.0.1:2080 # HTTP listening address
|
||||
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
|
||||
# default_tor = 0 # Default Type of Record to consider when missing from requests.
|
||||
# default_tenant = 0 # Default Tenant to consider when missing from requests.
|
||||
# default_subject = 0 # Default rating Subject to consider when missing from requests.
|
||||
# default_tor = call # Default Type of Record to consider when missing from requests.
|
||||
# default_tenant = cgrates.org # Default Tenant to consider when missing from requests.
|
||||
# default_subject = cgrates # Default rating Subject to consider when missing from requests.
|
||||
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
|
||||
# rounding_decimals = 4 # Number of decimals to round float/costs at
|
||||
|
||||
@@ -38,15 +40,13 @@
|
||||
|
||||
[rater]
|
||||
enabled = true # Enable RaterCDRSExportPath service: <true|false>.
|
||||
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
|
||||
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
|
||||
# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>.
|
||||
|
||||
[scheduler]
|
||||
enabled = true # Starts Scheduler service: <true|false>.
|
||||
|
||||
[cdrs]
|
||||
enabled = true # Start the CDR Server service: <true|false>.
|
||||
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
|
||||
# extra_fields = # Extra fields to store in CDRs
|
||||
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
|
||||
|
||||
@@ -57,7 +57,7 @@ export_dir = /tmp # Path where the exported CDRs will be placed
|
||||
|
||||
[cdrc]
|
||||
enabled = true # Enable CDR client functionality
|
||||
# cdrs = 127.0.0.1:2022 # Address where to reach CDR server
|
||||
# cdrs = internal # Address where to reach CDR server. <internal|127.0.0.1:2080>
|
||||
# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server <http_cgr>
|
||||
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
cdr_type = freeswitch_csv # CDR file format <csv>.
|
||||
@@ -78,8 +78,7 @@ extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv
|
||||
|
||||
[mediator]
|
||||
enabled = true # Starts Mediator service: <true|false>.
|
||||
# listen=internal # Mediator's listening interface: <internal>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater: <internal|x.y.z.y:1234>
|
||||
# rater = internal # Address where to reach the Rater: <internal|x.y.z.y:1234>
|
||||
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
|
||||
# run_ids = # Identifiers of each extra mediation to run on CDRs
|
||||
# reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs.
|
||||
@@ -95,7 +94,7 @@ enabled = true # Starts Mediator service: <true|false>.
|
||||
[session_manager]
|
||||
enabled = true # Starts SessionManager service: <true|false>.
|
||||
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
|
||||
# rater = internal # Address where to reach the Rater.
|
||||
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
|
||||
# debit_interval = 5 # Interval to perform debits on.
|
||||
|
||||
@@ -106,11 +105,16 @@ enabled = true # Starts SessionManager service: <true|false>.
|
||||
|
||||
[history_server]
|
||||
enabled = true # Starts History service: <true|false>.
|
||||
# listen = 127.0.0.1:2013 # Listening addres for history server: <internal|x.y.z.y:1234>
|
||||
history_dir = /tmp/cgr_history # Location on disk where to store history files.
|
||||
# save_interval = 1s # Interval to save changed cache into .git archive
|
||||
|
||||
[history_agent]
|
||||
# enabled = false # Starts History as a client: <true|false>.
|
||||
# server = 127.0.0.1:2013 # Address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
# server = internal # Address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
|
||||
[mailer]
|
||||
# server = localhost # The server to use when sending emails out
|
||||
# auth_user = cgrates # Authenticate to email server using this user
|
||||
# auth_passwd = CGRateS.org # Authenticate to email server with this password
|
||||
# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#Tenant,Account,Direction,ActionTimingsTag,ActionTriggersTag
|
||||
#Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag
|
||||
cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS
|
||||
cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS
|
||||
cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS
|
||||
|
||||
|
@@ -24,11 +24,13 @@
|
||||
# stordb_user = cgrates # Username to use when connecting to stordb.
|
||||
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
|
||||
# dbdata_encoding = msgpack # The encoding used to store object data in strings: <msgpack|json>
|
||||
# rpc_encoding = json # RPC encoding used on APIs: <gob|json>.
|
||||
# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address
|
||||
# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address
|
||||
# http_listen = 127.0.0.1:2080 # HTTP listening address
|
||||
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
|
||||
# default_tor = 0 # Default Type of Record to consider when missing from requests.
|
||||
# default_tenant = 0 # Default Tenant to consider when missing from requests.
|
||||
# default_subject = 0 # Default rating Subject to consider when missing from requests.
|
||||
# default_tor = call # Default Type of Record to consider when missing from requests.
|
||||
# default_tenant = cgrates.org # Default Tenant to consider when missing from requests.
|
||||
# default_subject = cgrates # Default rating Subject to consider when missing from requests.
|
||||
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
|
||||
# rounding_decimals = 4 # Number of decimals to round float/costs at
|
||||
|
||||
@@ -38,15 +40,13 @@
|
||||
|
||||
[rater]
|
||||
enabled = true # Enable RaterCDRSExportPath service: <true|false>.
|
||||
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
|
||||
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
|
||||
# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>.
|
||||
|
||||
[scheduler]
|
||||
enabled = true # Starts Scheduler service: <true|false>.
|
||||
|
||||
[cdrs]
|
||||
enabled = true # Start the CDR Server service: <true|false>.
|
||||
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
|
||||
# extra_fields = # Extra fields to store in CDRs
|
||||
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
|
||||
|
||||
@@ -57,29 +57,28 @@ export_dir = /tmp # Path where the exported CDRs will be placed
|
||||
|
||||
[cdrc]
|
||||
# enabled = false # Enable CDR client functionality
|
||||
# cdrs = 127.0.0.1:2022 # Address where to reach CDR server
|
||||
# cdrs = internal # Address where to reach CDR server. <internal|127.0.0.1:2080>
|
||||
# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server <http_cgr>
|
||||
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
# cdr_type = freeswitch_csv # CDR file format <csv>.
|
||||
# cdr_type = csv # CDR file format <csv|freeswitch_csv>.
|
||||
# cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored.
|
||||
# cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved.
|
||||
# cdr_out_dir = /var/log/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved.
|
||||
# cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database.
|
||||
# accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs.
|
||||
# reqtype_field = 16 # Request type field identifier. Use index number in case of .csv cdrs.
|
||||
# direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs.
|
||||
# tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs.
|
||||
# tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs.
|
||||
# account_field = 1 # Account field identifier. Use index numbers in case of .csv cdrs.
|
||||
# subject_field = 1 # Subject field identifier. Use index numbers in case of .csv CDRs.
|
||||
# destination_field = 2 # Destination field identifier. Use index numbers in case of .csv cdrs.
|
||||
# answer_time_field = 5 # Answer time field identifier. Use index numbers in case of .csv cdrs.
|
||||
# duration_field = 8 # Duration field identifier. Use index numbers in case of .csv cdrs.
|
||||
# extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv, format: <label_extrafield1>:<index_extrafield_1>
|
||||
# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs.
|
||||
# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs.
|
||||
# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs.
|
||||
# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs.
|
||||
# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs.
|
||||
# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs.
|
||||
# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs.
|
||||
# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs.
|
||||
# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs.
|
||||
# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs.
|
||||
# extra_fields = # Extra fields identifiers. For .csv, format: <label_extrafield_1>:<index_extrafield_1>[...,<label_extrafield_n>:<index_extrafield_n>]
|
||||
|
||||
[mediator]
|
||||
enabled = true # Starts Mediator service: <true|false>.
|
||||
# listen=internal # Mediator's listening interface: <internal>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater: <internal|x.y.z.y:1234>
|
||||
# rater = internal # Address where to reach the Rater: <internal|x.y.z.y:1234>
|
||||
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
|
||||
# run_ids = # Identifiers of each extra mediation to run on CDRs
|
||||
# reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs.
|
||||
@@ -95,7 +94,7 @@ enabled = true # Starts Mediator service: <true|false>.
|
||||
[session_manager]
|
||||
enabled = true # Starts SessionManager service: <true|false>.
|
||||
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
|
||||
# rater = internal # Address where to reach the Rater.
|
||||
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
|
||||
# debit_interval = 5 # Interval to perform debits on.
|
||||
|
||||
@@ -106,11 +105,16 @@ enabled = true # Starts SessionManager service: <true|false>.
|
||||
|
||||
[history_server]
|
||||
enabled = true # Starts History service: <true|false>.
|
||||
# listen = 127.0.0.1:2013 # Listening addres for history server: <internal|x.y.z.y:1234>
|
||||
history_dir = /tmp/cgr_history # Location on disk where to store history files.
|
||||
# save_interval = 1s # Interval to save changed cache into .git archive
|
||||
|
||||
[history_agent]
|
||||
# enabled = false # Starts History as a client: <true|false>.
|
||||
# server = 127.0.0.1:2013 # Address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
# server = internal # Address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
|
||||
[mailer]
|
||||
# server = localhost # The server to use when sending emails out
|
||||
# auth_user = cgrates # Authenticate to email server using this user
|
||||
# auth_passwd = CGRateS.org # Authenticate to email server with this password
|
||||
# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#Tenant,Account,Direction,ActionTimingsTag,ActionTriggersTag
|
||||
#Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag
|
||||
cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS
|
||||
cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS
|
||||
cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS
|
||||
|
||||
|
@@ -21,15 +21,15 @@ package engine
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"net/http"
|
||||
"net/smtp"
|
||||
"sort"
|
||||
"time"
|
||||
"strings"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -101,7 +101,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
}
|
||||
|
||||
func logAction(ub *UserBalance, a *Action) (err error) {
|
||||
ubMarshal,_ := json.Marshal(ub)
|
||||
ubMarshal, _ := json.Marshal(ub)
|
||||
Logger.Info(fmt.Sprintf("Threshold reached, balance: %s", ubMarshal))
|
||||
return
|
||||
}
|
||||
@@ -214,7 +214,7 @@ func callUrlAsync(ub *UserBalance, a *Action) error {
|
||||
}
|
||||
time.Sleep(time.Duration(i) * time.Minute)
|
||||
}
|
||||
|
||||
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
@@ -238,8 +238,8 @@ func mailAsync(ub *UserBalance, a *Action) error {
|
||||
}
|
||||
toAddrStr += addr
|
||||
}
|
||||
message := []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), ubJson))
|
||||
auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer,":")[0]) // We only need host part, so ignore port
|
||||
message := []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), ubJson))
|
||||
auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port
|
||||
go func() {
|
||||
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
|
||||
if err := smtp.SendMail(cgrCfg.MailerServer, auth, cgrCfg.MailerFromAddr, toAddrs, message); err == nil {
|
||||
@@ -250,11 +250,10 @@ func mailAsync(ub *UserBalance, a *Action) error {
|
||||
}
|
||||
time.Sleep(time.Duration(i) * time.Minute)
|
||||
}
|
||||
}()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Structure to store actions according to weight
|
||||
type Actions []*Action
|
||||
|
||||
|
||||
@@ -20,11 +20,11 @@ package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -299,12 +299,12 @@ func (at *ActionTiming) String_DISABLED() string {
|
||||
// Helper to remove ActionTiming members based on specific filters, empty data means no always match
|
||||
func RemActionTiming(ats ActionPlan, actionTimingId, balanceId string) ActionPlan {
|
||||
for idx, at := range ats {
|
||||
if len(actionTimingId)!=0 && at.Id!=actionTimingId { // No Match for ActionTimingId, no need to move further
|
||||
if len(actionTimingId) != 0 && at.Id != actionTimingId { // No Match for ActionTimingId, no need to move further
|
||||
continue
|
||||
}
|
||||
if len(balanceId) == 0 { // No account defined, considered match for complete removal
|
||||
if len(ats) == 1 { // Removing last item, by init empty
|
||||
return make([]*ActionTiming,0)
|
||||
return make([]*ActionTiming, 0)
|
||||
}
|
||||
ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1]
|
||||
continue
|
||||
@@ -313,9 +313,9 @@ func RemActionTiming(ats ActionPlan, actionTimingId, balanceId string) ActionPla
|
||||
if blncId == balanceId {
|
||||
if len(at.UserBalanceIds) == 1 { // Only one balance, remove complete at
|
||||
if len(ats) == 1 { // Removing last item, by init empty
|
||||
return make([]*ActionTiming,0)
|
||||
return make([]*ActionTiming, 0)
|
||||
}
|
||||
ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1]
|
||||
ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1]
|
||||
} else {
|
||||
at.UserBalanceIds[iBlnc], at.UserBalanceIds = at.UserBalanceIds[len(at.UserBalanceIds)-1], at.UserBalanceIds[:len(at.UserBalanceIds)-1]
|
||||
}
|
||||
|
||||
@@ -427,21 +427,21 @@ func TestActionTimingsRemoveMember(t *testing.T) {
|
||||
Tag: "test",
|
||||
UserBalanceIds: []string{"one", "two", "three"},
|
||||
ActionsId: "TEST_ACTIONS",
|
||||
}
|
||||
}
|
||||
at2 := &ActionTiming{
|
||||
Id: "some uuid22",
|
||||
Tag: "test2",
|
||||
UserBalanceIds: []string{"three", "four"},
|
||||
ActionsId: "TEST_ACTIONS2",
|
||||
}
|
||||
}
|
||||
ats := ActionPlan{at1, at2}
|
||||
if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 {
|
||||
if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 {
|
||||
t.Error("Expecting fewer balance ids", outAts[1].UserBalanceIds)
|
||||
}
|
||||
if ats = RemActionTiming(ats, "", "three"); len(ats) != 1 {
|
||||
t.Error("Expecting fewer actionTimings", ats)
|
||||
}
|
||||
if ats = RemActionTiming(ats, "some_uuid22", "");len(ats) != 1 {
|
||||
if ats = RemActionTiming(ats, "some_uuid22", ""); len(ats) != 1 {
|
||||
t.Error("Expecting fewer actionTimings members", ats)
|
||||
}
|
||||
ats2 := ActionPlan{at1, at2}
|
||||
@@ -450,8 +450,6 @@ func TestActionTimingsRemoveMember(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
func TestActionTriggerMatchNil(t *testing.T) {
|
||||
at := &ActionTrigger{
|
||||
Direction: OUTBOUND,
|
||||
|
||||
@@ -18,7 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package engine
|
||||
|
||||
import "strings"
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/history"
|
||||
)
|
||||
|
||||
/*
|
||||
Structure that gathers multiple destination prefixes under a common id.
|
||||
@@ -53,3 +58,13 @@ func (d *Destination) String() (result string) {
|
||||
func (d *Destination) AddPrefix(pfx string) {
|
||||
d.Prefixes = append(d.Prefixes, pfx)
|
||||
}
|
||||
|
||||
// history record method
|
||||
func (d *Destination) GetHistoryRecord() history.Record {
|
||||
js, _ := json.Marshal(d)
|
||||
return history.Record{
|
||||
Id: d.Id,
|
||||
Filename: history.DESTINATIONS_FN,
|
||||
Payload: js,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,24 +27,26 @@ import (
|
||||
|
||||
func TestHistoryRatinPlans(t *testing.T) {
|
||||
scribe := historyScribe.(*history.MockScribe)
|
||||
if !strings.Contains(scribe.RpBuf.String(), `{"Key":"*out:vdf:0:minu","Object":{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}}`) {
|
||||
t.Error("Error in destination history content:", scribe.RpBuf.String())
|
||||
buf := scribe.BufMap[history.RATING_PROFILES_FN]
|
||||
if !strings.Contains(buf.String(), `{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}`) {
|
||||
t.Error("Error in destination history content:", buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistoryDestinations(t *testing.T) {
|
||||
scribe := historyScribe.(*history.MockScribe)
|
||||
expected := `[{"Key":"ALL","Object":{"Id":"ALL","Prefixes":["49","41","43"]}}
|
||||
{"Key":"GERMANY","Object":{"Id":"GERMANY","Prefixes":["49"]}}
|
||||
{"Key":"GERMANY_O2","Object":{"Id":"GERMANY_O2","Prefixes":["41"]}}
|
||||
{"Key":"GERMANY_PREMIUM","Object":{"Id":"GERMANY_PREMIUM","Prefixes":["43"]}}
|
||||
{"Key":"NAT","Object":{"Id":"NAT","Prefixes":["0256","0257","0723"]}}
|
||||
{"Key":"PSTN_70","Object":{"Id":"PSTN_70","Prefixes":["+4970"]}}
|
||||
{"Key":"PSTN_71","Object":{"Id":"PSTN_71","Prefixes":["+4971"]}}
|
||||
{"Key":"PSTN_72","Object":{"Id":"PSTN_72","Prefixes":["+4972"]}}
|
||||
{"Key":"RET","Object":{"Id":"RET","Prefixes":["0723","0724"]}}
|
||||
{"Key":"nat","Object":{"Id":"nat","Prefixes":["0257","0256","0723"]}}]`
|
||||
if scribe.DestBuf.String() != expected {
|
||||
t.Error("Error in destination history content:", scribe.DestBuf.String())
|
||||
buf := scribe.BufMap[history.DESTINATIONS_FN]
|
||||
expected := `[{"Id":"ALL","Prefixes":["49","41","43"]},
|
||||
{"Id":"GERMANY","Prefixes":["49"]},
|
||||
{"Id":"GERMANY_O2","Prefixes":["41"]},
|
||||
{"Id":"GERMANY_PREMIUM","Prefixes":["43"]},
|
||||
{"Id":"NAT","Prefixes":["0256","0257","0723"]},
|
||||
{"Id":"PSTN_70","Prefixes":["+4970"]},
|
||||
{"Id":"PSTN_71","Prefixes":["+4971"]},
|
||||
{"Id":"PSTN_72","Prefixes":["+4972"]},
|
||||
{"Id":"RET","Prefixes":["0723","0724"]},
|
||||
{"Id":"nat","Prefixes":["0257","0256","0723"]}]`
|
||||
if buf.String() != expected {
|
||||
t.Error("Error in destination history content:", buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/cgrates/cgrates/history"
|
||||
)
|
||||
|
||||
/*
|
||||
The struture that is saved to storage.
|
||||
*/
|
||||
@@ -97,3 +103,13 @@ func (rp *RatingPlan) AddRateInterval(dId string, ris ...*RateInterval) {
|
||||
func (rp *RatingPlan) Equal(o *RatingPlan) bool {
|
||||
return rp.Id == o.Id
|
||||
}
|
||||
|
||||
// history record method
|
||||
func (rp *RatingPlan) GetHistoryRecord() history.Record {
|
||||
js, _ := json.Marshal(rp)
|
||||
return history.Record{
|
||||
Id: rp.Id,
|
||||
Filename: history.RATING_PLANS_FN,
|
||||
Payload: js,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,12 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -146,3 +148,13 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error)
|
||||
|
||||
return errors.New("not found")
|
||||
}
|
||||
|
||||
// history record method
|
||||
func (rpf *RatingProfile) GetHistoryRecord() history.Record {
|
||||
js, _ := json.Marshal(rpf)
|
||||
return history.Record{
|
||||
Id: rpf.Id,
|
||||
Filename: history.RATING_PROFILES_FN,
|
||||
Payload: js,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -139,7 +138,8 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
|
||||
result, err := ms.ms.Marshal(rp)
|
||||
ms.dict[RATING_PLAN_PREFIX+rp.Id] = result
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{Key: RATING_PLAN_PREFIX + rp.Id, Object: rp}, &response)
|
||||
|
||||
go historyScribe.Record(rp.GetHistoryRecord(), &response)
|
||||
cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
|
||||
return
|
||||
}
|
||||
@@ -167,7 +167,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
|
||||
result, err := ms.ms.Marshal(rpf)
|
||||
ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{Key: RATING_PROFILE_PREFIX + rpf.Id, Object: rpf}, &response)
|
||||
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
|
||||
cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
|
||||
return
|
||||
}
|
||||
@@ -196,7 +196,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
|
||||
result, err := ms.ms.Marshal(dest)
|
||||
ms.dict[DESTINATION_PREFIX+dest.Id] = result
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{Key: DESTINATION_PREFIX + dest.Id, Object: dest}, &response)
|
||||
go historyScribe.Record(dest.GetHistoryRecord(), &response)
|
||||
cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"labix.org/v2/mgo"
|
||||
"labix.org/v2/mgo/bson"
|
||||
)
|
||||
@@ -138,7 +137,7 @@ func (ms *MongoStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) {
|
||||
func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error {
|
||||
if historyScribe != nil {
|
||||
response := 0
|
||||
historyScribe.Record(&history.Record{Key: RATING_PLAN_PREFIX + rp.Id, Object: rp}, &response)
|
||||
historyScribe.Record(rp.GetHistoryRecord(), &response)
|
||||
}
|
||||
return ms.db.C("ratingplans").Insert(rp)
|
||||
}
|
||||
@@ -152,7 +151,7 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err
|
||||
func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error {
|
||||
if historyScribe != nil {
|
||||
response := 0
|
||||
historyScribe.Record(&history.Record{Key: RATING_PROFILE_PREFIX + rp.Id, Object: rp}, &response)
|
||||
historyScribe.Record(rp.GetHistoryRecord(), &response)
|
||||
}
|
||||
return ms.db.C("ratingprofiles").Insert(rp)
|
||||
}
|
||||
@@ -169,7 +168,7 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err
|
||||
func (ms *MongoStorage) SetDestination(dest *Destination) error {
|
||||
if historyScribe != nil {
|
||||
response := 0
|
||||
historyScribe.Record(&history.Record{Key: DESTINATION_PREFIX + dest.Id, Object: dest}, &response)
|
||||
historyScribe.Record(dest.GetHistoryRecord(), &response)
|
||||
}
|
||||
return ms.db.C("destinations").Insert(dest)
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/hoisie/redis"
|
||||
|
||||
@@ -214,7 +213,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) {
|
||||
err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes())
|
||||
if err == nil && historyScribe != nil {
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{Key: RATING_PLAN_PREFIX + rp.Id, Object: rp}, &response)
|
||||
go historyScribe.Record(rp.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
|
||||
return
|
||||
@@ -242,7 +241,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
|
||||
err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result)
|
||||
if err == nil && historyScribe != nil {
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{Key: RATING_PROFILE_PREFIX + rpf.Id, Object: rpf}, &response)
|
||||
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
|
||||
return
|
||||
@@ -291,7 +290,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
|
||||
err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes())
|
||||
if err == nil && historyScribe != nil {
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{Key: DESTINATION_PREFIX + dest.Id, Object: dest}, &response)
|
||||
go historyScribe.Record(dest.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
|
||||
return
|
||||
|
||||
@@ -1129,7 +1129,7 @@ func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) (map
|
||||
Tenant: tenant,
|
||||
Account: account,
|
||||
Direction: direction,
|
||||
ActionPlanId: action_timings_tag,
|
||||
ActionPlanId: action_timings_tag,
|
||||
ActionTriggersId: action_triggers_tag,
|
||||
}
|
||||
aa[aacts.KeyId()] = aacts
|
||||
|
||||
@@ -47,7 +47,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{
|
||||
utils.RATING_PLANS_CSV: (*TPCSVImporter).importRatingPlans,
|
||||
utils.RATING_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles,
|
||||
utils.ACTIONS_CSV: (*TPCSVImporter).importActions,
|
||||
utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings,
|
||||
utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings,
|
||||
utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers,
|
||||
utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions,
|
||||
}
|
||||
|
||||
@@ -22,31 +22,22 @@ import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
DESTINATIONS_FILE = "destinations.json"
|
||||
RATING_PLANS_FILE = "rating_plans.json"
|
||||
RATING_PROFILES_FILE = "rating_profiles.json"
|
||||
)
|
||||
|
||||
type FileScribe struct {
|
||||
mu sync.Mutex
|
||||
fileRoot string
|
||||
gitCommand string
|
||||
destinations records
|
||||
ratingPlans records
|
||||
ratingProfiles records
|
||||
loopChecker chan int
|
||||
waitingFile string
|
||||
savePeriod time.Duration
|
||||
mu sync.Mutex
|
||||
fileRoot string
|
||||
gitCommand string
|
||||
loopChecker chan int
|
||||
waitingFile string
|
||||
savePeriod time.Duration
|
||||
}
|
||||
|
||||
func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, error) {
|
||||
@@ -58,32 +49,19 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er
|
||||
s := &FileScribe{fileRoot: fileRoot, gitCommand: gitCommand, savePeriod: saveInterval}
|
||||
s.loopChecker = make(chan int)
|
||||
s.gitInit()
|
||||
if err := s.load(DESTINATIONS_FILE); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.load(RATING_PLANS_FILE); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.load(RATING_PROFILES_FILE); err != nil {
|
||||
return nil, err
|
||||
|
||||
for _, fn := range []string{DESTINATIONS_FN, RATING_PLANS_FN, RATING_PROFILES_FN} {
|
||||
if err := s.load(fn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *FileScribe) Record(rec *Record, out *int) error {
|
||||
func (s *FileScribe) Record(rec Record, out *int) error {
|
||||
s.mu.Lock()
|
||||
var fileToSave string
|
||||
switch {
|
||||
case strings.HasPrefix(rec.Key, DESTINATION_PREFIX):
|
||||
s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object})
|
||||
fileToSave = DESTINATIONS_FILE
|
||||
case strings.HasPrefix(rec.Key, RATING_PLAN_PREFIX):
|
||||
s.ratingPlans = s.ratingPlans.SetOrAdd(&Record{rec.Key[len(RATING_PLAN_PREFIX):], rec.Object})
|
||||
fileToSave = RATING_PLANS_FILE
|
||||
case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX):
|
||||
s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object})
|
||||
fileToSave = RATING_PROFILES_FILE
|
||||
}
|
||||
fileToSave := rec.Filename
|
||||
recordsMap[fileToSave] = recordsMap[fileToSave].SetOrAdd(&rec)
|
||||
|
||||
// flood protection for save method (do not save on every loop iteration)
|
||||
if s.waitingFile == fileToSave {
|
||||
@@ -126,21 +104,14 @@ func (s *FileScribe) gitInit() error {
|
||||
if out, err := cmd.Output(); err != nil {
|
||||
return errors.New(string(out) + " " + err.Error())
|
||||
}
|
||||
if f, err := os.Create(filepath.Join(s.fileRoot, DESTINATIONS_FILE)); err != nil {
|
||||
return errors.New("<History> Error writing destinations file: " + err.Error())
|
||||
} else {
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PLANS_FILE)); err != nil {
|
||||
return errors.New("<History> Error writing rating plans file: " + err.Error())
|
||||
} else {
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PROFILES_FILE)); err != nil {
|
||||
return errors.New("<History> Error writing rating profiles file: " + err.Error())
|
||||
} else {
|
||||
f.Close()
|
||||
for fn, _ := range recordsMap {
|
||||
if f, err := os.Create(filepath.Join(s.fileRoot, fn)); err != nil {
|
||||
return fmt.Errorf("<History> Error writing %s file: %s", fn, err.Error())
|
||||
} else {
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
|
||||
cmd = exec.Command(s.gitCommand, "add", ".")
|
||||
cmd.Dir = s.fileRoot
|
||||
if out, err := cmd.Output(); err != nil {
|
||||
@@ -169,23 +140,11 @@ func (s *FileScribe) load(filename string) error {
|
||||
defer f.Close()
|
||||
d := json.NewDecoder(f)
|
||||
|
||||
switch filename {
|
||||
case DESTINATIONS_FILE:
|
||||
if err := d.Decode(&s.destinations); err != nil && err != io.EOF {
|
||||
return errors.New("<History> Error loading destinations: " + err.Error())
|
||||
}
|
||||
s.destinations.Sort()
|
||||
case RATING_PLANS_FILE:
|
||||
if err := d.Decode(&s.ratingPlans); err != nil && err != io.EOF {
|
||||
return errors.New("<History> Error loading rating plans: " + err.Error())
|
||||
}
|
||||
s.ratingPlans.Sort()
|
||||
case RATING_PROFILES_FILE:
|
||||
if err := d.Decode(&s.ratingProfiles); err != nil && err != io.EOF {
|
||||
return errors.New("<History> Error loading rating profiles: " + err.Error())
|
||||
}
|
||||
s.ratingProfiles.Sort()
|
||||
records := recordsMap[filename]
|
||||
if err := d.Decode(&records); err != nil && err != io.EOF {
|
||||
return fmt.Errorf("<History> Error loading %s: %s", filename, err.Error())
|
||||
}
|
||||
records.Sort()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -198,38 +157,11 @@ func (s *FileScribe) save(filename string) error {
|
||||
}
|
||||
|
||||
b := bufio.NewWriter(f)
|
||||
switch filename {
|
||||
case DESTINATIONS_FILE:
|
||||
if err := s.format(b, s.destinations); err != nil {
|
||||
return err
|
||||
}
|
||||
case RATING_PLANS_FILE:
|
||||
if err := s.format(b, s.ratingPlans); err != nil {
|
||||
return err
|
||||
}
|
||||
case RATING_PROFILES_FILE:
|
||||
if err := s.format(b, s.ratingProfiles); err != nil {
|
||||
return err
|
||||
}
|
||||
records := recordsMap[filename]
|
||||
if err := format(b, records); err != nil {
|
||||
return err
|
||||
}
|
||||
b.Flush()
|
||||
f.Close()
|
||||
return s.gitCommit()
|
||||
}
|
||||
|
||||
func (s *FileScribe) format(b io.Writer, recs records) error {
|
||||
recs.Sort()
|
||||
b.Write([]byte("["))
|
||||
for i, r := range recs {
|
||||
src, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.Write(src)
|
||||
if i < len(recs)-1 {
|
||||
b.Write([]byte(",\n"))
|
||||
}
|
||||
}
|
||||
b.Write([]byte("]"))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -21,85 +21,38 @@ package history
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type MockScribe struct {
|
||||
mu sync.Mutex
|
||||
destinations records
|
||||
ratingPlans records
|
||||
ratingProfiles records
|
||||
DestBuf bytes.Buffer
|
||||
RplBuf bytes.Buffer
|
||||
RpBuf bytes.Buffer
|
||||
mu sync.Mutex
|
||||
BufMap map[string]*bytes.Buffer
|
||||
}
|
||||
|
||||
func NewMockScribe() (*MockScribe, error) {
|
||||
return &MockScribe{}, nil
|
||||
return &MockScribe{BufMap: map[string]*bytes.Buffer{
|
||||
DESTINATIONS_FN: bytes.NewBuffer(nil),
|
||||
RATING_PLANS_FN: bytes.NewBuffer(nil),
|
||||
RATING_PROFILES_FN: bytes.NewBuffer(nil),
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (s *MockScribe) Record(rec *Record, out *int) error {
|
||||
switch {
|
||||
case strings.HasPrefix(rec.Key, DESTINATION_PREFIX):
|
||||
s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object})
|
||||
s.save(DESTINATIONS_FILE)
|
||||
case strings.HasPrefix(rec.Key, RATING_PLAN_PREFIX):
|
||||
s.ratingPlans = s.ratingPlans.SetOrAdd(&Record{rec.Key[len(RATING_PLAN_PREFIX):], rec.Object})
|
||||
s.save(RATING_PLANS_FILE)
|
||||
case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX):
|
||||
s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object})
|
||||
s.save(RATING_PROFILES_FILE)
|
||||
}
|
||||
*out = 0
|
||||
func (s *MockScribe) Record(rec Record, out *int) error {
|
||||
fn := rec.Filename
|
||||
recordsMap[fn] = recordsMap[fn].SetOrAdd(&rec)
|
||||
s.save(fn)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MockScribe) save(filename string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
switch filename {
|
||||
case DESTINATIONS_FILE:
|
||||
s.DestBuf.Reset()
|
||||
b := bufio.NewWriter(&s.DestBuf)
|
||||
defer b.Flush()
|
||||
if err := s.format(b, s.destinations); err != nil {
|
||||
return err
|
||||
}
|
||||
case RATING_PLANS_FILE:
|
||||
s.RplBuf.Reset()
|
||||
b := bufio.NewWriter(&s.RplBuf)
|
||||
defer b.Flush()
|
||||
if err := s.format(b, s.ratingPlans); err != nil {
|
||||
return err
|
||||
}
|
||||
case RATING_PROFILES_FILE:
|
||||
s.RpBuf.Reset()
|
||||
b := bufio.NewWriter(&s.RpBuf)
|
||||
defer b.Flush()
|
||||
if err := s.format(b, s.ratingProfiles); err != nil {
|
||||
return err
|
||||
}
|
||||
records := recordsMap[filename]
|
||||
s.BufMap[filename].Reset()
|
||||
b := bufio.NewWriter(s.BufMap[filename])
|
||||
defer b.Flush()
|
||||
if err := format(b, records); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MockScribe) format(b io.Writer, recs records) error {
|
||||
recs.Sort()
|
||||
b.Write([]byte("["))
|
||||
for i, r := range recs {
|
||||
src, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.Write(src)
|
||||
if i < len(recs)-1 {
|
||||
b.Write([]byte("\n"))
|
||||
}
|
||||
}
|
||||
b.Write([]byte("]"))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -20,11 +20,6 @@ package history
|
||||
|
||||
import "net/rpc"
|
||||
|
||||
const (
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
)
|
||||
|
||||
type ProxyScribe struct {
|
||||
Client *rpc.Client
|
||||
}
|
||||
@@ -38,6 +33,6 @@ func NewProxyScribe(addr string) (*ProxyScribe, error) {
|
||||
return &ProxyScribe{Client: client}, nil
|
||||
}
|
||||
|
||||
func (ps *ProxyScribe) Record(rec *Record, out *int) error {
|
||||
func (ps *ProxyScribe) Record(rec Record, out *int) error {
|
||||
return ps.Client.Call("Scribe.Record", rec, out)
|
||||
}
|
||||
|
||||
@@ -19,26 +19,34 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package history
|
||||
|
||||
import (
|
||||
"io"
|
||||
"reflect"
|
||||
"sort"
|
||||
)
|
||||
|
||||
const (
|
||||
RATING_PLAN_PREFIX = "rpl_"
|
||||
RATING_PROFILE_PREFIX = "rpf_"
|
||||
DESTINATION_PREFIX = "dst_"
|
||||
DESTINATIONS_FN = "destinations.json"
|
||||
RATING_PLANS_FN = "rating_plans.json"
|
||||
RATING_PROFILES_FN = "rating_profiles.json"
|
||||
)
|
||||
|
||||
type Scribe interface {
|
||||
Record(record *Record, out *int) error
|
||||
Record(Record, *int) error
|
||||
}
|
||||
|
||||
type Record struct {
|
||||
Key string
|
||||
Object interface{}
|
||||
Id string
|
||||
Filename string
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
type records []*Record
|
||||
|
||||
var (
|
||||
recordsMap = make(map[string]records)
|
||||
filenameMap = make(map[reflect.Type]string)
|
||||
)
|
||||
|
||||
func (rs records) Len() int {
|
||||
return len(rs)
|
||||
}
|
||||
@@ -48,7 +56,7 @@ func (rs records) Swap(i, j int) {
|
||||
}
|
||||
|
||||
func (rs records) Less(i, j int) bool {
|
||||
return rs[i].Key < rs[j].Key
|
||||
return rs[i].Id < rs[j].Id
|
||||
}
|
||||
|
||||
func (rs records) Sort() {
|
||||
@@ -58,9 +66,9 @@ func (rs records) Sort() {
|
||||
func (rs records) SetOrAdd(rec *Record) records {
|
||||
//rs.Sort()
|
||||
n := len(rs)
|
||||
i := sort.Search(n, func(i int) bool { return rs[i].Key >= rec.Key })
|
||||
if i < n && rs[i].Key == rec.Key {
|
||||
rs[i].Object = rec.Object
|
||||
i := sort.Search(n, func(i int) bool { return rs[i].Id >= rec.Id })
|
||||
if i < n && rs[i].Id == rec.Id {
|
||||
rs[i] = rec
|
||||
} else {
|
||||
// i is the index where it would be inserted.
|
||||
rs = append(rs, nil)
|
||||
@@ -70,17 +78,15 @@ func (rs records) SetOrAdd(rec *Record) records {
|
||||
return rs
|
||||
}
|
||||
|
||||
func (rs records) SetOrAddOld(rec *Record) records {
|
||||
found := false
|
||||
for _, r := range rs {
|
||||
if r.Key == rec.Key {
|
||||
found = true
|
||||
r.Object = rec.Object
|
||||
return rs
|
||||
func format(b io.Writer, recs records) error {
|
||||
recs.Sort()
|
||||
b.Write([]byte("["))
|
||||
for i, r := range recs {
|
||||
b.Write(r.Payload)
|
||||
if i < len(recs)-1 {
|
||||
b.Write([]byte(",\n"))
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
rs = append(rs, rec)
|
||||
}
|
||||
return rs
|
||||
b.Write([]byte("]"))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,17 +24,19 @@ import (
|
||||
)
|
||||
|
||||
func TestHistorySet(t *testing.T) {
|
||||
rs := records{&Record{"first", "test"}}
|
||||
rs.SetOrAdd(&Record{"first", "new value"})
|
||||
if len(rs) != 1 || rs[0].Object != "new value" {
|
||||
rs := records{&Record{Id: "first"}}
|
||||
second := &Record{Id: "first"}
|
||||
rs.SetOrAdd(second)
|
||||
if len(rs) != 1 || rs[0] != second {
|
||||
t.Error("error setting new value: ", rs[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistoryAdd(t *testing.T) {
|
||||
rs := records{&Record{"first", "test"}}
|
||||
rs = rs.SetOrAdd(&Record{"second", "new value"})
|
||||
if len(rs) != 2 || rs[1].Object != "new value" {
|
||||
rs := records{&Record{Id: "first"}}
|
||||
second := &Record{Id: "second"}
|
||||
rs = rs.SetOrAdd(second)
|
||||
if len(rs) != 2 || rs[1] != second {
|
||||
t.Error("error setting new value: ", rs)
|
||||
}
|
||||
}
|
||||
@@ -42,19 +44,9 @@ func TestHistoryAdd(t *testing.T) {
|
||||
func BenchmarkSetOrAdd(b *testing.B) {
|
||||
var rs records
|
||||
for i := 0; i < 1000; i++ {
|
||||
rs = rs.SetOrAdd(&Record{strconv.Itoa(i), strconv.Itoa(i)})
|
||||
rs = rs.SetOrAdd(&Record{Id: strconv.Itoa(i)})
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
rs.SetOrAdd(&Record{"400", "test"})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSetOrAddOld(b *testing.B) {
|
||||
var rs records
|
||||
for i := 0; i < 1000; i++ {
|
||||
rs = rs.SetOrAddOld(&Record{strconv.Itoa(i), strconv.Itoa(i)})
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
rs.SetOrAddOld(&Record{"400", "test"})
|
||||
rs.SetOrAdd(&Record{Id: "400"})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,7 +133,6 @@ func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
// Sends the transfer command to unpark the call to freeswitch
|
||||
func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) {
|
||||
err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
|
||||
@@ -175,7 +174,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
|
||||
Destination: ev.GetDestination(),
|
||||
TimeStart: startTime,
|
||||
TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
|
||||
}
|
||||
}
|
||||
var remainingDurationFloat float64
|
||||
err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat)
|
||||
if err != nil {
|
||||
|
||||
@@ -221,9 +221,9 @@ type TPAction struct {
|
||||
}
|
||||
|
||||
type TPActionPlan struct {
|
||||
TPid string // Tariff plan id
|
||||
Id string // ActionPlan id
|
||||
ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group
|
||||
TPid string // Tariff plan id
|
||||
Id string // ActionPlan id
|
||||
ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group
|
||||
}
|
||||
|
||||
type TPActionTiming struct {
|
||||
@@ -266,7 +266,7 @@ type TPAccountActions struct {
|
||||
Tenant string // Tenant's Id
|
||||
Account string // Account name
|
||||
Direction string // Traffic direction
|
||||
ActionPlanId string // Id of ActionPlan profile to use
|
||||
ActionPlanId string // Id of ActionPlan profile to use
|
||||
ActionTriggersId string // Id of ActionTriggers profile to use
|
||||
}
|
||||
|
||||
|
||||
@@ -64,6 +64,7 @@ const (
|
||||
JSON = "json"
|
||||
MSGPACK = "msgpack"
|
||||
CSV_LOAD = "CSVLOAD"
|
||||
CGRID = "cgrid"
|
||||
ACCID = "accid"
|
||||
CDRHOST = "cdrhost"
|
||||
CDRSOURCE = "cdrsource"
|
||||
@@ -80,6 +81,7 @@ const (
|
||||
STATIC_VALUE_PREFIX = "^"
|
||||
CDRE_CSV = "csv"
|
||||
CDRE_DRYRUN = "dry_run"
|
||||
INTERNAL = "internal"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -184,6 +184,6 @@ func ParseDurationWithSecs(durStr string) (time.Duration, error) {
|
||||
return time.ParseDuration(durStr)
|
||||
}
|
||||
|
||||
func BalanceKey(tenant, account, direction string ) string {
|
||||
func BalanceKey(tenant, account, direction string) string {
|
||||
return fmt.Sprintf("%s:%s:%s", direction, tenant, account)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package utils
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -127,3 +129,24 @@ func (ratedCdr *RatedCDR) GetExtraFields() map[string]string {
|
||||
func (ratedCdr *RatedCDR) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*RatedCDR, error) {
|
||||
return ratedCdr, nil
|
||||
}
|
||||
|
||||
// Converts part of the rated Cdr as httpForm used to post remotely to CDRS
|
||||
func (ratedCdr *RatedCDR) AsRawCdrHttpForm() url.Values {
|
||||
v := url.Values{}
|
||||
v.Set(ACCID, ratedCdr.AccId)
|
||||
v.Set(CDRHOST, ratedCdr.CdrHost)
|
||||
v.Set(CDRSOURCE, ratedCdr.CdrSource)
|
||||
v.Set(REQTYPE, ratedCdr.ReqType)
|
||||
v.Set(DIRECTION, ratedCdr.Direction)
|
||||
v.Set(TENANT, ratedCdr.Tenant)
|
||||
v.Set(TOR, ratedCdr.TOR)
|
||||
v.Set(ACCOUNT, ratedCdr.Account)
|
||||
v.Set(SUBJECT, ratedCdr.Subject)
|
||||
v.Set(DESTINATION, ratedCdr.Destination)
|
||||
v.Set(ANSWER_TIME, ratedCdr.AnswerTime.String())
|
||||
v.Set(DURATION, strconv.FormatFloat(ratedCdr.Duration.Seconds(), 'f', -1, 64))
|
||||
for fld, val := range ratedCdr.ExtraFields {
|
||||
v.Set(fld, val)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
@@ -98,3 +98,53 @@ func TestRatedCdrFields(t *testing.T) {
|
||||
t.Error("Error parsing cdr: ", ratedCdr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsRawCdrHttpForm(t *testing.T) {
|
||||
ratedCdr := RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
|
||||
TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
|
||||
Duration: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
|
||||
}
|
||||
cdrForm := ratedCdr.AsRawCdrHttpForm()
|
||||
if cdrForm.Get(ACCID) != ratedCdr.AccId {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.AccId, cdrForm.Get(ACCID))
|
||||
}
|
||||
if cdrForm.Get(CDRHOST) != ratedCdr.CdrHost {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.CdrHost, cdrForm.Get(CDRHOST))
|
||||
}
|
||||
if cdrForm.Get(CDRSOURCE) != ratedCdr.CdrSource {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.CdrSource, cdrForm.Get(CDRSOURCE))
|
||||
}
|
||||
if cdrForm.Get(REQTYPE) != ratedCdr.ReqType {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.ReqType, cdrForm.Get(REQTYPE))
|
||||
}
|
||||
if cdrForm.Get(DIRECTION) != ratedCdr.Direction {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.Direction, cdrForm.Get(DIRECTION))
|
||||
}
|
||||
if cdrForm.Get(TENANT) != ratedCdr.Tenant {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.Tenant, cdrForm.Get(TENANT))
|
||||
}
|
||||
if cdrForm.Get(TOR) != ratedCdr.TOR {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.TOR, cdrForm.Get(TOR))
|
||||
}
|
||||
if cdrForm.Get(ACCOUNT) != ratedCdr.Account {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.Account, cdrForm.Get(ACCOUNT))
|
||||
}
|
||||
if cdrForm.Get(SUBJECT) != ratedCdr.Subject {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.Subject, cdrForm.Get(SUBJECT))
|
||||
}
|
||||
if cdrForm.Get(DESTINATION) != ratedCdr.Destination {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.Destination, cdrForm.Get(DESTINATION))
|
||||
}
|
||||
if cdrForm.Get(ANSWER_TIME) != "2013-11-07 08:42:26 +0000 UTC" {
|
||||
t.Errorf("Expected: %s, received: %s", "2013-11-07 08:42:26 +0000 UTC", cdrForm.Get(ANSWER_TIME))
|
||||
}
|
||||
if cdrForm.Get(DURATION) != "10" {
|
||||
t.Errorf("Expected: %s, received: %s", "10", cdrForm.Get(DURATION))
|
||||
}
|
||||
if cdrForm.Get("field_extr1") != ratedCdr.ExtraFields["field_extr1"] {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.ExtraFields["field_extr1"], cdrForm.Get("field_extr1"))
|
||||
}
|
||||
if cdrForm.Get("fieldextr2") != ratedCdr.ExtraFields["fieldextr2"] {
|
||||
t.Errorf("Expected: %s, received: %s", ratedCdr.ExtraFields["fieldextr2"], cdrForm.Get("fieldextr2"))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user