From 8af3a7c2c8271810a9f807c7f0a7240baafb9eb7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 24 Sep 2012 15:55:17 +0300 Subject: [PATCH] added source for logging operations --- docs/tutorial.rst | 5 +++-- mediator/mediator.go | 6 +++--- sessionmanager/session.go | 2 +- timespans/action_timing.go | 2 +- timespans/action_trigger.go | 2 +- timespans/storage_interface.go | 15 +++++++++------ timespans/storage_map.go | 20 ++++++++++---------- timespans/storage_mongo.go | 24 ++++++++++++++---------- timespans/storage_postgres.go | 17 ++++++++++------- timespans/storage_redis.go | 20 ++++++++++---------- 10 files changed, 62 insertions(+), 51 deletions(-) diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 479c67da3..ef2de0885 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -118,9 +118,10 @@ The structure of the table (as an SQL command) is the following:: CREATE TABLE callcosts ( uuid varchar(80) primary key, - direction varchar(32), + source varchar(32), + direction varchar(32), tenant varchar(32), - tor varchar(32), + tor varchar(32), subject varchar(32), account varchar(32), destination varchar(32), diff --git a/mediator/mediator.go b/mediator/mediator.go index eb9f5264b..59d937841 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -188,7 +188,7 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) { searchedUUID := record[m.uuidIndex] - cc, err = m.loggerDb.GetCallCostLog(searchedUUID) + cc, err = m.loggerDb.GetCallCostLog(searchedUUID, timespans.SESSION_MANAGER_SOURCE) if err != nil || cc == nil { cc, err = m.GetCostsFromRater(record) } @@ -222,9 +222,9 @@ func (m *Mediator) GetCostsFromRater(record []string) (cc *timespans.CallCost, e TimeEnd: t1.Add(d)} err = m.connector.GetCost(cd, cc) if err != nil { - m.loggerDb.LogError(record[m.uuidIndex], err.Error()) + m.loggerDb.LogError(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, err.Error()) } else { - m.loggerDb.LogCallCost(record[m.uuidIndex], cc) + m.loggerDb.LogCallCost(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, cc) } return } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 0551d6e3e..653f1c8bd 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -136,7 +136,7 @@ func (s *Session) SaveOperations() { firstCC.Merge(cc) } if s.sessionManager.GetDbLogger() != nil { - s.sessionManager.GetDbLogger().LogCallCost(s.uuid, firstCC) + s.sessionManager.GetDbLogger().LogCallCost(s.uuid, timespans.SESSION_MANAGER_SOURCE, firstCC) } timespans.Logger.Debug(firstCC.String()) }() diff --git a/timespans/action_timing.go b/timespans/action_timing.go index b8b829a34..1c1b0209f 100644 --- a/timespans/action_timing.go +++ b/timespans/action_timing.go @@ -226,7 +226,7 @@ func (at *ActionTiming) Execute() (err error) { }) } } - go storageLogger.LogActionTiming(at, aac) + go storageLogger.LogActionTiming(SCHED_SOURCE, at, aac) return } diff --git a/timespans/action_trigger.go b/timespans/action_trigger.go index e8a7d48f4..da59fe271 100644 --- a/timespans/action_trigger.go +++ b/timespans/action_trigger.go @@ -54,7 +54,7 @@ func (at *ActionTrigger) Execute(ub *UserBalance) (err error) { go Logger.Info(fmt.Sprintf("Executing %v: %v", ub.Id, a)) err = actionFunction(ub, a) } - go storageLogger.LogActionTrigger(ub.Id, at, aac) + go storageLogger.LogActionTrigger(ub.Id, RATER_SOURCE, at, aac) at.Executed = true storageGetter.SetUserBalance(ub) return diff --git a/timespans/storage_interface.go b/timespans/storage_interface.go index 36a6e7ce8..c32471e3e 100644 --- a/timespans/storage_interface.go +++ b/timespans/storage_interface.go @@ -36,7 +36,10 @@ const ( LOG_ACTION_TRIGGER_PREFIX = "ltr_" LOG_ERR = "ler_" // sources - + SESSION_MANAGER_SOURCE = "SMR" + MEDIATOR_SOURCE = "MED" + SCHED_SOURCE = "MED" + RATER_SOURCE = "RAT" ) /* @@ -56,11 +59,11 @@ type DataStorage interface { GetActionTimings(string) ([]*ActionTiming, error) SetActionTimings(string, []*ActionTiming) error GetAllActionTimings() (map[string][]*ActionTiming, error) - LogCallCost(uuid string, cc *CallCost) error - LogError(uuid, errstr string) error - LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) error - LogActionTiming(at *ActionTiming, as []*Action) error - GetCallCostLog(uuid string) (*CallCost, error) + LogCallCost(uuid, source string, cc *CallCost) error + LogError(uuid, source, errstr string) error + LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) error + LogActionTiming(source string, at *ActionTiming, as []*Action) error + GetCallCostLog(uuid, source string) (*CallCost, error) } type Marshaler interface { diff --git a/timespans/storage_map.go b/timespans/storage_map.go index cd8671e14..0bc50942b 100644 --- a/timespans/storage_map.go +++ b/timespans/storage_map.go @@ -137,14 +137,14 @@ func (ms *MapStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err return } -func (ms *MapStorage) LogCallCost(uuid string, cc *CallCost) error { +func (ms *MapStorage) LogCallCost(uuid, source string, cc *CallCost) error { result, err := ms.ms.Marshal(cc) - ms.dict[LOG_CALL_COST_PREFIX+uuid] = result + ms.dict[LOG_CALL_COST_PREFIX+source+"_"+uuid] = result return err } -func (ms *MapStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { - if values, ok := ms.dict[uuid]; ok { +func (ms *MapStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { + if values, ok := ms.dict[LOG_CALL_COST_PREFIX+source+"_"+uuid]; ok { err = ms.ms.Unmarshal(values, &cc) } else { return nil, errors.New("not found") @@ -152,7 +152,7 @@ func (ms *MapStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { return } -func (ms *MapStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) { +func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) { mat, err := ms.ms.Marshal(at) if err != nil { return @@ -161,11 +161,11 @@ func (ms *MapStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Act if err != nil { return } - ms.dict[LOG_ACTION_TRIGGER_PREFIX+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas))) + ms.dict[LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas))) return } -func (ms *MapStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) { +func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) { mat, err := ms.ms.Marshal(at) if err != nil { return @@ -174,11 +174,11 @@ func (ms *MapStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error if err != nil { return } - ms.dict[LOG_ACTION_TIMMING_PREFIX+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))) + ms.dict[LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))) return } -func (ms *MapStorage) LogError(uuid, errstr string) (err error) { - ms.dict[LOG_ERR+uuid] = []byte(errstr) +func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) { + ms.dict[LOG_ERR+source+"_"+uuid] = []byte(errstr) return nil } diff --git a/timespans/storage_mongo.go b/timespans/storage_mongo.go index 2a0116a99..df79562b5 100644 --- a/timespans/storage_mongo.go +++ b/timespans/storage_mongo.go @@ -97,12 +97,14 @@ type AtKeyValue struct { type LogCostEntry struct { Id string `bson:"_id,omitempty"` CallCost *CallCost + Source string } type LogTimingEntry struct { ActionTiming *ActionTiming Actions []*Action LogTime time.Time + Source string } type LogTriggerEntry struct { @@ -110,11 +112,13 @@ type LogTriggerEntry struct { ActionTrigger *ActionTrigger Actions []*Action LogTime time.Time + Source string } type LogErrEntry struct { Id string `bson:"_id,omitempty"` ErrStr string + Source string } func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) { @@ -180,25 +184,25 @@ func (ms *MongoStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, e return } -func (ms *MongoStorage) LogCallCost(uuid string, cc *CallCost) error { - return ms.db.C("cclog").Insert(&LogCostEntry{uuid, cc}) +func (ms *MongoStorage) LogCallCost(uuid, source string, cc *CallCost) error { + return ms.db.C("cclog").Insert(&LogCostEntry{uuid, cc, source}) } -func (ms *MongoStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { +func (ms *MongoStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { result := new(LogCostEntry) - err = ms.db.C("cclog").Find(bson.M{"_id": uuid}).One(result) + err = ms.db.C("cclog").Find(bson.M{"_id": uuid, "source": source}).One(result) cc = result.CallCost return } -func (ms *MongoStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) { - return ms.db.C("actlog").Insert(&LogTriggerEntry{ubId, at, as, time.Now()}) +func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) { + return ms.db.C("actlog").Insert(&LogTriggerEntry{ubId, at, as, time.Now(), source}) } -func (ms *MongoStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) { - return ms.db.C("actlog").Insert(&LogTimingEntry{at, as, time.Now()}) +func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) { + return ms.db.C("actlog").Insert(&LogTimingEntry{at, as, time.Now(), source}) } -func (ms *MongoStorage) LogError(uuid, errstr string) (err error) { - return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr}) +func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) { + return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source}) } diff --git a/timespans/storage_postgres.go b/timespans/storage_postgres.go index eb5b979e7..158340ccf 100644 --- a/timespans/storage_postgres.go +++ b/timespans/storage_postgres.go @@ -177,7 +177,7 @@ func (psl *PostgresStorage) SetActionTimings(key string, ats []*ActionTiming) (e func (psl *PostgresStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err error) { return } -func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) { +func (psl *PostgresStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { if psl.Db == nil { //timespans.Logger.Warning("Cannot write log to database.") return @@ -186,8 +186,9 @@ func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) { if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", uuid, + source, cc.Direction, cc.Tenant, cc.TOR, @@ -203,8 +204,8 @@ func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) { return } -func (psl *PostgresStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { - row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s'", uuid)) +func (psl *PostgresStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { + row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s' AND source='%s'", uuid, source)) var uuid_found string var timespansJson string err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) @@ -212,8 +213,10 @@ func (psl *PostgresStorage) GetCallCostLog(uuid string) (cc *CallCost, err error return } -func (psl *PostgresStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) { +func (psl *PostgresStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) { return } -func (psl *PostgresStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) { return } -func (psl *PostgresStorage) LogError(uuid, errstr string) (err error) { return } +func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) { + return +} +func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return } diff --git a/timespans/storage_redis.go b/timespans/storage_redis.go index 587bc1aab..a2b065dbe 100644 --- a/timespans/storage_redis.go +++ b/timespans/storage_redis.go @@ -146,16 +146,16 @@ func (rs *RedisStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, e return } -func (rs *RedisStorage) LogCallCost(uuid string, cc *CallCost) (err error) { +func (rs *RedisStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { result, err := rs.ms.Marshal(cc) if err != nil { return } - return rs.db.Set(LOG_CALL_COST_PREFIX+uuid, result).Err + return rs.db.Set(LOG_CALL_COST_PREFIX+source+"_"+uuid, result).Err } -func (rs *RedisStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { - if values, err := rs.db.Get(uuid).Bytes(); err == nil { +func (rs *RedisStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { + if values, err := rs.db.Get(LOG_CALL_COST_PREFIX + source + "_" + uuid).Bytes(); err == nil { err = rs.ms.Unmarshal(values, cc) } else { return nil, err @@ -163,7 +163,7 @@ func (rs *RedisStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { return } -func (rs *RedisStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*Action) (err error) { +func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as []*Action) (err error) { mat, err := rs.ms.Marshal(at) if err != nil { return @@ -172,11 +172,11 @@ func (rs *RedisStorage) LogActionTrigger(ubId string, at *ActionTrigger, as []*A if err != nil { return } - rs.db.Set(LOG_ACTION_TRIGGER_PREFIX+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas)))) + rs.db.Set(LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas)))) return } -func (rs *RedisStorage) LogActionTiming(at *ActionTiming, as []*Action) (err error) { +func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as []*Action) (err error) { mat, err := rs.ms.Marshal(at) if err != nil { return @@ -185,10 +185,10 @@ func (rs *RedisStorage) LogActionTiming(at *ActionTiming, as []*Action) (err err if err != nil { return } - rs.db.Set(LOG_ACTION_TIMMING_PREFIX+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))) + rs.db.Set(LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))) return } -func (rs *RedisStorage) LogError(uuid, errstr string) (err error) { - return rs.db.Set(LOG_ERR+uuid, errstr).Err +func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) { + return rs.db.Set(LOG_ERR+source+"_"+uuid, errstr).Err }