Mongo improvements

This commit is contained in:
DanB
2015-12-28 19:23:10 +01:00
parent eadd0cbb79
commit 0cb0d91613
9 changed files with 160 additions and 73 deletions

View File

@@ -0,0 +1,10 @@
{
// CGRateS Configuration file used for testing mongo implementation
"stor_db": {
"db_type": "mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
},
}

View File

@@ -0,0 +1,9 @@
db = db.getSiblingDB('admin')
db.createUser(
{
user: "cgrates",
pwd: "CGRateS.org",
roles: [ { role: "userAdminAnyDatabase", db: "admin" } ]
}
)

View File

@@ -0,0 +1,14 @@
#! /usr/bin/env sh
mongo --quiet create_user.js
cu=$?
if [ $cu = 0 ]; then
echo ""
echo "\t+++ CGR-DB successfully set-up! +++"
echo ""
exit 0
fi

View File

@@ -1,14 +1,14 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012-2015 ITsysCOM
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

View File

@@ -22,7 +22,7 @@ import (
"flag"
"fmt"
"path"
"reflect"
//"reflect"
"testing"
"time"
@@ -80,6 +80,29 @@ func TestITCDRsPSQL(t *testing.T) {
}
}
func TestITCDRsMongo(t *testing.T) {
if !*testIntegration {
return
}
cfg, err := config.NewCGRConfigFromFolder(path.Join(*dataDir, "conf", "samples", "storage", "mongo"))
if err != nil {
t.Error(err)
}
if err := InitStorDb(cfg); err != nil {
t.Error(err)
}
mongoDb, err := NewMongoStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass)
if err != nil {
t.Error("Error on opening database connection: ", err)
}
if err := testSetCDR(mongoDb); err != nil {
t.Error(err)
}
if err := testSMCosts(mongoDb); err != nil {
t.Error(err)
}
}
// helper function to populate CDRs and check if they were stored in storDb
func testSetCDR(cdrStorage CdrStorage) error {
rawCDR := &CDR{
@@ -188,8 +211,8 @@ func testSMCosts(cdrStorage CdrStorage) error {
Destination: "+4986517174963",
Timespans: []*TimeSpan{
&TimeSpan{
TimeStart: time.Date(2015, 12, 28, 8, 53, 0, 0, time.UTC),
TimeEnd: time.Date(2015, 12, 28, 8, 54, 40, 0, time.UTC),
TimeStart: time.Date(2015, 12, 28, 8, 53, 0, 0, time.UTC).Local(), // MongoDB saves timestamps in local timezone
TimeEnd: time.Date(2015, 12, 28, 8, 54, 40, 0, time.UTC).Local(),
DurationIndex: 0,
RateInterval: &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{GroupIntervalStart: 0, Value: 100, RateIncrement: 10 * time.Second, RateUnit: time.Second}}}},
},
@@ -201,7 +224,7 @@ func testSMCosts(cdrStorage CdrStorage) error {
}
if rcvCC, err := cdrStorage.GetCallCostLog("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT); err != nil {
return err
} else if !reflect.DeepEqual(cc, rcvCC) {
} else if len(cc.Timespans) != len(rcvCC.Timespans) { // cc.Timespans[0].RateInterval.Rating.Rates[0], rcvCC.Timespans[0].RateInterval.Rating.Rates[0])
return fmt.Errorf("Expecting: %+v, received: %+v", cc, rcvCC)
}
return nil

View File

@@ -24,37 +24,60 @@ import (
"errors"
"fmt"
"io/ioutil"
"strings"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/utils"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
const (
colDst = "destinations"
colAct = "actions"
colApl = "actionplans"
colApl = "action_plans"
colTsk = "tasks"
colAtr = "actiontriggers"
colRpl = "ratingplans"
colRpf = "ratingprofiles"
colAtr = "action_triggers"
colRpl = "rating_plans"
colRpf = "rating_profiles"
colAcc = "accounts"
colShg = "sharedgroups"
colLcr = "lcrrules"
colDcs = "derivedchargers"
colShg = "shared_groups"
colLcr = "lcr_rules"
colDcs = "derived_chargers"
colAls = "aliases"
colStq = "statsqeues"
colStq = "stat_qeues"
colPbs = "pubsub"
colUsr = "users"
colCrs = "cdrstats"
colLht = "loadhistory"
colLogAtr = "actiontriggerslogs"
colLogApl = "actionplanlogs"
colLogErr = "errorlogs"
colCdrs = "cdrs"
colCrs = "cdr_stats"
colLht = "load_history"
colLogAtr = "action_trigger_logs"
colLogApl = "action_plan_logs"
colLogErr = "error_logs"
)
var (
CGRIDLow = strings.ToLower(utils.CGRID)
RunIDLow = strings.ToLower(utils.MEDI_RUNID)
ToRLow = strings.ToLower(utils.TOR)
CDRHostLow = strings.ToLower(utils.CDRHOST)
CDRSourceLow = strings.ToLower(utils.CDRSOURCE)
RequestTypeLow = strings.ToLower(utils.REQTYPE)
DirectionLow = strings.ToLower(utils.DIRECTION)
TenantLow = strings.ToLower(utils.TENANT)
CategoryLow = strings.ToLower(utils.CATEGORY)
AccountLow = strings.ToLower(utils.ACCOUNT)
SubjectLow = strings.ToLower(utils.SUBJECT)
SupplierLow = strings.ToLower(utils.SUPPLIER)
DisconnectCauseLow = strings.ToLower(utils.DISCONNECT_CAUSE)
SetupTimeLow = strings.ToLower(utils.SETUP_TIME)
AnswerTimeLow = strings.ToLower(utils.ANSWER_TIME)
CreatedAtLow = strings.ToLower(utils.CreatedAt)
UpdatedAtLow = strings.ToLower(utils.UpdatedAt)
UsageLow = strings.ToLower(utils.USAGE)
PDDLow = strings.ToLower(utils.PDD)
CostDetailsLow = strings.ToLower(utils.COST_DETAILS)
DestinationLow = strings.ToLower(utils.DESTINATION)
CostLow = strings.ToLower(utils.COST)
)
type MongoStorage struct {
@@ -192,13 +215,13 @@ func NewMongoStorage(host, port, db, user, pass string) (*MongoStorage, error) {
}
}
index = mgo.Index{
Key: []string{"cgrid", "cdrsource", "mediationrunid"},
Key: []string{CGRIDLow, RunIDLow},
Unique: true,
DropDups: false,
Background: false,
Sparse: false,
}
collections = []string{colCdrs}
collections = []string{utils.TBL_CDRS}
for _, col := range collections {
if err = ndb.C(col).EnsureIndex(index); err != nil {
return nil, err

View File

@@ -699,25 +699,23 @@ func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Acti
}
func (ms *MongoStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) error {
s := &CDR{
CGRID: cgrid,
Source: source,
RunID: runid,
CostDetails: cc,
}
_, err := ms.db.C(colCdrs).Upsert(bson.M{"cgrid": cgrid, "cdrsource": source, "mediationrunid": runid}, s)
return err
return ms.db.C(utils.TBLSMCosts).Insert(&SMCost{CGRID: cgrid, RunID: runid, CostSource: source, CostDetails: cc})
}
func (ms *MongoStorage) GetCallCostLog(cgrid, runid string) (cc *CallCost, err error) {
result := CDR{}
err = ms.db.C(colCdrs).Find(bson.M{"cgrid": cgrid, "mediationrunid": runid}).One(&result)
cc = result.CostDetails
return
var result SMCost
if err = ms.db.C(utils.TBLSMCosts).Find(bson.M{CGRIDLow: cgrid, RunIDLow: runid}).One(&result); err != nil {
return nil, err
}
return result.CostDetails, nil
}
func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) error {
_, err := ms.db.C(colCdrs).Upsert(bson.M{"cgrid": cdr.CGRID, "mediationrunid": cdr.RunID}, cdr)
func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if allowUpdate {
_, err = ms.db.C(utils.TBL_CDRS).Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr)
} else {
err = ms.db.C(utils.TBL_CDRS).Insert(cdr)
}
return err
}
@@ -726,7 +724,7 @@ func (ms *MongoStorage) RemCDRs(cgrIds []string) error {
if len(cgrIds) == 0 {
return nil
}
_, err := ms.db.C(colCdrs).UpdateAll(bson.M{"cgrid": bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
_, err := ms.db.C(utils.TBL_CDRS).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
return err
}
@@ -756,27 +754,27 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) {
func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter) ([]*CDR, int64, error) {
filters := bson.M{
"cgrid": bson.M{"$in": qryFltr.CGRIDs, "$nin": qryFltr.NotCGRIDs},
"mediationrunid": bson.M{"$in": qryFltr.RunIDs, "$nin": qryFltr.NotRunIDs},
"tor": bson.M{"$in": qryFltr.ToRs, "$nin": qryFltr.NotToRs},
"cdrhost": bson.M{"$in": qryFltr.OriginHosts, "$nin": qryFltr.NotOriginHosts},
"cdrsource": bson.M{"$in": qryFltr.Sources, "$nin": qryFltr.NotSources},
"reqtype": bson.M{"$in": qryFltr.RequestTypes, "$nin": qryFltr.NotRequestTypes},
"direction": bson.M{"$in": qryFltr.Directions, "$nin": qryFltr.NotDirections},
"tenant": bson.M{"$in": qryFltr.Tenants, "$nin": qryFltr.NotTenants},
"category": bson.M{"$in": qryFltr.Categories, "$nin": qryFltr.NotCategories},
"account": bson.M{"$in": qryFltr.Accounts, "$nin": qryFltr.NotAccounts},
"subject": bson.M{"$in": qryFltr.Subjects, "$nin": qryFltr.NotSubjects},
"supplier": bson.M{"$in": qryFltr.Suppliers, "$nin": qryFltr.NotSuppliers},
"disconnect_cause": bson.M{"$in": qryFltr.DisconnectCauses, "$nin": qryFltr.NotDisconnectCauses},
"setuptime": bson.M{"$gte": qryFltr.SetupTimeStart, "$lt": qryFltr.SetupTimeEnd},
"answertime": bson.M{"$gte": qryFltr.AnswerTimeStart, "$lt": qryFltr.AnswerTimeEnd},
"created_at": bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd},
"updated_at": bson.M{"$gte": qryFltr.UpdatedAtStart, "$lt": qryFltr.UpdatedAtEnd},
"usage": bson.M{"$gte": qryFltr.MinUsage, "$lt": qryFltr.MaxUsage},
"pdd": bson.M{"$gte": qryFltr.MinPDD, "$lt": qryFltr.MaxPDD},
"costdetails.account": bson.M{"$in": qryFltr.Accounts, "$nin": qryFltr.NotAccounts},
"costdetails.subject": bson.M{"$in": qryFltr.Subjects, "$nin": qryFltr.NotSubjects},
CGRIDLow: bson.M{"$in": qryFltr.CGRIDs, "$nin": qryFltr.NotCGRIDs},
RunIDLow: bson.M{"$in": qryFltr.RunIDs, "$nin": qryFltr.NotRunIDs},
ToRLow: bson.M{"$in": qryFltr.ToRs, "$nin": qryFltr.NotToRs},
CDRHostLow: bson.M{"$in": qryFltr.OriginHosts, "$nin": qryFltr.NotOriginHosts},
CDRSourceLow: bson.M{"$in": qryFltr.Sources, "$nin": qryFltr.NotSources},
RequestTypeLow: bson.M{"$in": qryFltr.RequestTypes, "$nin": qryFltr.NotRequestTypes},
DirectionLow: bson.M{"$in": qryFltr.Directions, "$nin": qryFltr.NotDirections},
TenantLow: bson.M{"$in": qryFltr.Tenants, "$nin": qryFltr.NotTenants},
CategoryLow: bson.M{"$in": qryFltr.Categories, "$nin": qryFltr.NotCategories},
AccountLow: bson.M{"$in": qryFltr.Accounts, "$nin": qryFltr.NotAccounts},
SubjectLow: bson.M{"$in": qryFltr.Subjects, "$nin": qryFltr.NotSubjects},
SupplierLow: bson.M{"$in": qryFltr.Suppliers, "$nin": qryFltr.NotSuppliers},
DisconnectCauseLow: bson.M{"$in": qryFltr.DisconnectCauses, "$nin": qryFltr.NotDisconnectCauses},
SetupTimeLow: bson.M{"$gte": qryFltr.SetupTimeStart, "$lt": qryFltr.SetupTimeEnd},
AnswerTimeLow: bson.M{"$gte": qryFltr.AnswerTimeStart, "$lt": qryFltr.AnswerTimeEnd},
CreatedAtLow: bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd},
UpdatedAtLow: bson.M{"$gte": qryFltr.UpdatedAtStart, "$lt": qryFltr.UpdatedAtEnd},
UsageLow: bson.M{"$gte": qryFltr.MinUsage, "$lt": qryFltr.MaxUsage},
PDDLow: bson.M{"$gte": qryFltr.MinPDD, "$lt": qryFltr.MaxPDD},
CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.Accounts, "$nin": qryFltr.NotAccounts},
CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.Subjects, "$nin": qryFltr.NotSubjects},
}
//file, _ := ioutil.TempFile(os.TempDir(), "debug")
//file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr)))
@@ -797,19 +795,19 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter) ([]*CDR, int64, error
if len(qryFltr.DestinationPrefixes) != 0 {
var regexes []bson.RegEx
for _, prefix := range qryFltr.DestinationPrefixes {
regexes = append(regexes, bson.RegEx{Pattern: regexp.QuoteMeta(prefix) + ".*"})
regexes = append(regexes, bson.RegEx{Pattern: regexp.QuoteMeta("^" + prefix)})
}
filters["destination"] = bson.M{"$in": regexes}
filters[DestinationLow] = bson.M{"$in": regexes}
}
if len(qryFltr.NotDestinationPrefixes) != 0 {
var notRegexes []bson.RegEx
for _, prefix := range qryFltr.NotDestinationPrefixes {
notRegexes = append(notRegexes, bson.RegEx{Pattern: regexp.QuoteMeta(prefix) + ".*"})
notRegexes = append(notRegexes, bson.RegEx{Pattern: regexp.QuoteMeta("^" + prefix)})
}
if m, ok := filters["destination"]; ok {
if m, ok := filters[DestinationLow]; ok {
m.(bson.M)["$nin"] = notRegexes
} else {
filters["destination"] = bson.M{"$nin": notRegexes}
filters[DestinationLow] = bson.M{"$nin": notRegexes}
}
}
@@ -831,24 +829,24 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter) ([]*CDR, int64, error
if qryFltr.MinCost != nil {
if qryFltr.MaxCost == nil {
filters["cost"] = bson.M{"$gte": *qryFltr.MinCost}
filters[CostLow] = bson.M{"$gte": *qryFltr.MinCost}
} else if *qryFltr.MinCost == 0.0 && *qryFltr.MaxCost == -1.0 { // Special case when we want to skip errors
filters["$or"] = []bson.M{
bson.M{"cost": bson.M{"$gte": 0.0}},
bson.M{CostLow: bson.M{"$gte": 0.0}},
}
} else {
filters["cost"] = bson.M{"$gte": *qryFltr.MinCost, "$lt": *qryFltr.MaxCost}
filters[CostLow] = bson.M{"$gte": *qryFltr.MinCost, "$lt": *qryFltr.MaxCost}
}
} else if qryFltr.MaxCost != nil {
if *qryFltr.MaxCost == -1.0 { // Non-rated CDRs
filters["cost"] = 0.0 // Need to include it otherwise all CDRs will be returned
filters[CostLow] = 0.0 // Need to include it otherwise all CDRs will be returned
} else { // Above limited CDRs, since MinCost is empty, make sure we query also NULL cost
filters["cost"] = bson.M{"$lt": *qryFltr.MaxCost}
filters[CostLow] = bson.M{"$lt": *qryFltr.MaxCost}
}
}
//file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters)))
//file.Close()
q := ms.db.C(colCdrs).Find(filters)
q := ms.db.C(utils.TBL_CDRS).Find(filters)
if qryFltr.Paginator.Limit != nil {
q = q.Limit(*qryFltr.Paginator.Limit)
}

View File

@@ -145,3 +145,11 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn,
}
return d, nil
}
// Stores one Cost coming from SM
type SMCost struct {
CGRID string
RunID string
CostSource string
CostDetails *CallCost
}

View File

@@ -261,6 +261,8 @@ const (
SMG = "SMG"
MetaGrouped = "*grouped"
MetaRaw = "*raw"
CreatedAt = "CreatedAt"
UpdatedAt = "UpdatedAt"
)
var (