From aff481375792947d47f089184c94d671ef889f2b Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 28 Jun 2013 22:47:39 +0300 Subject: [PATCH] unified sql storages and worked on rating profile load by tag --- rater/loader_db.go | 79 +++++-- rater/ratingprofile.go | 8 +- rater/storage_interface.go | 21 +- rater/storage_map.go | 27 +-- rater/storage_mongo.go | 26 ++- rater/storage_mysql.go | 419 +------------------------------------ rater/storage_postgres.go | 401 +---------------------------------- rater/storage_redis.go | 26 ++- utils/cdr.go | 48 +++++ 9 files changed, 173 insertions(+), 882 deletions(-) diff --git a/rater/loader_db.go b/rater/loader_db.go index 31e63d047..117dd147e 100644 --- a/rater/loader_db.go +++ b/rater/loader_db.go @@ -117,22 +117,22 @@ func (dbr *DbReader) WriteToDatabase(storage DataStorage, flush, verbose bool) ( } func (dbr *DbReader) LoadDestinations() (err error) { - dbr.destinations, err = dbr.storDB.GetTpDestinations(dbr.tpid) + dbr.destinations, err = dbr.storDB.GetTpDestinations(dbr.tpid, "") return } func (dbr *DbReader) LoadRates() (err error) { - dbr.rates, err = dbr.storDB.GetTpRates(dbr.tpid) + dbr.rates, err = dbr.storDB.GetTpRates(dbr.tpid, "") return err } func (dbr *DbReader) LoadTimings() (err error) { - dbr.timings, err = dbr.storDB.GetTpTimings(dbr.tpid) + dbr.timings, err = dbr.storDB.GetTpTimings(dbr.tpid, "") return err } func (dbr *DbReader) LoadRateTimings() error { - rts, err := dbr.storDB.GetTpRateTimings(dbr.tpid) + rts, err := dbr.storDB.GetTpRateTimings(dbr.tpid, "") if err != nil { return err } @@ -141,11 +141,7 @@ func (dbr *DbReader) LoadRateTimings() error { if !exists { return errors.New(fmt.Sprintf("Could not get timing for tag %v", rt.TimingsTag)) } - rateTiming := &RateTiming{ - RatesTag: rt.RatesTag, - Weight: rt.Weight, - timing: t, - } + rt.timing = t rs, exists := dbr.rates[rt.RatesTag] if !exists { return errors.New(fmt.Sprintf("Could not find rate for tag %v", rt.RatesTag)) @@ -155,14 +151,14 @@ func (dbr *DbReader) LoadRateTimings() error { if !exists { dbr.activationPeriods[rt.Tag] = &ActivationPeriod{} } - dbr.activationPeriods[rt.Tag].AddIntervalIfNotPresent(rateTiming.GetInterval(r)) + dbr.activationPeriods[rt.Tag].AddIntervalIfNotPresent(rt.GetInterval(r)) } } return nil } func (dbr *DbReader) LoadRatingProfiles() error { - rpfs, err := dbr.storDB.GetTpRatingProfiles(dbr.tpid) + rpfs, err := dbr.storDB.GetTpRatingProfiles(dbr.tpid, "") if err != nil { return err } @@ -180,21 +176,68 @@ func (dbr *DbReader) LoadRatingProfiles() error { //copy(newAP.Intervals, ap.Intervals) newAP.Intervals = append(newAP.Intervals, ap.Intervals...) rp.AddActivationPeriodIfNotPresent(d.Id, newAP) - if rp.fallbackSubject != "" { - rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", rp.direction, rp.tenant, rp.tor, rp.fallbackSubject) - } + } } return nil } +func (dbr *DbReader) LoadRatingProfile(tag string) error { + activationPeriods := make(map[string]*ActivationPeriod) + + rpm, err := dbr.storDB.GetTpRatingProfiles(dbr.tpid, tag) + if err != nil { + return err + } + for _, ratingProfile := range rpm { + at, err := time.Parse(time.RFC3339, ratingProfile.activationTime) + if err != nil { + return errors.New(fmt.Sprintf("Cannot parse activation time from %v", ratingProfile.activationTime)) + } + rtm, err := dbr.storDB.GetTpRateTimings(dbr.tpid, ratingProfile.ratesTimingTag) + if err != nil { + return err + } + for _, rateTiming := range rtm { + tm, err := dbr.storDB.GetTpTimings(dbr.tpid, rateTiming.TimingsTag) + if err != nil { + return err + } + rateTiming.timing = tm[rateTiming.TimingsTag] + rm, err := dbr.storDB.GetTpRates(dbr.tpid, rateTiming.RatesTag) + if err != nil { + return err + } + for _, rate := range rm[rateTiming.RatesTag] { + _, exists := activationPeriods[rateTiming.Tag] + if !exists { + activationPeriods[rateTiming.Tag] = &ActivationPeriod{} + } + activationPeriods[rateTiming.Tag].AddIntervalIfNotPresent(rateTiming.GetInterval(rate)) + dm, err := dbr.storDB.GetTpDestinations(dbr.tpid, rate.DestinationsTag) + if err != nil { + return err + } + for _, destination := range dm { + ap := activationPeriods[ratingProfile.ratesTimingTag] + newAP := &ActivationPeriod{ActivationTime: at} + newAP.Intervals = append(newAP.Intervals, ap.Intervals...) + ratingProfile.AddActivationPeriodIfNotPresent(destination.Id, newAP) + } + } + } + } + + return nil +} + func (dbr *DbReader) LoadActions() (err error) { - dbr.actions, err = dbr.storDB.GetTpActions(dbr.tpid) + dbr.actions, err = dbr.storDB.GetTpActions(dbr.tpid, "") return err } func (dbr *DbReader) LoadActionTimings() (err error) { - atsMap, err := dbr.storDB.GetTpActionTimings(dbr.tpid) + atsMap, err := dbr.storDB.GetTpActionTimings(dbr.tpid, "") if err != nil { return err } @@ -227,12 +270,12 @@ func (dbr *DbReader) LoadActionTimings() (err error) { } func (dbr *DbReader) LoadActionTriggers() (err error) { - dbr.actionsTriggers, err = dbr.storDB.GetTpActionTriggers(dbr.tpid) + dbr.actionsTriggers, err = dbr.storDB.GetTpActionTriggers(dbr.tpid, "") return err } func (dbr *DbReader) LoadAccountActions() (err error) { - acs, err := dbr.storDB.GetTpAccountActions(dbr.tpid) + acs, err := dbr.storDB.GetTpAccountActions(dbr.tpid, "") if err != nil { return err } diff --git a/rater/ratingprofile.go b/rater/ratingprofile.go index e8242f566..2a8205d6b 100644 --- a/rater/ratingprofile.go +++ b/rater/ratingprofile.go @@ -29,10 +29,10 @@ const ( ) type RatingProfile struct { - Id string - FallbackKey string - DestinationMap map[string][]*ActivationPeriod - tenant, tor, direction, subject, fallbackSubject, ratesTimingTag, activationTime string // used only for loading + Id string + FallbackKey string + DestinationMap map[string][]*ActivationPeriod + ratesTimingTag, activationTime string // used only for loading } // Adds an activation period that applyes to current rating profile if not already present. diff --git a/rater/storage_interface.go b/rater/storage_interface.go index 74f576b41..cb6019a26 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -57,7 +57,7 @@ type DataStorage interface { SetRatingProfile(*RatingProfile) error GetDestination(string) (*Destination, error) SetDestination(*Destination) error - GetTPDestination(string,string) (*Destination, error) + GetTPDestination(string, string) (*Destination, error) GetActions(string) (Actions, error) SetActions(string, Actions) error GetUserBalance(string) (*UserBalance, error) @@ -67,6 +67,7 @@ type DataStorage interface { GetAllActionTimings() (map[string]ActionTimings, error) SetCdr(utils.CDR) error SetRatedCdr(utils.CDR, *CallCost) error + GetAllRatedCdr() ([]utils.CDR, error) //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) LogCallCost(uuid, source string, cc *CallCost) error LogError(uuid, source, errstr string) error @@ -74,15 +75,15 @@ type DataStorage interface { LogActionTiming(source string, at *ActionTiming, as Actions) error GetCallCostLog(uuid, source string) (*CallCost, error) // loader functions - GetTpDestinations(string) ([]*Destination, error) - GetTpRates(string) (map[string][]*Rate, error) - GetTpTimings(string) (map[string]*Timing, error) - GetTpRateTimings(string) ([]*RateTiming, error) - GetTpRatingProfiles(string) (map[string]*RatingProfile, error) - GetTpActions(string) (map[string][]*Action, error) - GetTpActionTimings(string) (map[string][]*ActionTiming, error) - GetTpActionTriggers(string) (map[string][]*ActionTrigger, error) - GetTpAccountActions(string) ([]*AccountAction, error) + GetTpDestinations(string, string) ([]*Destination, error) + GetTpRates(string, string) (map[string][]*Rate, error) + GetTpTimings(string, string) (map[string]*Timing, error) + GetTpRateTimings(string, string) ([]*RateTiming, error) + GetTpRatingProfiles(string, string) (map[string]*RatingProfile, error) + GetTpActions(string, string) (map[string][]*Action, error) + GetTpActionTimings(string, string) (map[string][]*ActionTiming, error) + GetTpActionTriggers(string, string) (map[string][]*ActionTrigger, error) + GetTpAccountActions(string, string) ([]*AccountAction, error) } type Marshaler interface { diff --git a/rater/storage_map.go b/rater/storage_map.go index 7dd7afd7b..a326111f3 100644 --- a/rater/storage_map.go +++ b/rater/storage_map.go @@ -74,11 +74,10 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { } // Extracts destinations from StorDB on specific tariffplan id -func (ms *MapStorage) GetTPDestination( tpid, destTag string ) (*Destination, error) { - return nil, nil +func (ms *MapStorage) GetTPDestination(tpid, destTag string) (*Destination, error) { + return nil, nil } - func (ms *MapStorage) GetActions(key string) (as Actions, err error) { if values, ok := ms.dict[ACTION_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &as) @@ -198,31 +197,35 @@ func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } -func (ms *MapStorage) GetTpDestinations(tpid string) ([]*Destination, error) { +func (ms *MapStorage) GetAllRatedCdr() ([]utils.CDR, error) { return nil, nil } -func (ms *MapStorage) GetTpRates(string) (map[string][]*Rate, error) { +func (ms *MapStorage) GetTpDestinations(tpid, tag string) ([]*Destination, error) { return nil, nil } -func (ms *MapStorage) GetTpTimings(string) (map[string]*Timing, error) { + +func (ms *MapStorage) GetTpRates(tpid, tag string) (map[string][]*Rate, error) { return nil, nil } -func (ms *MapStorage) GetTpRateTimings(string) ([]*RateTiming, error) { +func (ms *MapStorage) GetTpTimings(tpid, tag string) (map[string]*Timing, error) { return nil, nil } -func (ms *MapStorage) GetTpRatingProfiles(string) (map[string]*RatingProfile, error) { +func (ms *MapStorage) GetTpRateTimings(tpid, tag string) ([]*RateTiming, error) { return nil, nil } -func (ms *MapStorage) GetTpActions(string) (map[string][]*Action, error) { +func (ms *MapStorage) GetTpRatingProfiles(tpid, tag string) (map[string]*RatingProfile, error) { return nil, nil } -func (ms *MapStorage) GetTpActionTimings(string) (map[string][]*ActionTiming, error) { +func (ms *MapStorage) GetTpActions(tpid, tag string) (map[string][]*Action, error) { return nil, nil } -func (ms *MapStorage) GetTpActionTriggers(string) (map[string][]*ActionTrigger, error) { +func (ms *MapStorage) GetTpActionTimings(tpid, tag string) (map[string][]*ActionTiming, error) { return nil, nil } -func (ms *MapStorage) GetTpAccountActions(string) ([]*AccountAction, error) { +func (ms *MapStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*ActionTrigger, error) { + return nil, nil +} +func (ms *MapStorage) GetTpAccountActions(tpid, tag string) ([]*AccountAction, error) { return nil, nil } diff --git a/rater/storage_mongo.go b/rater/storage_mongo.go index 4a3bd3f04..4a4290a88 100644 --- a/rater/storage_mongo.go +++ b/rater/storage_mongo.go @@ -19,8 +19,8 @@ along with this program. If not, see package rater import ( - "fmt" "errors" + "fmt" "github.com/cgrates/cgrates/utils" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" @@ -148,7 +148,7 @@ func (ms *MongoStorage) SetDestination(dest *Destination) error { } // Extracts destinations from StorDB on specific tariffplan id -func (ms *MongoStorage) GetTPDestination( tpid, destTag string ) (*Destination, error) { +func (ms *MongoStorage) GetTPDestination(tpid, destTag string) (*Destination, error) { return nil, errors.New(utils.ERR_NOT_IMPLEMENTED) } @@ -223,35 +223,39 @@ func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } +func (ms *MongoStorage) GetAllRatedCdr() ([]utils.CDR, error) { + return nil, nil +} + func (ms *MongoStorage) GetDestinations(tpid string) ([]*Destination, error) { return nil, nil } -func (ms *MongoStorage) GetTpDestinations(tpid string) ([]*Destination, error) { +func (ms *MongoStorage) GetTpDestinations(tpid, tag string) ([]*Destination, error) { return nil, nil } -func (ms *MongoStorage) GetTpRates(string) (map[string][]*Rate, error) { +func (ms *MongoStorage) GetTpRates(tpid, tag string) (map[string][]*Rate, error) { return nil, nil } -func (ms *MongoStorage) GetTpTimings(string) (map[string]*Timing, error) { +func (ms *MongoStorage) GetTpTimings(tpid, tag string) (map[string]*Timing, error) { return nil, nil } -func (ms *MongoStorage) GetTpRateTimings(string) ([]*RateTiming, error) { +func (ms *MongoStorage) GetTpRateTimings(tpid, tag string) ([]*RateTiming, error) { return nil, nil } -func (ms *MongoStorage) GetTpRatingProfiles(string) (map[string]*RatingProfile, error) { +func (ms *MongoStorage) GetTpRatingProfiles(tpid, tag string) (map[string]*RatingProfile, error) { return nil, nil } -func (ms *MongoStorage) GetTpActions(string) (map[string][]*Action, error) { +func (ms *MongoStorage) GetTpActions(tpid, tag string) (map[string][]*Action, error) { return nil, nil } -func (ms *MongoStorage) GetTpActionTimings(string) (map[string][]*ActionTiming, error) { +func (ms *MongoStorage) GetTpActionTimings(tpid, tag string) (map[string][]*ActionTiming, error) { return nil, nil } -func (ms *MongoStorage) GetTpActionTriggers(string) (map[string][]*ActionTrigger, error) { +func (ms *MongoStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*ActionTrigger, error) { return nil, nil } -func (ms *MongoStorage) GetTpAccountActions(string) ([]*AccountAction, error) { +func (ms *MongoStorage) GetTpAccountActions(tpid, tag string) ([]*AccountAction, error) { return nil, nil } diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index 5b37d076c..af92432df 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -20,14 +20,12 @@ package rater import ( "database/sql" - "encoding/json" "fmt" - "github.com/cgrates/cgrates/utils" _ "github.com/go-sql-driver/mysql" ) type MySQLStorage struct { - Db *sql.DB + *SQLStorage } func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) { @@ -35,418 +33,5 @@ func NewMySQLStorage(host, port, name, user, password string) (DataStorage, erro if err != nil { return nil, err } - return &MySQLStorage{db}, nil -} - -func (mys *MySQLStorage) Close() {} - -func (mys *MySQLStorage) Flush() (err error) { - return -} - -func (mys *MySQLStorage) GetRatingProfile(string) (rp *RatingProfile, err error) { - /*row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM ratingprofiles WHERE id='%s'", id)) - err = row.Scan(&rp, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) - err = json.Unmarshal([]byte(timespansJson), cc.Timespans)*/ - return -} - -func (mys *MySQLStorage) SetRatingProfile(rp *RatingProfile) (err error) { - return -} - -func (mys *MySQLStorage) GetDestination(string) (d *Destination, err error) { - return -} - -func (mys *MySQLStorage) SetDestination(d *Destination) (err error) { - return -} - -// Extracts destinations from StorDB on specific tariffplan id -func (mys *MySQLStorage) GetTPDestination(tpid, destTag string) (*Destination, error) { - rows, err := mys.Db.Query(fmt.Sprintf("SELECT prefix FROM tp_destinatins WHERE id='%s' AND tag='%s'", tpid, destTag)) - if err != nil { - return nil, err - } - defer rows.Close() - d := &Destination{Id: destTag} - i := 0 - for rows.Next() { - i++ //Keep here a reference so we know we got at least one prefix - var pref string - err = rows.Scan(&pref) - if err != nil { - return nil, err - } - d.Prefixes = append(d.Prefixes, pref) - } - if i == 0 { - return nil, nil - } - return d, nil -} - -func (mys *MySQLStorage) GetActions(string) (as Actions, err error) { - return -} - -func (mys *MySQLStorage) SetActions(key string, as Actions) (err error) { return } - -func (mys *MySQLStorage) GetUserBalance(string) (ub *UserBalance, err error) { return } - -func (mys *MySQLStorage) SetUserBalance(ub *UserBalance) (err error) { return } - -func (mys *MySQLStorage) GetActionTimings(key string) (ats ActionTimings, err error) { return } - -func (mys *MySQLStorage) SetActionTimings(key string, ats ActionTimings) (err error) { return } - -func (mys *MySQLStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { - return -} - -func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { - if mys.Db == nil { - //timespans.Logger.Warning("Cannot write log to database.") - return - } - tss, err := json.Marshal(cc.Timespans) - if err != nil { - Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) - } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO callcosts VALUES ('NULL','%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", - uuid, - source, - cc.Direction, - cc.Tenant, - cc.TOR, - cc.Subject, - cc.Account, - cc.Destination, - cc.Cost, - cc.ConnectFee, - tss)) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute insert statement: %v", err)) - } - return -} - -func (mys *MySQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM callcosts WHERE uuid='%s' AND source='%s'", uuid, source)) - var uuid_found string - var timespansJson string - err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) - err = json.Unmarshal([]byte(timespansJson), cc.Timespans) - return -} - -func (mys *MySQLStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - return -} -func (mys *MySQLStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - return -} -func (mys *MySQLStorage) LogError(uuid, source, errstr string) (err error) { return } - -func (mys *MySQLStorage) SetCdr(cdr utils.CDR) (err error) { - startTime, err := cdr.GetAnswerTime() - if err != nil { - return err - } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES (NULL, '%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d)", - cdr.GetCgrId(), - cdr.GetAccId(), - cdr.GetCdrHost(), - cdr.GetReqType(), - cdr.GetDirection(), - cdr.GetTenant(), - cdr.GetTOR(), - cdr.GetAccount(), - cdr.GetSubject(), - cdr.GetDestination(), - startTime, - cdr.GetDuration(), - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } - extraFields, err := json.Marshal(cdr.GetExtraFields()) - if err != nil { - Logger.Err(fmt.Sprintf("Error marshalling cdr extra fields to json: %v", err)) - } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('NULL','%s', '%s')", - cdr.GetCgrId(), - extraFields, - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } - - return -} - -func (mys *MySQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) { - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')", - cdr.GetCgrId(), - cc.Cost, - "cgrcostid", - "cdrsrc", - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } - - return -} - -func (mys *MySQLStorage) GetTpDestinations(tpid string) ([]*Destination, error) { - var dests []*Destination - rows, err := mys.Db.Query("SELECT * FROM tp_destinations WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tag, prefix string - if err := rows.Scan(id, tpid, &tag, &prefix); err != nil { - return nil, err - } - var dest *Destination - for _, d := range dests { - if d.Id == tag { - dest = d - break - } - } - if dest == nil { - dest = &Destination{Id: tag} - dests = append(dests, dest) - } - dest.Prefixes = append(dest.Prefixes, prefix) - } - return dests, rows.Err() -} - -func (mys *MySQLStorage) GetTpRates(tpid string) (map[string][]*Rate, error) { - rts := make(map[string][]*Rate) - rows, err := mys.Db.Query("SELECT * FROM tp_rates WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tag, destinations_tag string - var connect_fee, rate, priced_units, rate_increments float64 - if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &connect_fee, &rate, &priced_units, &rate_increments); err != nil { - return nil, err - } - - r := &Rate{ - DestinationsTag: destinations_tag, - ConnectFee: connect_fee, - Price: rate, - PricedUnits: priced_units, - RateIncrements: rate_increments, - } - - rts[tag] = append(rts[tag], r) - } - return rts, rows.Err() -} - -func (mys *MySQLStorage) GetTpTimings(tpid string) (map[string]*Timing, error) { - tms := make(map[string]*Timing) - rows, err := mys.Db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tag, years, months, month_days, week_days, start_time string - if err := rows.Scan(&id, &tpid, &tag, &years, &months, &month_days, &week_days, &start_time); err != nil { - return nil, err - } - tms[tag] = NewTiming(years, months, month_days, week_days, start_time) - } - return tms, rows.Err() -} - -func (mys *MySQLStorage) GetTpRateTimings(tpid string) ([]*RateTiming, error) { - var rts []*RateTiming - rows, err := mys.Db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var weight float64 - var tpid, tag, rates_tag, timings_tag string - if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil { - return nil, err - } - rt := &RateTiming{ - Tag: tag, - RatesTag: rates_tag, - Weight: weight, - TimingsTag: timings_tag, - } - rts = append(rts, rt) - } - return rts, rows.Err() -} - -func (mys *MySQLStorage) GetTpRatingProfiles(tpid string) (map[string]*RatingProfile, error) { - rpfs := make(map[string]*RatingProfile) - rows, err := mys.Db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string - - if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil { - return nil, err - } - key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject) - rp, ok := rpfs[key] - if !ok { - rp = &RatingProfile{Id: key} - rpfs[key] = rp - } - rp.tor = tor - rp.direction = direction - rp.subject = subject - rp.fallbackSubject = fallbacksubject - rp.ratesTimingTag = rates_timing_tag - rp.activationTime = activation_time - } - return rpfs, rows.Err() -} -func (mys *MySQLStorage) GetTpActions(tpid string) (map[string][]*Action, error) { - as := make(map[string][]*Action) - rows, err := mys.Db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var units, rate, minutes_weight, weight float64 - var tpid, tag, action, balance_tag, direction, destinations_tag, rate_type string - if err := rows.Scan(&id, &tpid, &tag, &action, &balance_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { - return nil, err - } - var a *Action - if balance_tag != MINUTES { - a = &Action{ - ActionType: action, - BalanceId: balance_tag, - Direction: direction, - Units: units, - } - } else { - var percent, price float64 - if rate_type == PERCENT { - percent = rate - } - if rate_type == ABSOLUTE { - price = rate - } - a = &Action{ - Id: utils.GenUUID(), - ActionType: action, - BalanceId: balance_tag, - Direction: direction, - Weight: weight, - MinuteBucket: &MinuteBucket{ - Seconds: units, - Weight: minutes_weight, - Price: price, - Percent: percent, - DestinationId: destinations_tag, - }, - } - } - as[tag] = append(as[tag], a) - } - return as, rows.Err() -} - -func (mys *MySQLStorage) GetTpActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { - ats = make(map[string][]*ActionTiming) - rows, err := mys.Db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var weight float64 - var tpid, tag, actions_tag, timings_tag string - if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil { - return nil, err - } - - at := &ActionTiming{ - Id: utils.GenUUID(), - Tag: timings_tag, - Weight: weight, - ActionsId: actions_tag, - } - ats[tag] = append(ats[tag], at) - } - return ats, rows.Err() -} - -func (mys *MySQLStorage) GetTpActionTriggers(tpid string) (map[string][]*ActionTrigger, error) { - ats := make(map[string][]*ActionTrigger) - rows, err := mys.Db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var threshold, weight float64 - var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string - if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil { - return nil, err - } - - at := &ActionTrigger{ - Id: utils.GenUUID(), - BalanceId: balances_tag, - Direction: direction, - ThresholdValue: threshold, - DestinationId: destinations_tag, - ActionsId: actions_tag, - Weight: weight, - } - ats[tag] = append(ats[tag], at) - } - return ats, rows.Err() -} - -func (mys *MySQLStorage) GetTpAccountActions(tpid string) ([]*AccountAction, error) { - var acs []*AccountAction - rows, err := mys.Db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string - if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil { - return nil, err - } - - aa := &AccountAction{ - Tenant: tenant, - Account: account, - Direction: direction, - ActionTimingsTag: action_timings_tag, - ActionTriggersTag: action_triggers_tag, - } - acs = append(acs, aa) - } - return acs, rows.Err() + return &MySQLStorage{&SQLStorage{db}}, nil } diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index 9f1b01059..f495cf43d 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -20,14 +20,12 @@ package rater import ( "database/sql" - "encoding/json" "fmt" _ "github.com/bmizerany/pq" - "github.com/cgrates/cgrates/utils" ) type PostgresStorage struct { - Db *sql.DB + *SQLStorage } func NewPostgresStorage(host, port, name, user, password string) (DataStorage, error) { @@ -35,400 +33,5 @@ func NewPostgresStorage(host, port, name, user, password string) (DataStorage, e if err != nil { return nil, err } - return &PostgresStorage{db}, nil -} - -func (psl *PostgresStorage) Close() {} - -func (psl *PostgresStorage) Flush() (err error) { - return -} - -func (psl *PostgresStorage) GetRatingProfile(string) (rp *RatingProfile, err error) { - /*row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM ratingprofiles WHERE id='%s'", id)) - err = row.Scan(&rp, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) - err = json.Unmarshal([]byte(timespansJson), cc.Timespans)*/ - return -} - -func (psl *PostgresStorage) SetRatingProfile(rp *RatingProfile) (err error) { - return -} - -func (psl *PostgresStorage) GetDestination(string) (d *Destination, err error) { - return -} - -func (psl *PostgresStorage) SetDestination(d *Destination) (err error) { - return -} - -// Extracts destinations from StorDB on specific tariffplan id -func (psl *PostgresStorage) GetTPDestination(tpid, destTag string) (*Destination, error) { - return nil, nil -} - -func (psl *PostgresStorage) GetActions(string) (as Actions, err error) { - return -} - -func (psl *PostgresStorage) SetActions(key string, as Actions) (err error) { return } - -func (psl *PostgresStorage) GetUserBalance(string) (ub *UserBalance, err error) { return } - -func (psl *PostgresStorage) SetUserBalance(ub *UserBalance) (err error) { return } - -func (psl *PostgresStorage) GetActionTimings(key string) (ats ActionTimings, err error) { return } - -func (psl *PostgresStorage) SetActionTimings(key string, ats ActionTimings) (err error) { return } - -func (psl *PostgresStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { return } - -func (psl *PostgresStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { - if psl.Db == nil { - //timespans.Logger.Warning("Cannot write log to database.") - return - } - tss, err := json.Marshal(cc.Timespans) - if err != nil { - Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) - } - _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", - uuid, - source, - cc.Direction, - cc.Tenant, - cc.TOR, - cc.Subject, - cc.Account, - cc.Destination, - cc.Cost, - cc.ConnectFee, - tss)) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute insert statement: %v", err)) - } - return -} - -func (psl *PostgresStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s' AND source='%s'", uuid, source)) - var uuid_found string - var timespansJson string - err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) - err = json.Unmarshal([]byte(timespansJson), cc.Timespans) - return -} - -func (psl *PostgresStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - return -} -func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - return -} -func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return } - -func (psl *PostgresStorage) SetCdr(cdr utils.CDR) (err error) { - startTime, err := cdr.GetAnswerTime() - if err != nil { - return err - } - _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", - cdr.GetCgrId(), - cdr.GetAccId(), - cdr.GetCdrHost(), - cdr.GetReqType(), - cdr.GetDirection(), - cdr.GetTenant(), - cdr.GetTOR(), - cdr.GetAccount(), - cdr.GetSubject(), - cdr.GetDestination(), - startTime, - cdr.GetDuration(), - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } - extraFields, err := json.Marshal(cdr.GetExtraFields()) - if err != nil { - Logger.Err(fmt.Sprintf("Error marshalling cdr extra fields to json: %v", err)) - } - _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')", - cdr.GetCgrId(), - extraFields, - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } - - return -} - -func (psl *PostgresStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) { - if err != nil { - return err - } - _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", - cdr.GetCgrId(), - cc.Cost, - "cgrcostid", - "cdrsrc", - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } - - return -} - -func (psl *PostgresStorage) GetTpDestinations(tpid string) ([]*Destination, error) { - var dests []*Destination - rows, err := psl.Db.Query("SELECT * FROM tp_destinations WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tag, prefix string - if err := rows.Scan(id, tpid, &tag, &prefix); err != nil { - return nil, err - } - var dest *Destination - for _, d := range dests { - if d.Id == tag { - dest = d - break - } - } - if dest == nil { - dest = &Destination{Id: tag} - dests = append(dests, dest) - } - dest.Prefixes = append(dest.Prefixes, prefix) - } - return dests, rows.Err() -} - -func (psl *PostgresStorage) GetTpRates(tpid string) (map[string][]*Rate, error) { - rts := make(map[string][]*Rate) - rows, err := psl.Db.Query("SELECT * FROM tp_rates WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tag, destinations_tag string - var connect_fee, rate, priced_units, rate_increments float64 - if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &connect_fee, &rate, &priced_units, &rate_increments); err != nil { - return nil, err - } - - r := &Rate{ - DestinationsTag: destinations_tag, - ConnectFee: connect_fee, - Price: rate, - PricedUnits: priced_units, - RateIncrements: rate_increments, - } - - rts[tag] = append(rts[tag], r) - } - return rts, rows.Err() -} - -func (psl *PostgresStorage) GetTpTimings(tpid string) (map[string]*Timing, error) { - tms := make(map[string]*Timing) - rows, err := psl.Db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tag, years, months, month_days, week_days, start_time string - if err := rows.Scan(&id, &tpid, &tag, &years, &months, &month_days, &week_days, &start_time); err != nil { - return nil, err - } - tms[tag] = NewTiming(years, months, month_days, week_days, start_time) - } - return tms, rows.Err() -} - -func (psl *PostgresStorage) GetTpRateTimings(tpid string) ([]*RateTiming, error) { - var rts []*RateTiming - rows, err := psl.Db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var weight float64 - var tpid, tag, rates_tag, timings_tag string - if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil { - return nil, err - } - rt := &RateTiming{ - Tag: tag, - RatesTag: rates_tag, - Weight: weight, - TimingsTag: timings_tag, - } - rts = append(rts, rt) - } - return rts, rows.Err() -} - -func (psl *PostgresStorage) GetTpRatingProfiles(tpid string) (map[string]*RatingProfile, error) { - rpfs := make(map[string]*RatingProfile) - rows, err := psl.Db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string - - if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil { - return nil, err - } - key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject) - rp, ok := rpfs[key] - if !ok { - rp = &RatingProfile{Id: key} - rpfs[key] = rp - } - rp.tor = tor - rp.direction = direction - rp.subject = subject - rp.fallbackSubject = fallbacksubject - rp.ratesTimingTag = rates_timing_tag - rp.activationTime = activation_time - } - return rpfs, rows.Err() -} -func (psl *PostgresStorage) GetTpActions(tpid string) (map[string][]*Action, error) { - as := make(map[string][]*Action) - rows, err := psl.Db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var units, rate, minutes_weight, weight float64 - var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string - if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { - return nil, err - } - var a *Action - if balances_tag != MINUTES { - a = &Action{ - ActionType: action, - BalanceId: balances_tag, - Direction: direction, - Units: units, - } - } else { - var percent, price float64 - if rate_type == PERCENT { - percent = rate - } - if rate_type == ABSOLUTE { - price = rate - } - a = &Action{ - Id: utils.GenUUID(), - ActionType: action, - BalanceId: balances_tag, - Direction: direction, - Weight: weight, - MinuteBucket: &MinuteBucket{ - Seconds: units, - Weight: minutes_weight, - Price: price, - Percent: percent, - DestinationId: destinations_tag, - }, - } - } - as[tag] = append(as[tag], a) - } - return as, rows.Err() -} - -func (psl *PostgresStorage) GetTpActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { - ats = make(map[string][]*ActionTiming) - rows, err := psl.Db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var weight float64 - var tpid, tag, actions_tag, timings_tag string - if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil { - return nil, err - } - - at := &ActionTiming{ - Id: utils.GenUUID(), - Tag: timings_tag, - Weight: weight, - ActionsId: actions_tag, - } - ats[tag] = append(ats[tag], at) - } - return ats, rows.Err() -} - -func (psl *PostgresStorage) GetTpActionTriggers(tpid string) (map[string][]*ActionTrigger, error) { - ats := make(map[string][]*ActionTrigger) - rows, err := psl.Db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var threshold, weight float64 - var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string - if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil { - return nil, err - } - - at := &ActionTrigger{ - Id: utils.GenUUID(), - BalanceId: balances_tag, - Direction: direction, - ThresholdValue: threshold, - DestinationId: destinations_tag, - ActionsId: actions_tag, - Weight: weight, - } - ats[tag] = append(ats[tag], at) - } - return ats, rows.Err() -} - -func (psl *PostgresStorage) GetTpAccountActions(tpid string) ([]*AccountAction, error) { - var acs []*AccountAction - rows, err := psl.Db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid) - if err != nil { - return nil, err - } - for rows.Next() { - var id int - var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string - if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil { - return nil, err - } - - aa := &AccountAction{ - Tenant: tenant, - Account: account, - Direction: direction, - ActionTimingsTag: action_timings_tag, - ActionTriggersTag: action_triggers_tag, - } - acs = append(acs, aa) - } - return acs, rows.Err() + return &PostgresStorage{&SQLStorage{db}}, nil } diff --git a/rater/storage_redis.go b/rater/storage_redis.go index f04e6da9e..144750a52 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -19,8 +19,8 @@ along with this program. If not, see package rater import ( - "fmt" "errors" + "fmt" "menteslibres.net/gosexy/redis" //"log" "github.com/cgrates/cgrates/utils" @@ -103,7 +103,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { } // Extracts destinations from StorDB on specific tariffplan id -func (rs *RedisStorage) GetTPDestination( tpid, destTag string ) (*Destination, error) { +func (rs *RedisStorage) GetTPDestination(tpid, destTag string) (*Destination, error) { return nil, errors.New(utils.ERR_NOT_IMPLEMENTED) } @@ -232,31 +232,35 @@ func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } -func (rs *RedisStorage) GetTpDestinations(tpid string) ([]*Destination, error) { +func (rs *RedisStorage) GetAllRatedCdr() ([]utils.CDR, error) { return nil, nil } -func (rs *RedisStorage) GetTpRates(string) (map[string][]*Rate, error) { +func (rs *RedisStorage) GetTpDestinations(tpid, tag string) ([]*Destination, error) { return nil, nil } -func (rs *RedisStorage) GetTpTimings(string) (map[string]*Timing, error) { + +func (rs *RedisStorage) GetTpRates(tpid, tag string) (map[string][]*Rate, error) { return nil, nil } -func (rs *RedisStorage) GetTpRateTimings(string) ([]*RateTiming, error) { +func (rs *RedisStorage) GetTpTimings(tpid, tag string) (map[string]*Timing, error) { return nil, nil } -func (rs *RedisStorage) GetTpRatingProfiles(string) (map[string]*RatingProfile, error) { +func (rs *RedisStorage) GetTpRateTimings(tpid, tag string) ([]*RateTiming, error) { return nil, nil } -func (rs *RedisStorage) GetTpActions(string) (map[string][]*Action, error) { +func (rs *RedisStorage) GetTpRatingProfiles(tpid, tag string) (map[string]*RatingProfile, error) { return nil, nil } -func (rs *RedisStorage) GetTpActionTimings(string) (map[string][]*ActionTiming, error) { +func (rs *RedisStorage) GetTpActions(tpid, tag string) (map[string][]*Action, error) { return nil, nil } -func (rs *RedisStorage) GetTpActionTriggers(string) (map[string][]*ActionTrigger, error) { +func (rs *RedisStorage) GetTpActionTimings(tpid, tag string) (map[string][]*ActionTiming, error) { return nil, nil } -func (rs *RedisStorage) GetTpAccountActions(string) ([]*AccountAction, error) { +func (rs *RedisStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*ActionTrigger, error) { + return nil, nil +} +func (rs *RedisStorage) GetTpAccountActions(tpid, tag string) ([]*AccountAction, error) { return nil, nil } diff --git a/utils/cdr.go b/utils/cdr.go index b8dbff990..fe75b53d0 100644 --- a/utils/cdr.go +++ b/utils/cdr.go @@ -39,3 +39,51 @@ type CDR interface { GetFallbackSubj() string GetExtraFields() map[string]string //Stores extra CDR Fields } + +type GenericCdr map[string]string + +func (gcdr GenericCdr) GetCgrId() string { + return "" +} +func (gcdr GenericCdr) GetAccId() string { + return "" +} +func (gcdr GenericCdr) GetCdrHost() string { + return "" +} +func (gcdr GenericCdr) GetDirection() string { + return "" +} +func (gcdr GenericCdr) GetOrigId() string { + return "" +} +func (gcdr GenericCdr) GetSubject() string { + return "" +} +func (gcdr GenericCdr) GetAccount() string { + return "" +} +func (gcdr GenericCdr) GetDestination() string { + return "" +} +func (gcdr GenericCdr) GetTOR() string { + return "" +} +func (gcdr GenericCdr) GetTenant() string { + return "" +} +func (gcdr GenericCdr) GetReqType() string { + return "" +} +func (gcdr GenericCdr) GetAnswerTime() (time.Time, error) { + return time.Now(), nil +} +func (gcdr GenericCdr) GetDuration() int64 { + return 0.0 +} +func (gcdr GenericCdr) GetFallbackSubj() string { + return "" +} +func (gcdr GenericCdr) GetExtraFields() map[string]string { + return nil +}