From 4b7dfbe00ffff872fef74e25ab8298c40700b41a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 12 Jun 2012 17:51:28 +0300 Subject: [PATCH] fallback importing --- cmd/cgr-loader/cgr-loader.go | 6 +++++- cmd/cgr-loader/helpers.go | 11 +++++++++++ cmd/cgr-loader/rates.go | 30 ++++++++++++++++++++++-------- data/RatingProfiles.csv | 8 ++++---- timespans/calldesc.go | 11 ++++++----- timespans/storage_interface.go | 1 + timespans/storage_redis.go | 4 ++++ 7 files changed, 53 insertions(+), 18 deletions(-) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index f369260db..992886c91 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -28,6 +28,7 @@ var ( redisserver = flag.String("redisserver", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)") redisdb = flag.Int("rdb", 10, "redis database number (10)") redispass = flag.String("pass", "", "redis database password") + flush = flag.Bool("flush", false, "Flush the database before importing") monthsFn = flag.String("month", "Months.csv", "Months file") monthdaysFn = flag.String("monthdays", "MonthDays.csv", "Month days file") weekdaysFn = flag.String("weekdays", "WeekDays.csv", "Week days file") @@ -51,13 +52,16 @@ func writeToDatabase() { if err != nil { log.Fatalf("Could not open database connection: %v", err) } + if *flush { + storage.Flush() + } for _, d := range destinations { storage.SetDestination(d) } for k, cds := range ratingProfiles { log.Print(k) for _, cd := range cds { - storage.SetActivationPeriodsOrFallback(cd.GetKey(), cd.ActivationPeriods, "") + storage.SetActivationPeriodsOrFallback(cd.GetKey(), cd.ActivationPeriods, cd.FallbackKey) log.Print(cd.GetKey()) } } diff --git a/cmd/cgr-loader/helpers.go b/cmd/cgr-loader/helpers.go index 2493fbce6..ad5d142b3 100644 --- a/cmd/cgr-loader/helpers.go +++ b/cmd/cgr-loader/helpers.go @@ -100,3 +100,14 @@ func (rt *RateTiming) GetInterval(r *Rate) (i *timespans.Interval) { } return } + +type CallDescriptors []*timespans.CallDescriptor + +func (cds CallDescriptors) getKey(key string) *timespans.CallDescriptor { + for _, cd := range cds { + if cd.GetKey() == key { + return cd + } + } + return nil +} diff --git a/cmd/cgr-loader/rates.go b/cmd/cgr-loader/rates.go index 75f57d1a7..b604f8880 100644 --- a/cmd/cgr-loader/rates.go +++ b/cmd/cgr-loader/rates.go @@ -34,7 +34,7 @@ var ( rates = make(map[string][]*Rate) timings = make(map[string][]*Timing) ratesTimings = make(map[string][]*RateTiming) - ratingProfiles = make(map[string][]*timespans.CallDescriptor) + ratingProfiles = make(map[string]CallDescriptors) ) func loadDataSeries() { @@ -235,13 +235,17 @@ func loadRatingProfiles() { // skip header line continue } - tenant, tor, subject, fallbacksubject := record[0], record[1], record[2], record[3] - at, err := time.Parse(time.RFC3339, record[5]) + if len(record) != 7 { + log.Printf("Malformed rating profile: %v", record) + continue + } + tenant, tor, direction, subject, fallbacksubject := record[0], record[1], record[2], record[3], record[4] + at, err := time.Parse(time.RFC3339, record[6]) if err != nil { log.Printf("Cannot parse activation time from %v", record[5]) continue } - rts, exists := ratesTimings[record[4]] + rts, exists := ratesTimings[record[5]] if !exists { log.Printf("Could not get rate timing for tag %v", record[4]) continue @@ -263,12 +267,13 @@ func loadRatingProfiles() { // Search for a CallDescriptor with the same key var cd *timespans.CallDescriptor for _, c := range ratingProfiles[p] { - if c.GetKey() == fmt.Sprintf("%s:%s:%s", tenant, subject, p) { + if c.GetKey() == r.DestinationsTag { cd = c } } if cd == nil { cd = ×pans.CallDescriptor{ + Direction: direction, Tenant: tenant, TOR: tor, Subject: subject, @@ -276,10 +281,19 @@ func loadRatingProfiles() { } ratingProfiles[p] = append(ratingProfiles[p], cd) } - if fallbacksubject != "" { - // construct a new cd!!!! - } cd.ActivationPeriods = append(cd.ActivationPeriods, ap) + if fallbacksubject != "" && + ratingProfiles[p].getKey(fmt.Sprintf("%s:%s:%s:%s:%s", direction, tenant, tor, subject, timespans.FallbackDestination)) == nil { + cd = ×pans.CallDescriptor{ + Direction: direction, + Tenant: tenant, + TOR: tor, + Subject: subject, + Destination: timespans.FallbackDestination, + FallbackKey: fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, fallbacksubject), + } + ratingProfiles[p] = append(ratingProfiles[p], cd) + } } } } diff --git a/data/RatingProfiles.csv b/data/RatingProfiles.csv index 7668c72b0..55e042598 100644 --- a/data/RatingProfiles.csv +++ b/data/RatingProfiles.csv @@ -1,4 +1,4 @@ -Tenant,TOR,Subject,RatesFallbackSubject,RatesTimingTag,ActivationTime -CUSTOMER_1,0,rif,danb,STANDARD,2012-01-01T00:00:00Z -CUSTOMER_2,0,danb,,STANDARD,2012-01-01T00:00:00Z -CUSTOMER_1,0,danb,,PREMIUM,2012-01-01T00:00:00Z \ No newline at end of file +Tenant,TOR,Direction,Subject,RatesFallbackSubject,RatesTimingTag,ActivationTime +CUSTOMER_1,0,OUT,rif:from:tm,danb,STANDARD,2012-01-01T00:00:00Z +CUSTOMER_2,0,OUT,danb:87.139.12.167,danb,STANDARD,2012-01-01T00:00:00Z +CUSTOMER_1,0,OUT,danb,,PREMIUM,2012-01-01T00:00:00Z \ No newline at end of file diff --git a/timespans/calldesc.go b/timespans/calldesc.go index 02f4c40ae..75dc83068 100644 --- a/timespans/calldesc.go +++ b/timespans/calldesc.go @@ -28,8 +28,9 @@ import ( const ( // the minimum length for a destination prefix to be matched. - MinPrefixLength = 2 - RecursionMaxDepth = 4 + MinPrefixLength = 2 + RecursionMaxDepth = 4 + FallbackDestination = "fallback" // the string to be used to mark the fallback destination ) /* @@ -95,12 +96,12 @@ Restores the activation periods for the specified prefix from storage. */ func (cd *CallDescriptor) SearchStorageForPrefix() (destPrefix string, err error) { cd.ActivationPeriods = make([]*ActivationPeriod, 0) - base := fmt.Sprintf("%s:%s:", cd.Tenant, cd.Subject) + base := fmt.Sprintf("%s:%s:%s:%s:", cd.Destination, cd.Tenant, cd.TOR, cd.Subject) destPrefix = cd.Destination key := base + destPrefix values, err := cd.getActivationPeriodsOrFallback(key, base, destPrefix, 1) if err != nil { - key := base + "*" + key := base + FallbackDestination values, err = cd.getActivationPeriodsOrFallback(key, base, destPrefix, 1) } //load the activation preriods @@ -147,7 +148,7 @@ Constructs the key for the storage lookup. The prefixLen is limiting the length of the destination prefix. */ func (cd *CallDescriptor) GetKey() string { - return fmt.Sprintf("%s:%s:%s", cd.Tenant, cd.Subject, cd.Destination) + return fmt.Sprintf("%s:%s:%s:%s:%s", cd.Direction, cd.Tenant, cd.TOR, cd.Subject, cd.Destination) } /* diff --git a/timespans/storage_interface.go b/timespans/storage_interface.go index fb031b284..41889ba83 100644 --- a/timespans/storage_interface.go +++ b/timespans/storage_interface.go @@ -23,6 +23,7 @@ Interface for storage providers. */ type StorageGetter interface { Close() + Flush() error GetActivationPeriodsOrFallback(string) ([]*ActivationPeriod, string, error) SetActivationPeriodsOrFallback(string, []*ActivationPeriod, string) error GetDestination(string) (*Destination, error) diff --git a/timespans/storage_redis.go b/timespans/storage_redis.go index 6e62953ce..215a5c142 100644 --- a/timespans/storage_redis.go +++ b/timespans/storage_redis.go @@ -37,6 +37,10 @@ func (rs *RedisStorage) Close() { rs.db.Quit() } +func (rs *RedisStorage) Flush() error { + return rs.db.Flushdb() +} + func (rs *RedisStorage) GetActivationPeriodsOrFallback(key string) (aps []*ActivationPeriod, fallbackKey string, err error) { //rs.db.Select(rs.dbNb) elem, err := rs.db.Get(key)