From afe845a8aa444d88f4127d45893b7b0be9f6bb5e Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 7 Jan 2014 20:27:45 +0100 Subject: [PATCH 1/7] Enhancing cdrexporter with dry_run, console export_cdrs command --- apier/v1/cdrs.go | 59 +++++++++++++---------- cdrexporter/csv.go | 2 +- cdrexporter/csv_test.go | 4 +- console/export_cdrs.go | 95 +++++++++++++++++++++++++++++++++++++ engine/storage_interface.go | 1 + engine/storage_sql.go | 32 +++++++++++-- utils/apitpdata.go | 13 +++++ utils/consts.go | 6 +++ utils/coreutils.go | 9 ++-- utils/utils_test.go | 22 +++++++++ 10 files changed, 208 insertions(+), 35 deletions(-) create mode 100644 console/export_cdrs.go diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index 7879f64d5..f0ad83f5f 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -25,28 +25,23 @@ import ( "os" "path" "time" + "strings" ) -type AttrExpCsvCdrs struct { - TimeStart string // If provided, will represent the starting of the CDRs interval (>=) - TimeEnd string // If provided, will represent the end of the CDRs interval (<) -} - -type ExportedCsvCdrs struct { - ExportedFilePath string // Full path to the newly generated export file - NumberOfCdrs int // Number of CDRs in the export file -} - -func (self *ApierV1) ExportCsvCdrs(attr *AttrExpCsvCdrs, reply *ExportedCsvCdrs) error { +func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.ExportedFileCdrs) error { var tStart, tEnd time.Time var err error + cdrFormat := strings.ToLower(attr.CdrFormat) + if !utils.IsSliceMember(utils.CdreCdrFormats, cdrFormat) { + return fmt.Errorf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, "CdrFormat") + } if len(attr.TimeStart) != 0 { - if tStart, err = utils.ParseDate(attr.TimeStart); err != nil { + if tStart, err = utils.ParseTimeDetectLayout(attr.TimeStart); err != nil { return err } } if len(attr.TimeEnd) != 0 { - if tEnd, err = utils.ParseDate(attr.TimeEnd); err != nil { + if tEnd, err = utils.ParseTimeDetectLayout(attr.TimeEnd); err != nil { return err } } @@ -54,21 +49,33 @@ func (self *ApierV1) ExportCsvCdrs(attr *AttrExpCsvCdrs, reply *ExportedCsvCdrs) if err != nil { return err } - fileName := path.Join(self.Config.CdreDir, "cgr", "csv", fmt.Sprintf("cdrs_%d.csv", time.Now().Unix())) - fileOut, err := os.Create(fileName) - if err != nil { - return err - } else { - defer fileOut.Close() - } - csvWriter := cdrexporter.NewCsvCdrWriter(fileOut, self.Config.RoundingDecimals, self.Config.CdreExtraFields) - for _, cdr := range cdrs { - if err := csvWriter.Write(cdr); err != nil { - os.Remove(fileName) + var fileName string + if cdrFormat == utils.CDRE_CSV && len(cdrs) != 0 { + fileName = path.Join(self.Config.CdreDir, fmt.Sprintf("cdrs_%d.csv", time.Now().Unix())) + fileOut, err := os.Create(fileName) + if err != nil { return err + } else { + defer fileOut.Close() + } + csvWriter := cdrexporter.NewCsvCdrWriter(fileOut, self.Config.RoundingDecimals, self.Config.CdreExtraFields) + for _, cdr := range cdrs { + if err := csvWriter.Write(cdr); err != nil { + os.Remove(fileName) + return err + } + } + csvWriter.Close() + if attr.RemoveFromDb { + cgrIds := make([]string, len(cdrs)) + for idx, cdr := range cdrs { + cgrIds[idx] = cdr.CgrId + } + if err := self.CdrDb.RemRatedCdrs(cgrIds); err != nil { + return err + } } } - csvWriter.Close() - *reply = ExportedCsvCdrs{fileName, len(cdrs)} + *reply = utils.ExportedFileCdrs{fileName, len(cdrs)} return nil } diff --git a/cdrexporter/csv.go b/cdrexporter/csv.go index 2406b2b29..dde1535b9 100644 --- a/cdrexporter/csv.go +++ b/cdrexporter/csv.go @@ -37,7 +37,7 @@ func NewCsvCdrWriter(writer io.Writer, roundDecimals int, extraFields []string) } func (dcw *CsvCdrWriter) Write(cdr *utils.RatedCDR) error { - primaryFields := []string{cdr.CgrId, cdr.AccId, cdr.CdrHost, cdr.ReqType, cdr.Direction, cdr.Tenant, cdr.TOR, cdr.Account, cdr.Subject, + primaryFields := []string{cdr.CgrId, cdr.MediationRunId, cdr.AccId, cdr.CdrHost, cdr.ReqType, cdr.Direction, cdr.Tenant, cdr.TOR, cdr.Account, cdr.Subject, cdr.Destination, cdr.AnswerTime.String(), strconv.Itoa(int(cdr.Duration)), strconv.FormatFloat(cdr.Cost, 'f', dcw.roundDecimals, 64)} if len(dcw.extraFields) == 0 { dcw.extraFields = utils.MapKeys(cdr.ExtraFields) diff --git a/cdrexporter/csv_test.go b/cdrexporter/csv_test.go index 6b9a4a2a8..0b518b0c1 100644 --- a/cdrexporter/csv_test.go +++ b/cdrexporter/csv_test.go @@ -30,12 +30,12 @@ func TestCsvCdrWriter(t *testing.T) { writer := &bytes.Buffer{} csvCdrWriter := NewCsvCdrWriter(writer, 4, []string{"extra3", "extra1"}) ratedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", - TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Unix(1383813746, 0).UTC(), Duration: 10, + TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Unix(1383813746, 0).UTC(), Duration: 10, MediationRunId: utils.DEFAULT_RUNID, ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01, } csvCdrWriter.Write(ratedCdr) csvCdrWriter.Close() - expected := "b18944ef4dc618569f24c27b9872827a242bad0c,dsafdsaf,192.168.1.1,rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07 08:42:26 +0000 UTC,10,1.0100,val_extra3,val_extra1" + expected := "b18944ef4dc618569f24c27b9872827a242bad0c,default,dsafdsaf,192.168.1.1,rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07 08:42:26 +0000 UTC,10,1.0100,val_extra3,val_extra1" result := strings.TrimSpace(writer.String()) if result != expected { t.Errorf("Expected %s received %s.", expected, result) diff --git a/console/export_cdrs.go b/console/export_cdrs.go new file mode 100644 index 000000000..b864662ab --- /dev/null +++ b/console/export_cdrs.go @@ -0,0 +1,95 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import ( + "fmt" + "github.com/cgrates/cgrates/utils" + "time" +) + +func init() { + commands["export_cdrs"] = &CmdExportCdrs{} +} + +// Commander implementation +type CmdExportCdrs struct { + rpcMethod string + rpcParams *utils.AttrExpFileCdrs + rpcResult utils.ExportedFileCdrs +} + +// name should be exec's name +func (self *CmdExportCdrs) Usage(name string) string { + return fmt.Sprintf("\n\tUsage: cgr-console [cfg_opts...{-h}] export_cdrs [ [ [remove_from_db]]]") +} + +// set param defaults +func (self *CmdExportCdrs) defaults() error { + self.rpcMethod = "ApierV1.ExportCdrsToFile" + self.rpcParams = &utils.AttrExpFileCdrs{CdrFormat:"csv"} + return nil +} + +func (self *CmdExportCdrs) FromArgs(args []string) error { + self.defaults() + var timeStart, timeEnd string + if len(args) < 3 { + return fmt.Errorf(self.Usage("")) + } + if !utils.IsSliceMember(utils.CdreCdrFormats, args[2]) { + return fmt.Errorf(self.Usage("")) + } + self.rpcParams.CdrFormat = args[2] + switch len(args) { + case 4: + timeStart = args[3] + + case 5: + timeStart = args[3] + timeEnd = args[4] + case 6: + timeStart = args[3] + timeEnd = args[4] + if args[5] == "remove_from_db" { + self.rpcParams.RemoveFromDb = true + } + } + if timeStart == "*one_month" { + now := time.Now() + self.rpcParams.TimeStart = now.AddDate(0,-1,0).String() + self.rpcParams.TimeEnd = now.String() + } else { + self.rpcParams.TimeStart = timeStart + self.rpcParams.TimeEnd = timeEnd + } + return nil +} + +func (self *CmdExportCdrs) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdExportCdrs) RpcParams() interface{} { + return self.rpcParams +} + +func (self *CmdExportCdrs) RpcResult() interface{} { + return &self.rpcResult +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4ba43e621..ca07d11cf 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -96,6 +96,7 @@ type CdrStorage interface { SetCdr(utils.RawCDR) error SetRatedCdr(*utils.RatedCDR, string) error GetRatedCdrs(time.Time, time.Time) ([]*utils.RatedCDR, error) + RemRatedCdrs([]string) error } type LogStorage interface { diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 4c053801d..ae03e59af 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -760,14 +760,14 @@ func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]*utils.Rat for rows.Next() { var cgrid, accid, cdrhost, cdrsrc, reqtype, direction, tenant, tor, account, subject, destination, runid string var extraFields []byte - var answerTimestamp, duration int64 + var answerTime time.Time + var duration int64 var cost float64 var extraFieldsMp map[string]string - if err := rows.Scan(&cgrid, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &tor, &account, &subject, &destination, &answerTimestamp, &duration, + if err := rows.Scan(&cgrid, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &tor, &account, &subject, &destination, &answerTime, &duration, &extraFields, &runid, &cost); err != nil { return nil, err } - answerTime := time.Unix(answerTimestamp, 0) if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { return nil, err } @@ -781,6 +781,32 @@ func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]*utils.Rat return cdrs, nil } +// Remove CDR data out of all CDR tables based on their cgrid +func (self *SQLStorage) RemRatedCdrs(cgrIds []string) error { + if len(cgrIds) == 0 { + return nil + } + buffRated := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_RATED_CDRS)) + buffCosts := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_COST_DETAILS)) + buffCdrExtra := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE",utils.TBL_CDRS_EXTRA)) + buffCdrPrimary := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE",utils.TBL_CDRS_PRIMARY)) + qryBuffers := []*bytes.Buffer{buffRated, buffCosts, buffCdrExtra, buffCdrPrimary} + for idx, cgrId := range cgrIds { + for _, buffer := range qryBuffers { + if idx != 0 { + buffer.WriteString(" OR") + } + buffer.WriteString(fmt.Sprintf(" cgrid='%s'",cgrId)) + } + } + for _, buffer := range qryBuffers { + if _, err := self.Db.Exec(buffer.String()); err != nil { + return err + } + } + return nil +} + func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*Destination, error) { var dests []*Destination q := fmt.Sprintf("SELECT * FROM %s WHERE tpid='%s'", utils.TBL_TP_DESTINATIONS, tpid) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 0d021870a..aba11f578 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -304,3 +304,16 @@ type CachedItemAge struct { RatingProfile time.Duration Action time.Duration } + +type AttrExpFileCdrs struct { + CdrFormat string // Cdr output file format + TimeStart string // If provided, will represent the starting of the CDRs interval (>=) + TimeEnd string // If provided, will represent the end of the CDRs interval (<) + RemoveFromDb bool // If true the CDRs will be also deleted after export + +} + +type ExportedFileCdrs struct { + ExportedFilePath string // Full path to the newly generated export file + NumberOfRecords int // Number of CDRs in the export file +} diff --git a/utils/consts.go b/utils/consts.go index 0d83fad81..5f17455f2 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -78,4 +78,10 @@ const ( DURATION = "duration" DEFAULT_RUNID = "default" STATIC_VALUE_PREFIX = "^" + CDRE_CSV = "csv" + CDRE_DRYRUN = "dry_run" +) + +var ( + CdreCdrFormats = []string{CDRE_CSV, CDRE_DRYRUN} ) diff --git a/utils/coreutils.go b/utils/coreutils.go index 082a333d1..eaf44b603 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -101,12 +101,15 @@ func Round(x float64, prec int, method string) float64 { func ParseTimeDetectLayout(tmStr string) (time.Time, error) { var nilTime time.Time - rfc3339Rule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.+`) - sqlRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}`) - unixTimestampRule := regexp.MustCompile(`^\d{10}`) + rfc3339Rule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.+$`) + sqlRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}$`) + gotimeRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.?\d*\s[+,-]\d+\s\w+$`) + unixTimestampRule := regexp.MustCompile(`^\d{10}$`) switch { case rfc3339Rule.MatchString(tmStr): return time.Parse(time.RFC3339, tmStr) + case gotimeRule.MatchString(tmStr): + return time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", tmStr) case sqlRule.MatchString(tmStr): return time.Parse("2006-01-02 15:04:05", tmStr) case unixTimestampRule.MatchString(tmStr): diff --git a/utils/utils_test.go b/utils/utils_test.go index 51089fe4d..cbc0924dd 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -156,6 +156,28 @@ func TestParseTimeDetectLayout(t *testing.T) { if err == nil { t.Errorf("Expecting error") } + goTmStr := "2013-12-30 15:00:01 +0000 UTC" + goTm, err := ParseTimeDetectLayout(goTmStr) + if err != nil { + t.Error(err) + } else if !goTm.Equal(expectedTime) { + t.Errorf("Unexpected time parsed: %v, expecting: %v", goTm, expectedTime) + } + _, err = ParseTimeDetectLayout(goTmStr[1:]) + if err == nil { + t.Errorf("Expecting error") + } + goTmStr = "2013-12-30 15:00:01.000000000 +0000 UTC" + goTm, err = ParseTimeDetectLayout(goTmStr) + if err != nil { + t.Error(err) + } else if !goTm.Equal(expectedTime) { + t.Errorf("Unexpected time parsed: %v, expecting: %v", goTm, expectedTime) + } + _, err = ParseTimeDetectLayout(goTmStr[1:]) + if err == nil { + t.Errorf("Expecting error") + } } func TestParseDateUnix(t *testing.T) { From b7d9da842bca3924e2faa21cbb127da112ff624c Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 8 Jan 2014 14:00:20 +0100 Subject: [PATCH 2/7] Adding AccountActionTiming Get and Remove APIs --- apier/v1/accounts.go | 96 ++++++++++++++++++++++++++++++++++++++++++ apier/v1/apier.go | 4 +- apier/v1/cdrs.go | 6 +-- cdrc/cdrc.go | 6 +-- console/export_cdrs.go | 2 +- engine/storage_sql.go | 6 +-- utils/apitpdata.go | 20 ++++----- utils/cgrcdr.go | 4 +- utils/cgrcdr_test.go | 4 +- utils/ratedcdr_test.go | 8 ++-- 10 files changed, 126 insertions(+), 30 deletions(-) create mode 100644 apier/v1/accounts.go diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go new file mode 100644 index 000000000..1242910ad --- /dev/null +++ b/apier/v1/accounts.go @@ -0,0 +1,96 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apier + +import ( + "errors" + "fmt" + "github.com/cgrates/cgrates/utils" + "time" +) + +type AttrAcntActionTimings struct { + Tenant string + Account string + Direction string +} + +// Returns the balance id as used internally +// eg: *out:cgrates.org:1005 +func BalanceId(tenant, account, direction string) string { + return fmt.Sprintf("%s:%s:%s", direction, tenant, account) +} + +type AccountActionTiming struct { + Id string // The id to reference this particular ActionTiming + ActionTimingsId string // The id of the ActionTimings profile attached to the account + ActionsId string // The id of actions which will be executed + NextExecTime time.Time // Next execution time +} + +func (self *ApierV1) GetAccountActionTimings(attrs AttrAcntActionTimings, reply *[]*AccountActionTiming) error { + if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + accountATs := make([]*AccountActionTiming, 0) + allATs, err := self.AccountDb.GetAllActionTimings() + if err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } + for _, ats := range allATs { + for _, at := range ats { + if utils.IsSliceMember(at.UserBalanceIds, BalanceId(attrs.Tenant, attrs.Account, attrs.Direction)) { + accountATs = append(accountATs, &AccountActionTiming{Id: at.Id, ActionTimingsId: at.Tag, ActionsId: at.ActionsId, NextExecTime: at.GetNextStartTime()}) + } + } + } + *reply = accountATs + return nil +} + +type AttrRemAcntActionTiming struct { + Tenant string // Tenant he account belongs to + Account string // Account name + Direction string // Traffic direction + ActionTimingsId string // Id identifying the ActionTimings profile + ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled +} + +func (self *ApierV1) RemAccountActionTiming(attrs AttrRemAcntActionTiming, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction", "ActionTimingsId", "ActionTimingId"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + ats, err := self.AccountDb.GetActionTimings(attrs.ActionTimingsId) + if err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } else if len(ats) == 0 { + return errors.New(utils.ERR_NOT_FOUND) + } + for idx, at := range ats { + if utils.IsSliceMember(at.UserBalanceIds, BalanceId(attrs.Tenant, attrs.Account, attrs.Direction)) && + (at.Id == attrs.ActionTimingId || attrs.ActionTimingId == "*any") { + ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] // Remove from ats + } + } + if err := self.AccountDb.SetActionTimings(attrs.ActionTimingsId, ats); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } + *reply = OK + return nil +} diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 8b776f07b..dc0790e94 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -526,8 +526,8 @@ func (self *ApierV1) GetCachedItemAge(itemId string, reply *utils.CachedItemAge) } cachedItemAge := new(utils.CachedItemAge) var found bool - for idx, cacheKey := range []string{ engine.DESTINATION_PREFIX+itemId, engine.RATING_PLAN_PREFIX+itemId, engine.RATING_PROFILE_PREFIX+itemId, - engine.ACTION_PREFIX+itemId} { + for idx, cacheKey := range []string{engine.DESTINATION_PREFIX + itemId, engine.RATING_PLAN_PREFIX + itemId, engine.RATING_PROFILE_PREFIX + itemId, + engine.ACTION_PREFIX + itemId} { if age, err := cache2go.GetKeyAge(cacheKey); err == nil { found = true switch idx { diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index f0ad83f5f..f92659924 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -24,8 +24,8 @@ import ( "github.com/cgrates/cgrates/utils" "os" "path" - "time" "strings" + "time" ) func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.ExportedFileCdrs) error { @@ -34,7 +34,7 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E cdrFormat := strings.ToLower(attr.CdrFormat) if !utils.IsSliceMember(utils.CdreCdrFormats, cdrFormat) { return fmt.Errorf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, "CdrFormat") - } + } if len(attr.TimeStart) != 0 { if tStart, err = utils.ParseTimeDetectLayout(attr.TimeStart); err != nil { return err @@ -62,7 +62,7 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E for _, cdr := range cdrs { if err := csvWriter.Write(cdr); err != nil { os.Remove(fileName) - return err + return err } } csvWriter.Close() diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index d36e4fbd9..a9fe8476d 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -39,7 +39,7 @@ import ( ) const ( - CSV = "csv" + CSV = "csv" FS_CSV = "freeswitch_csv" ) @@ -126,7 +126,7 @@ func (self *Cdrc) processCdrDir() error { engine.Logger.Info(fmt.Sprintf(" Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir)) filesInDir, _ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir) for _, file := range filesInDir { - if self.cgrCfg.CdrcCdrType!=FS_CSV || path.Ext(file.Name())!=".csv" { + if self.cgrCfg.CdrcCdrType != FS_CSV || path.Ext(file.Name()) != ".csv" { if err := self.processFile(path.Join(self.cgrCfg.CdrcCdrInDir, file.Name())); err != nil { return err } @@ -150,7 +150,7 @@ func (self *Cdrc) trackCDRFiles() (err error) { for { select { case ev := <-watcher.Event: - if ev.IsCreate() && (self.cgrCfg.CdrcCdrType!=FS_CSV || path.Ext(ev.Name)!=".csv") { + if ev.IsCreate() && (self.cgrCfg.CdrcCdrType != FS_CSV || path.Ext(ev.Name) != ".csv") { if err = self.processFile(ev.Name); err != nil { return err } diff --git a/console/export_cdrs.go b/console/export_cdrs.go index b864662ab..ae5e285a8 100644 --- a/console/export_cdrs.go +++ b/console/export_cdrs.go @@ -37,7 +37,7 @@ type CmdExportCdrs struct { // name should be exec's name func (self *CmdExportCdrs) Usage(name string) string { - return fmt.Sprintf("\n\tUsage: cgr-console [cfg_opts...{-h}] export_cdrs [ [ [remove_from_db]]]") + return fmt.Sprintf("\n\tUsage: cgr-console [cfg_opts...{-h}] export_cdrs [ [ [remove_from_db]]]") } // set param defaults diff --git a/engine/storage_sql.go b/engine/storage_sql.go index ae03e59af..b4c1a36ac 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -788,15 +788,15 @@ func (self *SQLStorage) RemRatedCdrs(cgrIds []string) error { } buffRated := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_RATED_CDRS)) buffCosts := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_COST_DETAILS)) - buffCdrExtra := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE",utils.TBL_CDRS_EXTRA)) - buffCdrPrimary := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE",utils.TBL_CDRS_PRIMARY)) + buffCdrExtra := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_CDRS_EXTRA)) + buffCdrPrimary := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_CDRS_PRIMARY)) qryBuffers := []*bytes.Buffer{buffRated, buffCosts, buffCdrExtra, buffCdrPrimary} for idx, cgrId := range cgrIds { for _, buffer := range qryBuffers { if idx != 0 { buffer.WriteString(" OR") } - buffer.WriteString(fmt.Sprintf(" cgrid='%s'",cgrId)) + buffer.WriteString(fmt.Sprintf(" cgrid='%s'", cgrId)) } } for _, buffer := range qryBuffers { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index aba11f578..d7a054985 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -299,21 +299,21 @@ type AttrCachedItemAge struct { } type CachedItemAge struct { - Destination time.Duration - RatingPlan time.Duration - RatingProfile time.Duration - Action time.Duration + Destination time.Duration + RatingPlan time.Duration + RatingProfile time.Duration + Action time.Duration } type AttrExpFileCdrs struct { - CdrFormat string // Cdr output file format - TimeStart string // If provided, will represent the starting of the CDRs interval (>=) - TimeEnd string // If provided, will represent the end of the CDRs interval (<) - RemoveFromDb bool // If true the CDRs will be also deleted after export - + CdrFormat string // Cdr output file format + TimeStart string // If provided, will represent the starting of the CDRs interval (>=) + TimeEnd string // If provided, will represent the end of the CDRs interval (<) + RemoveFromDb bool // If true the CDRs will be also deleted after export + } type ExportedFileCdrs struct { ExportedFilePath string // Full path to the newly generated export file - NumberOfRecords int // Number of CDRs in the export file + NumberOfRecords int // Number of CDRs in the export file } diff --git a/utils/cgrcdr.go b/utils/cgrcdr.go index 8ac63e97b..48120893f 100644 --- a/utils/cgrcdr.go +++ b/utils/cgrcdr.go @@ -22,8 +22,8 @@ import ( "errors" "fmt" "net/http" - "time" "strings" + "time" ) func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) { @@ -177,7 +177,7 @@ func (cgrCdr CgrCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torF return nil, err } } - if durStr, hasKey = cgrCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, STATIC_VALUE_PREFIX){ + if durStr, hasKey = cgrCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, STATIC_VALUE_PREFIX) { return nil, errors.New(fmt.Sprintf("%s:%s", ERR_MANDATORY_IE_MISSING, durationFld)) } else { if strings.HasPrefix(durationFld, STATIC_VALUE_PREFIX) { diff --git a/utils/cgrcdr_test.go b/utils/cgrcdr_test.go index 751c142cf..071c9ef11 100644 --- a/utils/cgrcdr_test.go +++ b/utils/cgrcdr_test.go @@ -97,8 +97,8 @@ func TestCgrCdrAsRatedCdr(t *testing.T) { t.Error("Unexpected error received", err) } expctRatedCdr2 := &RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "source_test", ReqType: "postpaid", - Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "1002", - AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12)*time.Second, + Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "1002", + AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, MediationRunId: "wholesale_run", Cost: -1} if !reflect.DeepEqual(rtCdrOut2, expctRatedCdr2) { t.Errorf("Received: %v, expected: %v", rtCdrOut2, expctRatedCdr2) diff --git a/utils/ratedcdr_test.go b/utils/ratedcdr_test.go index e777889af..d3ff122e5 100644 --- a/utils/ratedcdr_test.go +++ b/utils/ratedcdr_test.go @@ -19,9 +19,9 @@ along with this program. If not, see package utils import ( + "reflect" "testing" "time" - "reflect" ) func TestRatedCDRInterfaces(t *testing.T) { @@ -33,9 +33,9 @@ func TestNewRatedCDRFromRawCDR(t *testing.T) { cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "cdrsource": "internal_test", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call", "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07T08:42:26Z", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} - expctRtCdr := &RatedCDR{CgrId: FSCgrId(cgrCdr["accid"]), AccId: cgrCdr["accid"], CdrHost: cgrCdr["cdrhost"], CdrSource: cgrCdr["cdrsource"], ReqType: cgrCdr["reqtype"], + expctRtCdr := &RatedCDR{CgrId: FSCgrId(cgrCdr["accid"]), AccId: cgrCdr["accid"], CdrHost: cgrCdr["cdrhost"], CdrSource: cgrCdr["cdrsource"], ReqType: cgrCdr["reqtype"], Direction: cgrCdr["direction"], Tenant: cgrCdr["tenant"], TOR: cgrCdr["tor"], Account: cgrCdr["account"], Subject: cgrCdr["subject"], - Destination: cgrCdr["destination"], AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(10)*time.Second, + Destination: cgrCdr["destination"], AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, MediationRunId: DEFAULT_RUNID, Cost: -1} if rt, err := NewRatedCDRFromRawCDR(cgrCdr); err != nil { t.Error(err) @@ -48,7 +48,7 @@ func TestRatedCdrFields(t *testing.T) { ratedCdr := RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Unix(1383813746, 0), Duration: 10, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, - } + } if ratedCdr.GetCgrId() != "b18944ef4dc618569f24c27b9872827a242bad0c" { t.Error("Error parsing cdr: ", ratedCdr) } From 3f506b50c8bad9e4fff47d5b1cd504c3a496619d Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 8 Jan 2014 20:36:29 +0100 Subject: [PATCH 3/7] Fixup Get and Rem ActionTiming API methods --- apier/v1/accounts.go | 24 ++++++++++++++---------- engine/action_timing.go | 30 +++++++++++++++++++++++++++++- engine/actions_test.go | 31 +++++++++++++++++++++++++++++++ utils/coreutils.go | 4 ++++ 4 files changed, 78 insertions(+), 11 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 1242910ad..f30a059d1 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cgrates/engine" "time" ) @@ -65,32 +66,35 @@ func (self *ApierV1) GetAccountActionTimings(attrs AttrAcntActionTimings, reply } type AttrRemAcntActionTiming struct { + ActionTimingsId string // Id identifying the ActionTimings profile + ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled Tenant string // Tenant he account belongs to Account string // Account name Direction string // Traffic direction - ActionTimingsId string // Id identifying the ActionTimings profile - ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled } -func (self *ApierV1) RemAccountActionTiming(attrs AttrRemAcntActionTiming, reply *string) error { - if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction", "ActionTimingsId", "ActionTimingId"}); len(missing) != 0 { +// Removes an ActionTimings or parts of it depending on filters being set +func (self *ApierV1) RemActionTiming(attrs AttrRemAcntActionTiming, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"ActionTimingsId"}); len(missing) != 0 { // Only mandatory ActionTimingsId return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } + if len(attrs.Account) != 0 { // Presence of Account requires complete account details to be provided + if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + } + // ToDo: lock here actionTimings ats, err := self.AccountDb.GetActionTimings(attrs.ActionTimingsId) if err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if len(ats) == 0 { return errors.New(utils.ERR_NOT_FOUND) } - for idx, at := range ats { - if utils.IsSliceMember(at.UserBalanceIds, BalanceId(attrs.Tenant, attrs.Account, attrs.Direction)) && - (at.Id == attrs.ActionTimingId || attrs.ActionTimingId == "*any") { - ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] // Remove from ats - } - } + ats = engine.RemActionTiming(ats, attrs.ActionTimingId, utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction)) if err := self.AccountDb.SetActionTimings(attrs.ActionTimingsId, ats); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } + // ToDo: Unlock here actionTimings *reply = OK return nil } diff --git a/engine/action_timing.go b/engine/action_timing.go index f28a50cd9..0ade0e0c0 100644 --- a/engine/action_timing.go +++ b/engine/action_timing.go @@ -24,7 +24,6 @@ import ( "strconv" "strings" "time" - "github.com/cgrates/cgrates/utils" ) @@ -296,3 +295,32 @@ func (atpl ActionTimingPriotityList) Sort() { func (at *ActionTiming) String_DISABLED() string { return at.Tag + " " + at.GetNextStartTime().String() + ",w: " + strconv.FormatFloat(at.Weight, 'f', -1, 64) } + +// Helper to remove ActionTiming members based on specific filters, empty data means no always match +func RemActionTiming(ats ActionTimings, actionTimingId, balanceId string) ActionTimings { + for idx, at := range ats { + if len(actionTimingId)!=0 && at.Id!=actionTimingId { // No Match for ActionTimingId, no need to move further + continue + } + if len(balanceId) == 0 { // No account defined, considered match for complete removal + if len(ats) == 1 { // Removing last item, by init empty + return make([]*ActionTiming,0) + } + ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] + continue + } + for iBlnc, blncId := range at.UserBalanceIds { + if blncId == balanceId { + if len(at.UserBalanceIds) == 1 { // Only one balance, remove complete at + if len(ats) == 1 { // Removing last item, by init empty + return make([]*ActionTiming,0) + } + ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] + } else { + at.UserBalanceIds[iBlnc], at.UserBalanceIds = at.UserBalanceIds[len(at.UserBalanceIds)-1], at.UserBalanceIds[:len(at.UserBalanceIds)-1] + } + } + } + } + return ats +} diff --git a/engine/actions_test.go b/engine/actions_test.go index 4b7733c56..99622aca7 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -421,6 +421,37 @@ func TestActionTimingPriotityListWeight(t *testing.T) { } } +func TestActionTimingsRemoveMember(t *testing.T) { + at1 := &ActionTiming{ + Id: "some uuid", + Tag: "test", + UserBalanceIds: []string{"one", "two", "three"}, + ActionsId: "TEST_ACTIONS", + } + at2 := &ActionTiming{ + Id: "some uuid22", + Tag: "test2", + UserBalanceIds: []string{"three", "four"}, + ActionsId: "TEST_ACTIONS2", + } + ats := ActionTimings{at1, at2} + if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 { + t.Error("Expecting fewer balance ids", outAts[1].UserBalanceIds) + } + if ats = RemActionTiming(ats, "", "three"); len(ats) != 1 { + t.Error("Expecting fewer actionTimings", ats) + } + if ats = RemActionTiming(ats, "some_uuid22", "");len(ats) != 1 { + t.Error("Expecting fewer actionTimings members", ats) + } + ats2 := ActionTimings{at1, at2} + if ats2 = RemActionTiming(ats2, "", ""); len(ats2) != 0 { + t.Error("Should have no members anymore", ats2) + } +} + + + func TestActionTriggerMatchNil(t *testing.T) { at := &ActionTrigger{ Direction: OUTBOUND, diff --git a/utils/coreutils.go b/utils/coreutils.go index eaf44b603..7b49a8e0d 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -183,3 +183,7 @@ func ParseDurationWithSecs(durStr string) (time.Duration, error) { } return time.ParseDuration(durStr) } + +func BalanceKey(tenant, account, direction string ) string { + return fmt.Sprintf("%s:%s:%s", direction, tenant, account) +} From 07dd19a9d5248f5d4930c5ba3d8d998a26833dfa Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 8 Jan 2014 21:30:15 +0100 Subject: [PATCH 4/7] Adding Get and Rem APIs for AccountTriggers --- apier/v1/accounts.go | 50 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index f30a059d1..6ecdf14f4 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -65,7 +65,7 @@ func (self *ApierV1) GetAccountActionTimings(attrs AttrAcntActionTimings, reply return nil } -type AttrRemAcntActionTiming struct { +type AttrRemActionTiming struct { ActionTimingsId string // Id identifying the ActionTimings profile ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled Tenant string // Tenant he account belongs to @@ -74,7 +74,7 @@ type AttrRemAcntActionTiming struct { } // Removes an ActionTimings or parts of it depending on filters being set -func (self *ApierV1) RemActionTiming(attrs AttrRemAcntActionTiming, reply *string) error { +func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) error { if missing := utils.MissingStructFields(&attrs, []string{"ActionTimingsId"}); len(missing) != 0 { // Only mandatory ActionTimingsId return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } @@ -98,3 +98,49 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemAcntActionTiming, reply *strin *reply = OK return nil } + +// Returns a list of ActionTriggers on an account +func (self *ApierV1) GetAccountActionTriggers(attrs AttrAcntActionTimings, reply *engine.ActionTriggerPriotityList) error { + if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + if balance, err := self.AccountDb.GetUserBalance(utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction)); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } else { + *reply = balance.ActionTriggers + } + return nil +} + +type AttrRemAcntActionTriggers struct { + Tenant string // Tenant he account belongs to + Account string // Account name + Direction string // Traffic direction + ActionTriggerId string // Id filtering only specific id to remove +} + +// Returns a list of ActionTriggers on an account +func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTrigger, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + ub, err := self.AccountDb.GetUserBalance(utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction)) + if err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } + for idx, actr := range ub.ActionTriggers { + if len(attrs.ActionTriggerId) != 0 && actr.Id != attrs.ActionTriggerId { // Empty actionTriggerId will match always + continue + } + if len(ub.ActionTriggers) != 1 { // Remove by index + ub.ActionTriggers[idx], ub.ActionTriggers = ub.ActionTriggers[len(ub.ActionTriggers)-1], ub.ActionTriggers[:len(ub.ActionTriggers)-1] + } else { // For last item, simply reinit the slice + ub.ActionTriggers = make(engine.ActionTriggerPriotityList, 0) + } + } + if err := self.AccountDb.SetUserBalance(ub); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } + *reply = OK + return nil +} From 79497e89709531409f400fba48bfa0ec0e1363de Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 9 Jan 2014 10:44:38 +0100 Subject: [PATCH 5/7] APIs - locking balance and action triggers when operating on them, adding reloadScheduler attribute in some methods --- apier/v1/accounts.go | 62 +++++++++++++++++++++++------------- apier/v1/apier.go | 25 +++++++++------ apier/v1/apier_local_test.go | 10 +++--- 3 files changed, 59 insertions(+), 38 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 6ecdf14f4..6f15d29c3 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -71,6 +71,7 @@ type AttrRemActionTiming struct { Tenant string // Tenant he account belongs to Account string // Account name Direction string // Traffic direction + ReloadScheduler bool // If set it will reload the scheduler after adding } // Removes an ActionTimings or parts of it depending on filters being set @@ -83,18 +84,26 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } } - // ToDo: lock here actionTimings - ats, err := self.AccountDb.GetActionTimings(attrs.ActionTimingsId) + _, err := engine.AccLock.Guard(engine.ACTION_TIMING_PREFIX+attrs.ActionTimingId, func() (float64, error) { // ToDo: Expand the scheduler to consider the locks also + ats, err := self.AccountDb.GetActionTimings(attrs.ActionTimingsId) + if err != nil { + return 0, err + } else if len(ats) == 0 { + return 0, errors.New(utils.ERR_NOT_FOUND) + } + ats = engine.RemActionTiming(ats, attrs.ActionTimingId, utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction)) + if err := self.AccountDb.SetActionTimings(attrs.ActionTimingsId, ats); err != nil { + return 0, err + } + return 0, nil + }) if err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) - } else if len(ats) == 0 { - return errors.New(utils.ERR_NOT_FOUND) } - ats = engine.RemActionTiming(ats, attrs.ActionTimingId, utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction)) - if err := self.AccountDb.SetActionTimings(attrs.ActionTimingsId, ats); err != nil { - return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + if attrs.ReloadScheduler && self.Sched != nil { + self.Sched.LoadActionTimings(self.AccountDb) + self.Sched.Restart() } - // ToDo: Unlock here actionTimings *reply = OK return nil } @@ -120,27 +129,34 @@ type AttrRemAcntActionTriggers struct { } // Returns a list of ActionTriggers on an account -func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTrigger, reply *string) error { +func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, reply *string) error { if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } - ub, err := self.AccountDb.GetUserBalance(utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction)) + balanceId := utils.BalanceKey(attrs.Tenant, attrs.Account, attrs.Direction) + _, err := engine.AccLock.Guard(balanceId, func() (float64, error) { + ub, err := self.AccountDb.GetUserBalance(balanceId) + if err != nil { + return 0, err + } + for idx, actr := range ub.ActionTriggers { + if len(attrs.ActionTriggerId) != 0 && actr.Id != attrs.ActionTriggerId { // Empty actionTriggerId will match always + continue + } + if len(ub.ActionTriggers) != 1 { // Remove by index + ub.ActionTriggers[idx], ub.ActionTriggers = ub.ActionTriggers[len(ub.ActionTriggers)-1], ub.ActionTriggers[:len(ub.ActionTriggers)-1] + } else { // For last item, simply reinit the slice + ub.ActionTriggers = make(engine.ActionTriggerPriotityList, 0) + } + } + if err := self.AccountDb.SetUserBalance(ub); err != nil { + return 0, err + } + return 0, nil + }) if err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } - for idx, actr := range ub.ActionTriggers { - if len(attrs.ActionTriggerId) != 0 && actr.Id != attrs.ActionTriggerId { // Empty actionTriggerId will match always - continue - } - if len(ub.ActionTriggers) != 1 { // Remove by index - ub.ActionTriggers[idx], ub.ActionTriggers = ub.ActionTriggers[len(ub.ActionTriggers)-1], ub.ActionTriggers[:len(ub.ActionTriggers)-1] - } else { // For last item, simply reinit the slice - ub.ActionTriggers = make(engine.ActionTriggerPriotityList, 0) - } - } - if err := self.AccountDb.SetUserBalance(ub); err != nil { - return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) - } *reply = OK return nil } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index dc0790e94..70673cc57 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -288,6 +288,7 @@ type AttrSetActionTimings struct { ActionTimingsId string // Profile id Overwrite bool // If previously defined, will be overwritten ActionTimings []*ApiActionTiming // Set of actions this Actions profile will perform + ReloadScheduler bool // Enables automatic reload of the scheduler (eg: useful when adding a single action timing) } type ApiActionTiming struct { @@ -342,6 +343,10 @@ func (self *ApierV1) SetActionTimings(attrs AttrSetActionTimings, reply *string) if err := self.AccountDb.SetActionTimings(attrs.ActionTimingsId, storeAtms); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } + if attrs.ReloadScheduler && self.Sched != nil { + self.Sched.LoadActionTimings(self.AccountDb) + self.Sched.Restart() + } *reply = OK return nil } @@ -375,7 +380,7 @@ func (self *ApierV1) AddTriggeredAction(attr AttrAddActionTrigger, reply *string Executed: false, } - tag := fmt.Sprintf("%s:%s:%s", attr.Direction, attr.Tenant, attr.Account) + tag := utils.BalanceKey(attr.Tenant, attr.Account, attr.Direction) _, err := engine.AccLock.Guard(tag, func() (float64, error) { userBalance, err := self.AccountDb.GetUserBalance(tag) if err != nil { @@ -410,7 +415,7 @@ func (self *ApierV1) AddAccount(attr AttrAddAccount, reply *string) error { if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Direction", "Account", "Type", "ActionTimingsId"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } - tag := fmt.Sprintf("%s:%s:%s", attr.Direction, attr.Tenant, attr.Account) + tag := utils.BalanceKey(attr.Tenant, attr.Account, attr.Direction) ub := &engine.UserBalance{ Id: tag, Type: attr.Type, @@ -441,7 +446,7 @@ func (self *ApierV1) AddAccount(attr AttrAddAccount, reply *string) error { } // Process dependencies and load a specific AccountActions profile from storDb into dataDb. -func (self *ApierV1) SetAccountActions(attrs utils.TPAccountActions, reply *string) error { +func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *string) error { if missing := utils.MissingStructFields(&attrs, []string{"TPid", "LoadId", "Tenant", "Account", "Direction"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } @@ -464,14 +469,14 @@ func (self *ApierV1) SetAccountActions(attrs utils.TPAccountActions, reply *stri } func (self *ApierV1) ReloadScheduler(input string, reply *string) error { - if self.Sched != nil { - self.Sched.LoadActionTimings(self.AccountDb) - self.Sched.Restart() - *reply = OK - return nil + if self.Sched == nil { + return errors.New(utils.ERR_NOT_FOUND) } - *reply = utils.ERR_NOT_FOUND - return errors.New(utils.ERR_NOT_FOUND) + self.Sched.LoadActionTimings(self.AccountDb) + self.Sched.Restart() + *reply = OK + return nil + } func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 3074760c7..9c544421e 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -754,17 +754,17 @@ func TestApierSetRatingProfile(t *testing.T) { } } -// Test here SetAccountActions -func TestApierSetAccountActions(t *testing.T) { +// Test here LoadAccountActions +func TestApierLoadAccountActions(t *testing.T) { if !*testLocal { return } reply := "" aa1 := &utils.TPAccountActions{TPid: engine.TEST_SQL, LoadId: engine.TEST_SQL, Tenant: "cgrates.org", Account: "1001", Direction: "*out"} - if err := rater.Call("ApierV1.SetAccountActions", aa1, &reply); err != nil { - t.Error("Got error on ApierV1.SetAccountActions: ", err.Error()) + if err := rater.Call("ApierV1.LoadAccountActions", aa1, &reply); err != nil { + t.Error("Got error on ApierV1.LoadAccountActions: ", err.Error()) } else if reply != "OK" { - t.Error("Calling ApierV1.SetAccountActions got reply: ", reply) + t.Error("Calling ApierV1.LoadAccountActions got reply: ", reply) } } From 12e896576ca1d65f104a3c196d63d6920cf3233d Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 9 Jan 2014 13:32:52 +0100 Subject: [PATCH 6/7] Adding Apier tests for cdrserver, newly added commands for account triggers and account timings --- apier/v1/accounts.go | 6 +- apier/v1/apier_local_test.go | 150 ++++++++++++++++++++++++++++++++--- cmd/cgr-engine/cgr-engine.go | 4 + engine/storage_sql.go | 7 +- 4 files changed, 152 insertions(+), 15 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 6f15d29c3..ee2ab2b36 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -26,7 +26,7 @@ import ( "time" ) -type AttrAcntActionTimings struct { +type AttrAcntAction struct { Tenant string Account string Direction string @@ -45,7 +45,7 @@ type AccountActionTiming struct { NextExecTime time.Time // Next execution time } -func (self *ApierV1) GetAccountActionTimings(attrs AttrAcntActionTimings, reply *[]*AccountActionTiming) error { +func (self *ApierV1) GetAccountActionTimings(attrs AttrAcntAction, reply *[]*AccountActionTiming) error { if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } @@ -109,7 +109,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e } // Returns a list of ActionTriggers on an account -func (self *ApierV1) GetAccountActionTriggers(attrs AttrAcntActionTimings, reply *engine.ActionTriggerPriotityList) error { +func (self *ApierV1) GetAccountActionTriggers(attrs AttrAcntAction, reply *engine.ActionTriggerPriotityList) error { if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Direction"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 9c544421e..0c3ff8b51 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -31,6 +31,9 @@ import ( "reflect" "testing" "time" + "net/http" + "net/url" + "strings" ) // ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local @@ -119,7 +122,7 @@ func TestStartEngine(t *testing.T) { t.Fatal("Cannot find cgr-engine executable") } exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it - engine := exec.Command(enginePath, "-rater", "-scheduler", "-config", path.Join(*dataDir, "conf", "cgrates.cfg")) + engine := exec.Command(enginePath, "-rater", "-scheduler", "-cdrs", "-mediator", "-config", path.Join(*dataDir, "conf", "cgrates.cfg")) if err := engine.Start(); err != nil { t.Fatal("Cannot start cgr-engine: ", err.Error()) } @@ -984,23 +987,67 @@ func TestApierAddTriggeredAction(t *testing.T) { *attrs2 = *attrs attrs2.Account = "dan3" // Does not exist so it should error when adding triggers on it // Add trigger to an account which does n exist - if err := rater.Call("ApierV1.ExecuteAction", attrs2, &reply2); err == nil || reply2 == "OK" { + if err := rater.Call("ApierV1.AddTriggeredAction", attrs2, &reply2); err == nil || reply2 == "OK" { t.Error("Expecting error on ApierV1.AddTriggeredAction.", err, reply2) } } + + +// Test here AddTriggeredAction +func TestApierGetAccountActionTriggers(t *testing.T) { + if !*testLocal { + return + } + var reply engine.ActionTriggerPriotityList + req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan2", Direction: "*out"} + if err := rater.Call("ApierV1.GetAccountActionTriggers", req, &reply); err != nil { + t.Error("Got error on ApierV1.GetAccountActionTimings: ", err.Error()) + } else if len(reply) != 1 || reply[0].ActionsId != "WARN_VIA_HTTP" { + t.Errorf("Unexpected action triggers received %v", reply) + } +} + +// Test here RemAccountActionTriggers +func TestApierRemAccountActionTriggers(t *testing.T) { + if !*testLocal { + return + } + // Test first get so we can steal the id which we need to remove + var reply engine.ActionTriggerPriotityList + req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan2", Direction: "*out"} + if err := rater.Call("ApierV1.GetAccountActionTriggers", req, &reply); err != nil { + t.Error("Got error on ApierV1.GetAccountActionTimings: ", err.Error()) + } else if len(reply) != 1 || reply[0].ActionsId != "WARN_VIA_HTTP" { + t.Errorf("Unexpected action triggers received %v", reply) + } + var rmReply string + rmReq := AttrRemAcntActionTriggers{Tenant: "cgrates.org", Account:"dan2", Direction: "*out", ActionTriggerId: reply[0].Id} + if err := rater.Call("ApierV1.RemAccountActionTriggers", rmReq, &rmReply); err != nil { + t.Error("Got error on ApierV1.RemActionTiming: ", err.Error()) + } else if rmReply != OK { + t.Error("Unexpected answer received", rmReply) + } + if err := rater.Call("ApierV1.GetAccountActionTriggers", req, &reply); err != nil { + t.Error("Got error on ApierV1.GetAccountActionTriggers: ", err.Error()) + } else if len(reply) != 0 { + t.Errorf("Unexpected action triggers received %v", reply) + } +} + + // Test here AddAccount func TestApierAddAccount(t *testing.T) { if !*testLocal { return } - //reply := "" - attrs := &AttrAddAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan4", Type: "prepaid", ActionTimingsId: "PREPAID_10"} - //if err := rater.Call("ApierV1.AddAccount", attrs, &reply); err != nil { - // t.Error("Got error on ApierV1.AddAccount: ", err.Error()) - //} else if reply != "OK" { - // t.Errorf("Calling ApierV1.AddAccount received: %s", reply) - //} + reply := "" + attrs := &AttrAddAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan4", Type: "prepaid", ActionTimingsId: "ATMS_1"} + if err := rater.Call("ApierV1.AddAccount", attrs, &reply); err != nil { + t.Error("Got error on ApierV1.AddAccount: ", err.Error()) + } else if reply != "OK" { + t.Errorf("Calling ApierV1.AddAccount received: %s", reply) + } reply2 := "" attrs2 := new(AttrAddAccount) *attrs2 = *attrs @@ -1011,6 +1058,45 @@ func TestApierAddAccount(t *testing.T) { } } +// Test here GetAccountActionTimings +func TestApierGetAccountActionTimings(t *testing.T) { + if !*testLocal { + return + } + var reply []*AccountActionTiming + req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan4", Direction: "*out"} + if err := rater.Call("ApierV1.GetAccountActionTimings", req, &reply); err != nil { + t.Error("Got error on ApierV1.GetAccountActionTimings: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected action timings received") + } else { + if reply[0].ActionTimingsId != "ATMS_1" { + t.Errorf("Unexpected ActionTImingsId received") + } + } +} + +// Test here RemActionTiming +func TestApierRemActionTiming(t *testing.T) { + if !*testLocal { + return + } + var rmReply string + rmReq := AttrRemActionTiming{ActionTimingsId: "ATMS_1", Tenant: "cgrates.org", Account:"dan4", Direction: "*out"} + if err := rater.Call("ApierV1.RemActionTiming", rmReq, &rmReply); err != nil { + t.Error("Got error on ApierV1.RemActionTiming: ", err.Error()) + } else if rmReply != OK { + t.Error("Unexpected answer received", rmReply) + } + var reply []*AccountActionTiming + req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan4", Direction: "*out"} + if err := rater.Call("ApierV1.GetAccountActionTimings", req, &reply); err != nil { + t.Error("Got error on ApierV1.GetAccountActionTimings: ", err.Error()) + } else if len(reply) != 0 { + t.Error("Action timings was not removed") + } +} + // Test here GetBalance func TestApierGetBalance(t *testing.T) { if !*testLocal { @@ -1080,6 +1166,52 @@ func TestResponderGetCost(t *testing.T) { } } +func TestCdrServer(t *testing.T) { + httpClient := new(http.Client) + cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} + cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} + for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { + cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) + if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.CdrcCdrs), cdrForm); err != nil { + t.Error(err.Error()) + } + } +} + +func TestExportCdrsToFile(t *testing.T) { + var reply *utils.ExportedFileCdrs + req := utils.AttrExpFileCdrs{} + if err := rater.Call("ApierV1.ExportCdrsToFile", req, &reply); err == nil || !strings.HasPrefix(err.Error(), utils.ERR_MANDATORY_IE_MISSING) { + t.Error("Failed to detect missing parameter") + } + req.CdrFormat = utils.CDRE_DRYRUN + expectReply := &utils.ExportedFileCdrs{NumberOfRecords: 2} + if err := rater.Call("ApierV1.ExportCdrsToFile", req, &reply); err != nil { + t.Error(err.Error()) + } else if !reflect.DeepEqual(reply, expectReply) { + t.Errorf("Unexpected reply: %v", reply) + } + /* Need to implement temporary file writing in order to test removal from db, not possible on DRYRUN + req.RemoveFromDb = true + if err := rater.Call("ApierV1.ExportCdrsToFile", req, &reply); err != nil { + t.Error(err.Error()) + } else if !reflect.DeepEqual(reply, expectReply) { + t.Errorf("Unexpected reply: %v", reply) + } + expectReply.NumberOfRecords = 0 // We should have deleted previously + if err := rater.Call("ApierV1.ExportCdrsToFile", req, &reply); err != nil { + t.Error(err.Error()) + } else if !reflect.DeepEqual(reply, expectReply) { + t.Errorf("Unexpected reply: %v", reply) + } + */ +} + + // Simply kill the engine after we are done with tests within this file func TestStopEngine(t *testing.T) { if !*testLocal { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 653105811..bcf04aae5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -65,6 +65,7 @@ var ( schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config") cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config") cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config") + mediatorEnabled = flag.Bool("mediator", false, "Enforce starting of the mediator service overwriting config") pidFile = flag.String("pid", "", "Write pid file") bal = balancer2go.NewBalancer() exitChan = make(chan bool) @@ -373,6 +374,9 @@ func main() { if *cdrcEnabled { cfg.CdrcEnabled = *cdrcEnabled } + if *mediatorEnabled { + cfg.MediatorEnabled = *mediatorEnabled + } if cfg.RaterEnabled { if err := ratingDb.CacheRating(nil, nil, nil); err != nil { engine.Logger.Crit(fmt.Sprintf("Cache rating error: %v", err)) diff --git a/engine/storage_sql.go b/engine/storage_sql.go index b4c1a36ac..539c41a29 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -758,11 +758,12 @@ func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]*utils.Rat } defer rows.Close() for rows.Next() { - var cgrid, accid, cdrhost, cdrsrc, reqtype, direction, tenant, tor, account, subject, destination, runid string + var cgrid, accid, cdrhost, cdrsrc, reqtype, direction, tenant, tor, account, subject, destination string var extraFields []byte var answerTime time.Time var duration int64 - var cost float64 + var runid sql.NullString // So we can export unmediated CDRs + var cost sql.NullFloat64 // So we can export unmediated CDRs var extraFieldsMp map[string]string if err := rows.Scan(&cgrid, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &tor, &account, &subject, &destination, &answerTime, &duration, &extraFields, &runid, &cost); err != nil { @@ -774,7 +775,7 @@ func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]*utils.Rat storCdr := &utils.RatedCDR{ CgrId: cgrid, AccId: accid, CdrHost: cdrhost, CdrSource: cdrsrc, ReqType: reqtype, Direction: direction, Tenant: tenant, TOR: tor, Account: account, Subject: subject, Destination: destination, AnswerTime: answerTime, Duration: time.Duration(duration), - ExtraFields: extraFieldsMp, MediationRunId: runid, Cost: cost, + ExtraFields: extraFieldsMp, MediationRunId: runid.String, Cost: cost.Float64, } cdrs = append(cdrs, storCdr) } From 23a95ed890d4f4016ed82119b508649ad784d750 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 9 Jan 2014 17:10:58 +0200 Subject: [PATCH 7/7] added mutex for schedular loop --- scheduler/scheduler.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 976ff45bc..b655a4be6 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -21,6 +21,7 @@ package scheduler import ( "fmt" "sort" + "sync" "time" "github.com/cgrates/cgrates/engine" @@ -30,6 +31,7 @@ type Scheduler struct { queue engine.ActionTimingPriotityList timer *time.Timer restartLoop chan bool + sync.Mutex } func NewScheduler() *Scheduler { @@ -38,6 +40,7 @@ func NewScheduler() *Scheduler { func (s *Scheduler) Loop() { for { + s.Lock() for len(s.queue) == 0 { //hang here if empty <-s.restartLoop } @@ -61,6 +64,7 @@ func (s *Scheduler) Loop() { // nothing to do, just continue the loop } } + s.Unlock() } } @@ -70,6 +74,7 @@ func (s *Scheduler) LoadActionTimings(storage engine.AccountingStorage) { engine.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err)) } // recreate the queue + s.Lock() s.queue = engine.ActionTimingPriotityList{} for key, ats := range actionTimings { toBeSaved := false @@ -92,6 +97,7 @@ func (s *Scheduler) LoadActionTimings(storage engine.AccountingStorage) { } } sort.Sort(s.queue) + s.Unlock() } func (s *Scheduler) Restart() {