From 4eee6ddd0e8a9c5fdc03c8ecd155e5404990de17 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Dec 2013 20:02:47 +0100 Subject: [PATCH] Adding static field values for multiple mediation in configuration, fmt on sources --- apier/v1/apier.go | 108 +++++++++++++++++------------------ apier/v1/apier_local_test.go | 32 +++++------ cdrc/cdrc.go | 4 +- cdrs/fscdr.go | 6 +- console/get_cost.go | 1 - docs/cdrserver.rst | 5 +- engine/storage_sql.go | 6 +- mediator/mediator.go | 18 +++--- utils/apitpdata.go | 18 ++++-- utils/cgrcdr.go | 48 +++++++++++----- utils/cgrcdr_test.go | 29 +++++++--- utils/consts.go | 3 +- utils/coreutils.go | 8 ++- utils/ratedcdr.go | 30 +++++----- utils/ratedcdr_test.go | 15 +++-- utils/rawcdr.go | 2 +- utils/utils_test.go | 2 - 17 files changed, 189 insertions(+), 146 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 6b618fe07..8b776f07b 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -21,14 +21,11 @@ package apier import ( "errors" "fmt" - "strings" - "time" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/cgrates/cache2go" - "path" ) @@ -186,11 +183,11 @@ func (self *ApierV1) LoadRatingProfile(attrs utils.TPRatingProfile, reply *strin } type AttrSetRatingProfile struct { - Tenant string // Tenant's Id - TOR string // TypeOfRecord - Direction string // Traffic direction, OUT is the only one supported for now - Subject string // Rating subject, usually the same as account - Overwrite bool // Overwrite if exists + Tenant string // Tenant's Id + TOR string // TypeOfRecord + Direction string // Traffic direction, OUT is the only one supported for now + Subject string // Rating subject, usually the same as account + Overwrite bool // Overwrite if exists RatingPlanActivations []*utils.TPRatingActivation // Activate rate profiles at specific time } @@ -215,17 +212,17 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string) } rpfl := &engine.RatingProfile{Id: keyId, RatingPlanActivations: make(engine.RatingPlanActivations, len(attrs.RatingPlanActivations))} for idx, ra := range attrs.RatingPlanActivations { - at, err := utils.ParseDate(ra.ActivationTime) - if err != nil { - return fmt.Errorf(fmt.Sprintf("%s:Cannot parse activation time from %v", utils.ERR_SERVER_ERROR, ra.ActivationTime)) - } - if exists, err := self.RatingDb.ExistsData(engine.RATING_PLAN_PREFIX, ra.RatingPlanId); err != nil { - return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) - } else if !exists { - return fmt.Errorf(fmt.Sprintf("%s:RatingPlanId:%s", utils.ERR_NOT_FOUND, ra.RatingPlanId)) - } - rpfl.RatingPlanActivations[idx] = &engine.RatingPlanActivation{ActivationTime: at, RatingPlanId: ra.RatingPlanId, - FallbackKeys: utils.FallbackSubjKeys(tpRpf.Direction, tpRpf.Tenant, tpRpf.TOR, ra.FallbackSubjects)} + at, err := utils.ParseDate(ra.ActivationTime) + if err != nil { + return fmt.Errorf(fmt.Sprintf("%s:Cannot parse activation time from %v", utils.ERR_SERVER_ERROR, ra.ActivationTime)) + } + if exists, err := self.RatingDb.ExistsData(engine.RATING_PLAN_PREFIX, ra.RatingPlanId); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } else if !exists { + return fmt.Errorf(fmt.Sprintf("%s:RatingPlanId:%s", utils.ERR_NOT_FOUND, ra.RatingPlanId)) + } + rpfl.RatingPlanActivations[idx] = &engine.RatingPlanActivation{ActivationTime: at, RatingPlanId: ra.RatingPlanId, + FallbackKeys: utils.FallbackSubjKeys(tpRpf.Direction, tpRpf.Tenant, tpRpf.TOR, ra.FallbackSubjects)} } if err := self.RatingDb.SetRatingProfile(rpfl); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) @@ -236,7 +233,7 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string) type AttrSetActions struct { ActionsId string // Actions id - Overwrite bool // If previously defined, will be overwritten + Overwrite bool // If previously defined, will be overwritten Actions []*utils.TPAction // Set of actions this Actions profile will perform } @@ -288,18 +285,18 @@ func (self *ApierV1) SetActions(attrs AttrSetActions, reply *string) error { } type AttrSetActionTimings struct { - ActionTimingsId string // Profile id - Overwrite bool // If previously defined, will be overwritten + ActionTimingsId string // Profile id + Overwrite bool // If previously defined, will be overwritten ActionTimings []*ApiActionTiming // Set of actions this Actions profile will perform } type ApiActionTiming struct { ActionsId string // Actions id - Years string // semicolon separated list of years this timing is valid on, *any or empty supported - Months string // semicolon separated list of months this timing is valid on, *any or empty supported - MonthDays string // semicolon separated list of month's days this timing is valid on, *any or empty supported - WeekDays string // semicolon separated list of week day names this timing is valid on *any or empty supported - Time string // String representing the time this timing starts on, *asap supported + Years string // semicolon separated list of years this timing is valid on, *any or empty supported + Months string // semicolon separated list of months this timing is valid on, *any or empty supported + MonthDays string // semicolon separated list of month's days this timing is valid on, *any or empty supported + WeekDays string // semicolon separated list of week day names this timing is valid on *any or empty supported + Time string // String representing the time this timing starts on, *asap supported Weight float64 // Binding's weight } @@ -334,10 +331,10 @@ func (self *ApierV1) SetActionTimings(attrs AttrSetActionTimings, reply *string) timing.WeekDays.Parse(apiAtm.WeekDays, ";") timing.StartTime = apiAtm.Time at := &engine.ActionTiming{ - Id: utils.GenUUID(), - Tag: attrs.ActionTimingsId, - Weight: apiAtm.Weight, - Timing: &engine.RateInterval{Timing:timing}, + Id: utils.GenUUID(), + Tag: attrs.ActionTimingsId, + Weight: apiAtm.Weight, + Timing: &engine.RateInterval{Timing: timing}, ActionsId: apiAtm.ActionsId, } storeAtms[idx] = at @@ -513,8 +510,6 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro return nil } - - func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.CacheStats) error { cs := new(utils.CacheStats) cs.Destinations = cache2go.CountEntries(engine.DESTINATION_PREFIX) @@ -525,34 +520,35 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach return nil } -func (self *ApierV1) GetCachedItemAge(attrs utils.AttrCachedItemAge, reply *time.Duration) error { - if missing := utils.MissingStructFields(&attrs, []string{"Category", "ItemId"}); len(missing) != 0 { - return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) +func (self *ApierV1) GetCachedItemAge(itemId string, reply *utils.CachedItemAge) error { + if len(itemId) == 0 { + return fmt.Errorf("%s:ItemId", utils.ERR_MANDATORY_IE_MISSING) } - cacheKey := "" - switch attrs.Category { - case strings.TrimSuffix(utils.DESTINATIONS_CSV, ".csv"): - cacheKey = engine.DESTINATION_PREFIX + attrs.ItemId - case strings.TrimSuffix(utils.RATING_PLANS_CSV, ".csv"): - cacheKey = engine.RATING_PLAN_PREFIX + attrs.ItemId - case strings.TrimSuffix(utils.RATING_PROFILES_CSV, ".csv"): - cacheKey = engine.RATING_PROFILE_PREFIX + attrs.ItemId - case strings.TrimSuffix(utils.ACTIONS_CSV, ".csv"): - cacheKey = engine.ACTION_PREFIX + attrs.ItemId + cachedItemAge := new(utils.CachedItemAge) + var found bool + for idx, cacheKey := range []string{ engine.DESTINATION_PREFIX+itemId, engine.RATING_PLAN_PREFIX+itemId, engine.RATING_PROFILE_PREFIX+itemId, + engine.ACTION_PREFIX+itemId} { + if age, err := cache2go.GetKeyAge(cacheKey); err == nil { + found = true + switch idx { + case 0: + cachedItemAge.Destination = age + case 1: + cachedItemAge.RatingPlan = age + case 2: + cachedItemAge.RatingProfile = age + case 3: + cachedItemAge.Action = age + } + } } - if len(cacheKey) == 0 { - return fmt.Errorf("%s:Category", utils.ERR_MANDATORY_IE_MISSING) - } - //engine.Logger.Debug(fmt.Sprintf("Will query cache age with: %s", cacheKey)) - if age, err := cache2go.GetKeyAge(cacheKey); err != nil { - return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) - } else { - *reply = age + if !found { + return errors.New(utils.ERR_NOT_FOUND) } + *reply = *cachedItemAge return nil } - type AttrLoadTPFromFolder struct { FolderPath string // Take files from folder absolute path DryRun bool // Do not write to database but parse only diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index ead7c8352..3074760c7 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -29,9 +29,8 @@ import ( "os/exec" "path" "reflect" - "strings" - "time" "testing" + "time" ) // ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local @@ -750,7 +749,7 @@ func TestApierSetRatingProfile(t *testing.T) { t.Error("Calling ApierV1.SetRatingProfile got reply: ", reply) } // Calling the second time should raise EXISTS - if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err == nil || err.Error() != "EXISTS"{ + if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err == nil || err.Error() != "EXISTS" { t.Error("Unexpected result on duplication: ", err.Error()) } } @@ -803,7 +802,7 @@ func TestApierGetCacheStats(t *testing.T) { return } var rcvStats *utils.CacheStats - expectedStats := &utils.CacheStats{Destinations:4, RatingPlans: 1, RatingProfiles: 2, Actions: 1} + expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 1, RatingProfiles: 2, Actions: 1} var args utils.AttrCacheStats if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) @@ -816,19 +815,20 @@ func TestApierGetCachedItemAge(t *testing.T) { if !*testLocal { return } - var rcvAge *time.Duration - qryData := &utils.AttrCachedItemAge{Category: strings.TrimSuffix(utils.DESTINATIONS_CSV, ".csv"), ItemId: "+4917"} // Destinations are cached per prefix not id - if err := rater.Call("ApierV1.GetCachedItemAge", qryData, &rcvAge); err != nil { + var rcvAge *utils.CachedItemAge + if err := rater.Call("ApierV1.GetCachedItemAge", "+4917", &rcvAge); err != nil { t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if *rcvAge > time.Duration(2)*time.Second { + } else if rcvAge.Destination > time.Duration(2)*time.Second { t.Errorf("Cache too old: %d", rcvAge) } - qryData = &utils.AttrCachedItemAge{Category: strings.TrimSuffix(utils.RATING_PLANS_CSV, ".csv"), ItemId: "RETAIL1"} - if err := rater.Call("ApierV1.GetCachedItemAge", qryData, &rcvAge); err != nil { + if err := rater.Call("ApierV1.GetCachedItemAge", "RETAIL1", &rcvAge); err != nil { t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if *rcvAge > time.Duration(2)*time.Second { + } else if rcvAge.RatingPlan > time.Duration(2)*time.Second { t.Errorf("Cache too old: %d", rcvAge) } + if err := rater.Call("ApierV1.GetCachedItemAge", "DUMMY_DATA", &rcvAge); err == nil || err.Error() != "NOT_FOUND" { + t.Error("Did not get NOT_FOUND: ", err.Error()) + } } // Test here GetDestination @@ -933,8 +933,8 @@ func TestApierSetActions(t *testing.T) { if !*testLocal { return } - act1 := &utils.TPAction {Identifier: engine.TOPUP_RESET, BalanceType: engine.CREDIT, Direction: engine.OUTBOUND, Units: 75.0, ExpiryTime: engine.UNLIMITED, Weight: 20.0} - attrs1 := &AttrSetActions{ActionsId: "ACTS_1", Actions : []*utils.TPAction{act1}} + act1 := &utils.TPAction{Identifier: engine.TOPUP_RESET, BalanceType: engine.CREDIT, Direction: engine.OUTBOUND, Units: 75.0, ExpiryTime: engine.UNLIMITED, Weight: 20.0} + attrs1 := &AttrSetActions{ActionsId: "ACTS_1", Actions: []*utils.TPAction{act1}} reply1 := "" if err := rater.Call("ApierV1.SetActions", attrs1, &reply1); err != nil { t.Error("Got error on ApierV1.SetActions: ", err.Error()) @@ -942,7 +942,7 @@ func TestApierSetActions(t *testing.T) { t.Errorf("Calling ApierV1.SetActions received: %s", reply1) } // Calling the second time should raise EXISTS - if err := rater.Call("ApierV1.SetActions", attrs1, &reply1); err == nil || err.Error() != "EXISTS"{ + if err := rater.Call("ApierV1.SetActions", attrs1, &reply1); err == nil || err.Error() != "EXISTS" { t.Error("Unexpected result on duplication: ", err.Error()) } } @@ -952,7 +952,7 @@ func TestApierSetActionTimings(t *testing.T) { return } atm1 := &ApiActionTiming{ActionsId: "ACTS_1", MonthDays: "1", Time: "00:00:00", Weight: 20.0} - atms1 := &AttrSetActionTimings{ ActionTimingsId: "ATMS_1", ActionTimings: []*ApiActionTiming{atm1} } + atms1 := &AttrSetActionTimings{ActionTimingsId: "ATMS_1", ActionTimings: []*ApiActionTiming{atm1}} reply1 := "" if err := rater.Call("ApierV1.SetActionTimings", atms1, &reply1); err != nil { t.Error("Got error on ApierV1.SetActionTimings: ", err.Error()) @@ -960,7 +960,7 @@ func TestApierSetActionTimings(t *testing.T) { t.Errorf("Calling ApierV1.SetActionTimings received: %s", reply1) } // Calling the second time should raise EXISTS - if err := rater.Call("ApierV1.SetActionTimings", atms1, &reply1); err == nil || err.Error() != "EXISTS"{ + if err := rater.Call("ApierV1.SetActionTimings", atms1, &reply1); err == nil || err.Error() != "EXISTS" { t.Error("Unexpected result on duplication: ", err.Error()) } } diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 567655502..6b2bb3933 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -27,8 +27,8 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/howeyc/fsnotify" - "io/ioutil" "io" + "io/ioutil" "net/http" "net/url" "os" @@ -190,7 +190,7 @@ func (self *Cdrc) processFile(filePath string) error { } // Finished with file, move it to processed folder newPath := path.Join(self.cgrCfg.CdrcCdrOutDir, fn) - if err:= os.Rename(filePath, newPath); err != nil { + if err := os.Rename(filePath, newPath); err != nil { engine.Logger.Err(err.Error()) return err } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 5e992a08a..28c894834 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -21,10 +21,10 @@ package cdrs import ( "encoding/json" "errors" - "github.com/cgrates/cgrates/utils" - "time" "fmt" + "github.com/cgrates/cgrates/utils" "strconv" + "time" ) const ( @@ -161,7 +161,7 @@ func (fsCdr FSCdr) Restore(input string) error { } // Used in extra mediation -func(fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*utils.RatedCDR, error) { +func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*utils.RatedCDR, error) { if utils.IsSliceMember([]string{runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld}, "") { return nil, errors.New(fmt.Sprintf("%s:FieldName", utils.ERR_MANDATORY_IE_MISSING)) // All input field names are mandatory } diff --git a/console/get_cost.go b/console/get_cost.go index 02a0ca6f4..760224ff9 100644 --- a/console/get_cost.go +++ b/console/get_cost.go @@ -73,7 +73,6 @@ func (self *CmdGetCost) FromArgs(args []string) error { callDur, err := utils.ParseDurationWithSecs(args[7]) if err != nil { fmt.Println("\n\tExample durations: 60s for 60 seconds, 25m for 25minutes, 1m25s for one minute and 25 seconds\n") - return fmt.Errorf(self.Usage("")) } self.rpcParams.TOR = args[2] self.rpcParams.Tenant = args[3] diff --git a/docs/cdrserver.rst b/docs/cdrserver.rst index 8c6db80e1..c167837f9 100644 --- a/docs/cdrserver.rst +++ b/docs/cdrserver.rst @@ -20,6 +20,7 @@ Primary fields: the fields which CGRateS needs for it's own operations and are s - accid: represents the unique accounting id given by the switch generating the CDR - cdrhost: represents the ip of the host generating the CDR +- cdrsource: formally identifies the source of the CDR - reqtype: matching the supported request types by the CGRateS - direction: matching the supported direction identifiers of the CGRateS - tenant: tenant whom this call belongs @@ -27,7 +28,7 @@ Primary fields: the fields which CGRateS needs for it's own operations and are s - account: account id (accounting subsystem) the record should be attached to - subject: rating subject (rating subsystem) this call should be attached to - destination: destination to be charged -- time_answer: time of the record (in case of tor=call this would be answer time of the call). This will arive as either unix timestamp or datetime RFC3339 compatible. +- answer_time: time of the record (in case of tor=call this would be answer time of the call). Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp. - duration: used in case of tor=call like, representing the total duration of the call Extra fields: any field coming in via the http request and not a member of primary fields list. These fields are stored as json encoded into *cdrs_extra* table of storDb. @@ -35,7 +36,7 @@ Extra fields: any field coming in via the http request and not a member of prima Example of sample CDR generated simply using curl: :: - curl --data "accid=asbfdsaf&cdrhost=192.168.1.1&reqtype=rated&direction=*out&tenant=cgrates.org&tor=call&account=1001&subject=1001&destination=1002&time_answer=1383813746&duration=10&sip_user=Jitsi" http://ipbxdev:2022/cgr + curl --data "curl --data "accid=iiaasbfdsaf&cdrhost=192.168.1.1&cdrsource=curl_cdr&reqtype=rated&direction=*out&tenant=192.168.56.66&tor=call&account=dan&subject=dan&destination=%2B4986517174963&answer_time=1383813746&duration=1&sip_user=Jitsi&subject2=1003" http://127.0.0.1:2022/cgr CDR-FS_JSON diff --git a/engine/storage_sql.go b/engine/storage_sql.go index b1362542c..4c053801d 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -690,11 +690,7 @@ func (self *SQLStorage) LogError(uuid, source, runid, errstr string) (err error) func (self *SQLStorage) SetCdr(cdr utils.RawCDR) (err error) { // map[account:1001 direction:out orig_ip:172.16.1.1 tor:call accid:accid23 answer_time:2013-02-03 19:54:00 cdrsource:freeswitch_csv destination:+4986517174963 duration:62 reqtype:prepaid subject:1001 supplier:supplier1 tenant:cgrates.org] - startTime, err := cdr.GetAnswerTime() - if err != nil { - Logger.Info(err.Error()) - return err - } + startTime, _ := cdr.GetAnswerTime() // Ignore errors, we want to store the cdr no matter what _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (NULL,'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %d)", utils.TBL_CDRS_PRIMARY, cdr.GetCgrId(), diff --git a/mediator/mediator.go b/mediator/mediator.go index ec0839fad..536a6e4fe 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -20,11 +20,11 @@ package mediator import ( "errors" + "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "time" - "fmt" ) func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) { @@ -68,7 +68,7 @@ func (self *Mediator) parseConfig() error { func (self *Mediator) getCostsFromDB(cgrid string) (cc *engine.CallCost, err error) { for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up cc, err = self.logDb.GetCallCostLog(cgrid, engine.SESSION_MANAGER_SOURCE, utils.DEFAULT_RUNID) //ToDo: What are we getting when there is no log? - if cc != nil { // There were no errors, chances are that we got what we are looking for + if cc != nil { // There were no errors, chances are that we got what we are looking for break } time.Sleep(time.Duration(i) * time.Second) @@ -123,21 +123,23 @@ func (self *Mediator) rateCDR(cdr *utils.RatedCDR) error { } else if qryCC == nil { return errors.New("No cost returned from rater") } - cdr.Cost = qryCC.ConnectFee+qryCC.Cost + cdr.Cost = qryCC.ConnectFee + qryCC.Cost return nil } // Forks original CDR based on original request plus runIds for extra mediation func (self *Mediator) MediateRawCDR(dbcdr utils.RawCDR) error { - rtCdr,err := utils.NewRatedCDRFromRawCDR(dbcdr) + engine.Logger.Info(fmt.Sprintf("Mediating rawCdr: %v, duration: %d",dbcdr, dbcdr.GetDuration())) + rtCdr, err := utils.NewRatedCDRFromRawCDR(dbcdr) if err != nil { return err } + engine.Logger.Info(fmt.Sprintf("Have converted raw into rated: %v", rtCdr)) cdrs := []*utils.RatedCDR{rtCdr} // Start with initial dbcdr, will add here all to be mediated for runIdx, runId := range self.cgrCfg.MediatorRunIds { - forkedCdr, err := dbcdr.AsRatedCdr(self.cgrCfg.MediatorRunIds[runIdx], self.cgrCfg.MediatorReqTypeFields[runIdx], self.cgrCfg.MediatorDirectionFields[runIdx], - self.cgrCfg.MediatorTenantFields[runIdx], self.cgrCfg.MediatorTORFields[runIdx], self.cgrCfg.MediatorAccountFields[runIdx], - self.cgrCfg.MediatorSubjectFields[runIdx], self.cgrCfg.MediatorDestFields[runIdx], self.cgrCfg.MediatorAnswerTimeFields[runIdx], + forkedCdr, err := dbcdr.AsRatedCdr(self.cgrCfg.MediatorRunIds[runIdx], self.cgrCfg.MediatorReqTypeFields[runIdx], self.cgrCfg.MediatorDirectionFields[runIdx], + self.cgrCfg.MediatorTenantFields[runIdx], self.cgrCfg.MediatorTORFields[runIdx], self.cgrCfg.MediatorAccountFields[runIdx], + self.cgrCfg.MediatorSubjectFields[runIdx], self.cgrCfg.MediatorDestFields[runIdx], self.cgrCfg.MediatorAnswerTimeFields[runIdx], self.cgrCfg.MediatorDurationFields[runIdx], []string{}, true) if err != nil { // Errors on fork, cannot calculate further, write that into db for later analysis self.cdrDb.SetRatedCdr(&utils.RatedCDR{CgrId: dbcdr.GetCgrId(), MediationRunId: runId, Cost: -1.0}, err.Error()) // Cannot fork CDR, important just runid and error @@ -151,7 +153,7 @@ func (self *Mediator) MediateRawCDR(dbcdr utils.RawCDR) error { extraInfo = err.Error() } if err := self.cdrDb.SetRatedCdr(cdr, extraInfo); err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not record cost for cgrid: <%s>, err: <%s>, cost: %f, extraInfo: %s", + engine.Logger.Err(fmt.Sprintf(" Could not record cost for cgrid: <%s>, err: <%s>, cost: %f, extraInfo: %s", cdr.CgrId, err.Error(), cdr.Cost, extraInfo)) } } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 2757058fd..0d021870a 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -287,14 +287,20 @@ type AttrCacheStats struct { // Add in the future filters here maybe so we avoid } type CacheStats struct { - Destinations int - RatingPlans int - RatingProfiles int - Actions int + Destinations int + RatingPlans int + RatingProfiles int + Actions int } type AttrCachedItemAge struct { - Category string // Item's category, same name as .csv files without extension - ItemId string // Item's identity tag + Category string // Item's category, same name as .csv files without extension + ItemId string // Item's identity tag } +type CachedItemAge struct { + Destination time.Duration + RatingPlan time.Duration + RatingProfile time.Duration + Action time.Duration +} diff --git a/utils/cgrcdr.go b/utils/cgrcdr.go index 3f4dc126d..8ac63e97b 100644 --- a/utils/cgrcdr.go +++ b/utils/cgrcdr.go @@ -19,10 +19,11 @@ along with this program. If not, see package utils import ( - "net/http" - "time" "errors" "fmt" + "net/http" + "time" + "strings" ) func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) { @@ -105,10 +106,11 @@ func (cgrCdr CgrCdr) GetDuration() time.Duration { } // Used in mediation, fieldsMandatory marks whether missing field out of request represents error or can be ignored -func(cgrCdr CgrCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*RatedCDR, error) { +// If the fields in parameters start with ^ their value is considered instead of dynamically retrieving it from CDR +func (cgrCdr CgrCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*RatedCDR, error) { if IsSliceMember([]string{runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld}, "") { return nil, errors.New(fmt.Sprintf("%s:FieldName", ERR_MANDATORY_IE_MISSING)) // All input field names are mandatory - } + } var err error var hasKey bool var aTimeStr, durStr string @@ -130,37 +132,57 @@ func(cgrCdr CgrCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFl if rtCdr.CdrSource, hasKey = cgrCdr[CDRSOURCE]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, CDRSOURCE)) } - if rtCdr.ReqType, hasKey = cgrCdr[reqTypeFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(reqTypeFld, STATIC_VALUE_PREFIX) { // Values starting with prefix are not dynamically populated + rtCdr.ReqType = reqTypeFld[1:] + } else if rtCdr.ReqType, hasKey = cgrCdr[reqTypeFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, reqTypeFld)) } - if rtCdr.Direction, hasKey = cgrCdr[directionFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(directionFld, STATIC_VALUE_PREFIX) { + rtCdr.Direction = directionFld[1:] + } else if rtCdr.Direction, hasKey = cgrCdr[directionFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, directionFld)) } - if rtCdr.Tenant, hasKey = cgrCdr[tenantFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(tenantFld, STATIC_VALUE_PREFIX) { + rtCdr.Tenant = tenantFld[1:] + } else if rtCdr.Tenant, hasKey = cgrCdr[tenantFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, tenantFld)) } - if rtCdr.TOR, hasKey = cgrCdr[torFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(torFld, STATIC_VALUE_PREFIX) { + rtCdr.TOR = torFld[1:] + } else if rtCdr.TOR, hasKey = cgrCdr[torFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, torFld)) } - if rtCdr.Account, hasKey = cgrCdr[accountFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(accountFld, STATIC_VALUE_PREFIX) { + rtCdr.Account = accountFld[1:] + } else if rtCdr.Account, hasKey = cgrCdr[accountFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, accountFld)) } - if rtCdr.Subject, hasKey = cgrCdr[subjectFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(subjectFld, STATIC_VALUE_PREFIX) { + rtCdr.Subject = subjectFld[1:] + } else if rtCdr.Subject, hasKey = cgrCdr[subjectFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, subjectFld)) } - if rtCdr.Destination, hasKey = cgrCdr[destFld]; !hasKey && fieldsMandatory { + if strings.HasPrefix(destFld, STATIC_VALUE_PREFIX) { + rtCdr.Destination = destFld[1:] + } else if rtCdr.Destination, hasKey = cgrCdr[destFld]; !hasKey && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, destFld)) } - if aTimeStr, hasKey = cgrCdr[answerTimeFld]; !hasKey && fieldsMandatory { + if aTimeStr, hasKey = cgrCdr[answerTimeFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(answerTimeFld, STATIC_VALUE_PREFIX) { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, answerTimeFld)) } else { + if strings.HasPrefix(answerTimeFld, STATIC_VALUE_PREFIX) { + aTimeStr = answerTimeFld[1:] + } if rtCdr.AnswerTime, err = ParseTimeDetectLayout(aTimeStr); err != nil && fieldsMandatory { return nil, err } } - if durStr, hasKey = cgrCdr[durationFld]; !hasKey && fieldsMandatory { + if durStr, hasKey = cgrCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, STATIC_VALUE_PREFIX){ return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, durationFld)) } else { + if strings.HasPrefix(durationFld, STATIC_VALUE_PREFIX) { + durStr = durationFld[1:] + } if rtCdr.Duration, err = ParseDurationWithSecs(durStr); err != nil && fieldsMandatory { return nil, err } diff --git a/utils/cgrcdr_test.go b/utils/cgrcdr_test.go index 8e981fd5e..2172e6b96 100644 --- a/utils/cgrcdr_test.go +++ b/utils/cgrcdr_test.go @@ -19,9 +19,9 @@ along with this program. If not, see package utils import ( + "reflect" "testing" "time" - "reflect" ) /* @@ -80,18 +80,31 @@ func TestCgrCdrFields(t *testing.T) { func TestCgrCdrAsRatedCdr(t *testing.T) { cgrCdr := &CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "cdrsource": "source_test", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call", - "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07T08:42:26Z", "duration": "10", + "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07T08:42:26Z", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} - rtCdrOut, err := cgrCdr.AsRatedCdr("wholesale_run", "reqtype", "direction", "tenant", "tor", "account", "subject", "destination", "answer_time", "duration", []string{"field_extr1","fieldextr2"}, true) + rtCdrOut, err := cgrCdr.AsRatedCdr("wholesale_run", "reqtype", "direction", "tenant", "tor", "account", "subject", "destination", "answer_time", "duration", []string{"field_extr1", "fieldextr2"}, true) if err != nil { t.Error("Unexpected error received", err) } - expctRatedCdr := &RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "source_test", ReqType: "rated", - Direction: "*out", Tenant: "cgrates.org", TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Unix(1383813746,0).UTC(), - Duration: 10000000000, ExtraFields: map[string]string{"field_extr1":"val_extr1", "fieldextr2": "valextr2"}, MediationRunId:"wholesale_run", Cost: -1} + expctRatedCdr := &RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "source_test", ReqType: "rated", + Direction: "*out", Tenant: "cgrates.org", TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Unix(1383813746, 0).UTC(), + Duration: 10000000000, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, MediationRunId: "wholesale_run", Cost: -1} if !reflect.DeepEqual(rtCdrOut, expctRatedCdr) { t.Errorf("Received: %v, expected: %v", rtCdrOut, expctRatedCdr) } + rtCdrOut2, err := cgrCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "destination", "^2013-12-07T08:42:26Z", "^12s", []string{"field_extr1", "fieldextr2"}, true) + if err != nil { + t.Error("Unexpected error received", err) + } + expctRatedCdr2 := &RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "source_test", ReqType: "postpaid", + Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "1002", + AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12)*time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, MediationRunId: "wholesale_run", Cost: -1} + if !reflect.DeepEqual(rtCdrOut2, expctRatedCdr2) { + t.Errorf("Received: %v, expected: %v", rtCdrOut, expctRatedCdr) + } + _, err = cgrCdr.AsRatedCdr("wholesale_run", "dummy_header", "direction", "tenant", "tor", "account", "subject", "destination", "answer_time", "duration", []string{"field_extr1", "fieldextr2"}, true) + if err == nil { + t.Error("Failed to detect missing header") + } } - - diff --git a/utils/consts.go b/utils/consts.go index 7aa6e396d..0d83fad81 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -76,5 +76,6 @@ const ( DESTINATION = "destination" ANSWER_TIME = "answer_time" DURATION = "duration" - DEFAULT_RUNID = "default" + DEFAULT_RUNID = "default" + STATIC_VALUE_PREFIX = "^" ) diff --git a/utils/coreutils.go b/utils/coreutils.go index c8b40f68c..082a333d1 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -22,13 +22,13 @@ import ( "crypto/rand" "crypto/sha1" "encoding/hex" + "errors" "fmt" "math" + "regexp" "strconv" "strings" "time" - "regexp" - "errors" ) // Returns first non empty string out of vals. Useful to extract defaults @@ -113,8 +113,10 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) { if tmstmp, err := strconv.ParseInt(tmStr, 10, 64); err != nil { return nilTime, err } else { - return time.Unix(tmstmp,0), nil + return time.Unix(tmstmp, 0), nil } + case len(tmStr) == 0: // Time probably missing from request + return nilTime, nil } return nilTime, errors.New("Unsupported time format") } diff --git a/utils/ratedcdr.go b/utils/ratedcdr.go index 43d62ab74..51798d95e 100644 --- a/utils/ratedcdr.go +++ b/utils/ratedcdr.go @@ -39,7 +39,7 @@ func NewRatedCDRFromRawCDR(rawcdr RawCDR) (*RatedCDR, error) { if rtCdr.AnswerTime, err = rawcdr.GetAnswerTime(); err != nil { return nil, err } - rtCdr.Duration = time.Duration(rawcdr.GetDuration()) * time.Second + rtCdr.Duration = rawcdr.GetDuration() rtCdr.ExtraFields = rawcdr.GetExtraFields() rtCdr.MediationRunId = DEFAULT_RUNID rtCdr.Cost = -1 @@ -69,59 +69,59 @@ type RatedCDR struct { // Methods maintaining RawCDR interface func (ratedCdr *RatedCDR) GetCgrId() string { - return ratedCdr.CgrId + return ratedCdr.CgrId } func (ratedCdr *RatedCDR) GetAccId() string { - return ratedCdr.AccId + return ratedCdr.AccId } func (ratedCdr *RatedCDR) GetCdrHost() string { - return ratedCdr.CdrHost + return ratedCdr.CdrHost } func (ratedCdr *RatedCDR) GetCdrSource() string { - return ratedCdr.CdrSource + return ratedCdr.CdrSource } func (ratedCdr *RatedCDR) GetDirection() string { - return ratedCdr.Direction + return ratedCdr.Direction } func (ratedCdr *RatedCDR) GetSubject() string { - return ratedCdr.Subject + return ratedCdr.Subject } func (ratedCdr *RatedCDR) GetAccount() string { - return ratedCdr.Account + return ratedCdr.Account } func (ratedCdr *RatedCDR) GetDestination() string { - return ratedCdr.Destination + return ratedCdr.Destination } func (ratedCdr *RatedCDR) GetTOR() string { - return ratedCdr.TOR + return ratedCdr.TOR } func (ratedCdr *RatedCDR) GetTenant() string { - return ratedCdr.Tenant + return ratedCdr.Tenant } func (ratedCdr *RatedCDR) GetReqType() string { - return ratedCdr.ReqType + return ratedCdr.ReqType } func (ratedCdr *RatedCDR) GetAnswerTime() (time.Time, error) { - return ratedCdr.AnswerTime, nil + return ratedCdr.AnswerTime, nil } func (ratedCdr *RatedCDR) GetDuration() time.Duration { - return ratedCdr.Duration + return ratedCdr.Duration } func (ratedCdr *RatedCDR) GetExtraFields() map[string]string { - return ratedCdr.ExtraFields + return ratedCdr.ExtraFields } func (ratedCdr *RatedCDR) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*RatedCDR, error) { diff --git a/utils/ratedcdr_test.go b/utils/ratedcdr_test.go index 9a087b0ca..e777889af 100644 --- a/utils/ratedcdr_test.go +++ b/utils/ratedcdr_test.go @@ -21,6 +21,7 @@ package utils import ( "testing" "time" + "reflect" ) func TestRatedCDRInterfaces(t *testing.T) { @@ -29,11 +30,17 @@ func TestRatedCDRInterfaces(t *testing.T) { } func TestNewRatedCDRFromRawCDR(t *testing.T) { - cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call", - "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07T08:42:26Z", "duration": "10", + cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "cdrsource": "internal_test", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call", + "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07T08:42:26Z", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} - if _,err := NewRatedCDRFromRawCDR(cgrCdr); err != nil { + expctRtCdr := &RatedCDR{CgrId: FSCgrId(cgrCdr["accid"]), AccId: cgrCdr["accid"], CdrHost: cgrCdr["cdrhost"], CdrSource: cgrCdr["cdrsource"], ReqType: cgrCdr["reqtype"], + Direction: cgrCdr["direction"], Tenant: cgrCdr["tenant"], TOR: cgrCdr["tor"], Account: cgrCdr["account"], Subject: cgrCdr["subject"], + Destination: cgrCdr["destination"], AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(10)*time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, MediationRunId: DEFAULT_RUNID, Cost: -1} + if rt, err := NewRatedCDRFromRawCDR(cgrCdr); err != nil { t.Error(err) + } else if !reflect.DeepEqual(rt, expctRtCdr) { + t.Errorf("Received %v, expected: %v", rt, expctRtCdr) } } @@ -41,7 +48,7 @@ func TestRatedCdrFields(t *testing.T) { ratedCdr := RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Unix(1383813746, 0), Duration: 10, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, - } + } if ratedCdr.GetCgrId() != "b18944ef4dc618569f24c27b9872827a242bad0c" { t.Error("Error parsing cdr: ", ratedCdr) } diff --git a/utils/rawcdr.go b/utils/rawcdr.go index 0a1867a33..bf2cfbbf9 100644 --- a/utils/rawcdr.go +++ b/utils/rawcdr.go @@ -39,6 +39,6 @@ type RawCDR interface { GetReqType() string GetAnswerTime() (time.Time, error) GetDuration() time.Duration - GetExtraFields() map[string]string //Stores extra CDR Fields + GetExtraFields() map[string]string //Stores extra CDR Fields AsRatedCdr(string, string, string, string, string, string, string, string, string, string, []string, bool) (*RatedCDR, error) // Based on fields queried will return a particular instance of RatedCDR } diff --git a/utils/utils_test.go b/utils/utils_test.go index 64b0bce43..51089fe4d 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -157,8 +157,6 @@ func TestParseTimeDetectLayout(t *testing.T) { t.Errorf("Expecting error") } } - - func TestParseDateUnix(t *testing.T) { date, err := ParseDate("1375212790")