added source for logging operations

This commit is contained in:
Radu Ioan Fericean
2012-09-24 15:55:17 +03:00
parent 243f6d36be
commit 8af3a7c2c8
10 changed files with 62 additions and 51 deletions

View File

@@ -118,9 +118,10 @@ The structure of the table (as an SQL command) is the following::
CREATE TABLE callcosts (
uuid varchar(80) primary key,
direction varchar(32),
source varchar(32),
direction varchar(32),
tenant varchar(32),
tor varchar(32),
tor varchar(32),
subject varchar(32),
account varchar(32),
destination varchar(32),

View File

@@ -188,7 +188,7 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) {
searchedUUID := record[m.uuidIndex]
cc, err = m.loggerDb.GetCallCostLog(searchedUUID)
cc, err = m.loggerDb.GetCallCostLog(searchedUUID, timespans.SESSION_MANAGER_SOURCE)
if err != nil || cc == nil {
cc, err = m.GetCostsFromRater(record)
}
@@ -222,9 +222,9 @@ func (m *Mediator) GetCostsFromRater(record []string) (cc *timespans.CallCost, e
TimeEnd: t1.Add(d)}
err = m.connector.GetCost(cd, cc)
if err != nil {
m.loggerDb.LogError(record[m.uuidIndex], err.Error())
m.loggerDb.LogError(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, err.Error())
} else {
m.loggerDb.LogCallCost(record[m.uuidIndex], cc)
m.loggerDb.LogCallCost(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, cc)
}
return
}

View File

@@ -136,7 +136,7 @@ func (s *Session) SaveOperations() {
firstCC.Merge(cc)
}
if s.sessionManager.GetDbLogger() != nil {
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, firstCC)
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, timespans.SESSION_MANAGER_SOURCE, firstCC)
}
timespans.Logger.Debug(firstCC.String())
}()

View File

@@ -226,7 +226,7 @@ func (at *ActionTiming) Execute() (err error) {
})
}
}
go storageLogger.LogActionTiming(at, aac)
go storageLogger.LogActionTiming(SCHED_SOURCE, at, aac)
return
}

View File

@@ -54,7 +54,7 @@ func (at *ActionTrigger) Execute(ub *UserBalance) (err error) {
go Logger.Info(fmt.Sprintf("Executing %v: %v", ub.Id, a))
err = actionFunction(ub, a)
}
go storageLogger.LogActionTrigger(ub.Id, at, aac)
go storageLogger.LogActionTrigger(ub.Id, RATER_SOURCE, at, aac)
at.Executed = true
storageGetter.SetUserBalance(ub)
return

View File

@@ -36,7 +36,10 @@ const (
LOG_ACTION_TRIGGER_PREFIX = "ltr_"
LOG_ERR = "ler_"
// sources
SESSION_MANAGER_SOURCE = "SMR"
MEDIATOR_SOURCE = "MED"
SCHED_SOURCE = "MED"
RATER_SOURCE = "RAT"
)
/*
@@ -56,11 +59,11 @@ type DataStorage interface {
GetActionTimings(string) ([]*ActionTiming, error)
SetActionTimings(string, []*ActionTiming) error
GetAllActionTimings() (map[string][]*ActionTiming, error)
LogCallCost(uuid string, cc *CallCost) error
LogError(uuid, errstr string) error
LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) error
LogActionTiming(at *ActionTiming, as []*Action) error
GetCallCostLog(uuid string) (*CallCost, error)
LogCallCost(uuid, source string, cc *CallCost) error
LogError(uuid, source, errstr string) error
LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) error
LogActionTiming(source string, at *ActionTiming, as []*Action) error
GetCallCostLog(uuid, source string) (*CallCost, error)
}
type Marshaler interface {

View File

@@ -137,14 +137,14 @@ func (ms *MapStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err
return
}
func (ms *MapStorage) LogCallCost(uuid string, cc *CallCost) error {
func (ms *MapStorage) LogCallCost(uuid, source string, cc *CallCost) error {
result, err := ms.ms.Marshal(cc)
ms.dict[LOG_CALL_COST_PREFIX+uuid] = result
ms.dict[LOG_CALL_COST_PREFIX+source+"_"+uuid] = result
return err
}
func (ms *MapStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) {
if values, ok := ms.dict[uuid]; ok {
func (ms *MapStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) {
if values, ok := ms.dict[LOG_CALL_COST_PREFIX+source+"_"+uuid]; ok {
err = ms.ms.Unmarshal(values, &cc)
} else {
return nil, errors.New("not found")
@@ -152,7 +152,7 @@ func (ms *MapStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) {
return
}
func (ms *MapStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) {
func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) {
mat, err := ms.ms.Marshal(at)
if err != nil {
return
@@ -161,11 +161,11 @@ func (ms *MapStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Act
if err != nil {
return
}
ms.dict[LOG_ACTION_TRIGGER_PREFIX+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas)))
ms.dict[LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas)))
return
}
func (ms *MapStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) {
func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) {
mat, err := ms.ms.Marshal(at)
if err != nil {
return
@@ -174,11 +174,11 @@ func (ms *MapStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error
if err != nil {
return
}
ms.dict[LOG_ACTION_TIMMING_PREFIX+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))
ms.dict[LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))
return
}
func (ms *MapStorage) LogError(uuid, errstr string) (err error) {
ms.dict[LOG_ERR+uuid] = []byte(errstr)
func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) {
ms.dict[LOG_ERR+source+"_"+uuid] = []byte(errstr)
return nil
}

View File

@@ -97,12 +97,14 @@ type AtKeyValue struct {
type LogCostEntry struct {
Id string `bson:"_id,omitempty"`
CallCost *CallCost
Source string
}
type LogTimingEntry struct {
ActionTiming *ActionTiming
Actions []*Action
LogTime time.Time
Source string
}
type LogTriggerEntry struct {
@@ -110,11 +112,13 @@ type LogTriggerEntry struct {
ActionTrigger *ActionTrigger
Actions []*Action
LogTime time.Time
Source string
}
type LogErrEntry struct {
Id string `bson:"_id,omitempty"`
ErrStr string
Source string
}
func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) {
@@ -180,25 +184,25 @@ func (ms *MongoStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, e
return
}
func (ms *MongoStorage) LogCallCost(uuid string, cc *CallCost) error {
return ms.db.C("cclog").Insert(&LogCostEntry{uuid, cc})
func (ms *MongoStorage) LogCallCost(uuid, source string, cc *CallCost) error {
return ms.db.C("cclog").Insert(&LogCostEntry{uuid, cc, source})
}
func (ms *MongoStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) {
func (ms *MongoStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) {
result := new(LogCostEntry)
err = ms.db.C("cclog").Find(bson.M{"_id": uuid}).One(result)
err = ms.db.C("cclog").Find(bson.M{"_id": uuid, "source": source}).One(result)
cc = result.CallCost
return
}
func (ms *MongoStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) {
return ms.db.C("actlog").Insert(&LogTriggerEntry{ubId, at, as, time.Now()})
func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) {
return ms.db.C("actlog").Insert(&LogTriggerEntry{ubId, at, as, time.Now(), source})
}
func (ms *MongoStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) {
return ms.db.C("actlog").Insert(&LogTimingEntry{at, as, time.Now()})
func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) {
return ms.db.C("actlog").Insert(&LogTimingEntry{at, as, time.Now(), source})
}
func (ms *MongoStorage) LogError(uuid, errstr string) (err error) {
return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr})
func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) {
return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source})
}

View File

@@ -177,7 +177,7 @@ func (psl *PostgresStorage) SetActionTimings(key string, ats []*ActionTiming) (e
func (psl *PostgresStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err error) { return }
func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) {
func (psl *PostgresStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) {
if psl.Db == nil {
//timespans.Logger.Warning("Cannot write log to database.")
return
@@ -186,8 +186,9 @@ func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) {
if err != nil {
Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
}
_, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')",
_, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')",
uuid,
source,
cc.Direction,
cc.Tenant,
cc.TOR,
@@ -203,8 +204,8 @@ func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) {
return
}
func (psl *PostgresStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) {
row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s'", uuid))
func (psl *PostgresStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) {
row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s' AND source='%s'", uuid, source))
var uuid_found string
var timespansJson string
err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, &timespansJson)
@@ -212,8 +213,10 @@ func (psl *PostgresStorage) GetCallCostLog(uuid string) (cc *CallCost, err error
return
}
func (psl *PostgresStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) {
func (psl *PostgresStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) {
return
}
func (psl *PostgresStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) { return }
func (psl *PostgresStorage) LogError(uuid, errstr string) (err error) { return }
func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) {
return
}
func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return }

View File

@@ -146,16 +146,16 @@ func (rs *RedisStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, e
return
}
func (rs *RedisStorage) LogCallCost(uuid string, cc *CallCost) (err error) {
func (rs *RedisStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) {
result, err := rs.ms.Marshal(cc)
if err != nil {
return
}
return rs.db.Set(LOG_CALL_COST_PREFIX+uuid, result).Err
return rs.db.Set(LOG_CALL_COST_PREFIX+source+"_"+uuid, result).Err
}
func (rs *RedisStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) {
if values, err := rs.db.Get(uuid).Bytes(); err == nil {
func (rs *RedisStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) {
if values, err := rs.db.Get(LOG_CALL_COST_PREFIX + source + "_" + uuid).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, cc)
} else {
return nil, err
@@ -163,7 +163,7 @@ func (rs *RedisStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) {
return
}
func (rs *RedisStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) {
func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) {
mat, err := rs.ms.Marshal(at)
if err != nil {
return
@@ -172,11 +172,11 @@ func (rs *RedisStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*A
if err != nil {
return
}
rs.db.Set(LOG_ACTION_TRIGGER_PREFIX+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas))))
rs.db.Set(LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas))))
return
}
func (rs *RedisStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) {
func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) {
mat, err := rs.ms.Marshal(at)
if err != nil {
return
@@ -185,10 +185,10 @@ func (rs *RedisStorage) LogActionTiming(at *ActionTiming, as []*Action) (err err
if err != nil {
return
}
rs.db.Set(LOG_ACTION_TIMMING_PREFIX+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))))
rs.db.Set(LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))))
return
}
func (rs *RedisStorage) LogError(uuid, errstr string) (err error) {
return rs.db.Set(LOG_ERR+uuid, errstr).Err
func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) {
return rs.db.Set(LOG_ERR+source+"_"+uuid, errstr).Err
}