/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>
*/

package engine

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"reflect"

	"github.com/cgrates/cgrates/utils"
	"github.com/ugorji/go/codec"
	"gopkg.in/mgo.v2/bson"
)

// Storage is the common method set embedded by the other storage interfaces
// declared in this file (RatingStorage, AccountingStorage, DataDB, CdrStorage
// and LoadStorage).
type Storage interface {
	// Close takes no arguments and reports no error.
	// NOTE(review): presumably releases the backend connection - confirm
	// against the implementations.
	Close()
	// Flush takes a single string argument.
	// NOTE(review): its meaning is backend specific and not visible here.
	Flush(string) error
	// GetKeysForPrefix returns a list of keys for the given string.
	// NOTE(review): presumably the argument is a key prefix - confirm.
	GetKeysForPrefix(string) ([]string, error)
	// RebuildReverseForPrefix takes a single string argument.
	// NOTE(review): presumably a prefix selecting which reverse data is
	// rebuilt - confirm against the implementations.
	RebuildReverseForPrefix(string) error
	// GetVersions returns the Versions value for itm.
	GetVersions(itm string) (vrs Versions, err error)
	// SetVersions stores vrs.
	SetVersions(vrs Versions) (err error)
	// RemoveVersions removes vrs.
	RemoveVersions(vrs Versions) (err error)
}

// RatingStorage is an interface for storage providers.
type RatingStorage interface {
	Storage
	HasData(string, string) (bool, error)
	// LoadRatingCache receives one ID list per cached item category.
	LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error

	// Rating plans and rating profiles.
	// NOTE(review): the unnamed (string, bool, string) getter parameters
	// presumably mirror (id, skipCache, transactionID) as named on
	// GetAccountActionPlans below - TODO confirm.
	GetRatingPlan(string, bool, string) (*RatingPlan, error)
	SetRatingPlan(*RatingPlan, string) error
	GetRatingProfile(string, bool, string) (*RatingProfile, error)
	SetRatingProfile(*RatingProfile, string) error
	RemoveRatingProfile(string, string) error

	// Destinations and their reverse lookups.
	GetDestination(string, bool, string) (*Destination, error)
	SetDestination(*Destination, string) error
	RemoveDestination(string, string) error
	SetReverseDestination(*Destination, string) error
	GetReverseDestination(string, bool, string) ([]string, error)
	UpdateReverseDestination(*Destination, *Destination, string) error

	// LCR rules.
	GetLCR(string, bool, string) (*LCR, error)
	SetLCR(*LCR, string) error

	// CDR stats.
	SetCdrStats(*CdrStats) error
	GetCdrStats(string) (*CdrStats, error)
	GetAllCdrStats() ([]*CdrStats, error)

	// Derived chargers.
	GetDerivedChargers(string, bool, string) (*utils.DerivedChargers, error)
	SetDerivedChargers(string, *utils.DerivedChargers, string) error

	// Actions.
	GetActions(string, bool, string) (Actions, error)
	SetActions(string, Actions, string) error
	RemoveActions(string, string) error

	// Shared groups.
	GetSharedGroup(string, bool, string) (*SharedGroup, error)
	SetSharedGroup(*SharedGroup, string) error

	// Action triggers.
	GetActionTriggers(string, bool, string) (ActionTriggers, error)
	SetActionTriggers(string, ActionTriggers, string) error
	RemoveActionTriggers(string, string) error

	// Action plans and the per-account list of action plan IDs.
	GetActionPlan(string, bool, string) (*ActionPlan, error)
	SetActionPlan(string, *ActionPlan, bool, string) error
	GetAllActionPlans() (map[string]*ActionPlan, error)
	GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error)
	SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error)
	RemAccountActionPlans(acntID string, aPlIDs []string) (err error)

	// Tasks.
	PushTask(*Task) error
	PopTask() (*Task, error)

	// CacheDataFromDB loads data to cache, prefix represents the cache prefix,
	// IDs should be nil if all available data should be loaded.
	CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error // ToDo: Move this to dataManager
}

// AccountingStorage extends Storage with the methods for accounts, CDR stats
// queues, subscribers, users, aliases, resource limits, load history, struct
// versions and request filter indexes.
type AccountingStorage interface {
	Storage
	// LoadAccountingCache receives one ID list per cached item category.
	LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error

	// Accounts.
	GetAccount(string) (*Account, error)
	SetAccount(*Account) error
	RemoveAccount(string) error

	// CDR stats queues.
	GetCdrStatsQueue(string) (*StatsQueue, error)
	SetCdrStatsQueue(*StatsQueue) error

	// Subscribers.
	GetSubscribers() (map[string]*SubscriberData, error)
	SetSubscriber(string, *SubscriberData) error
	RemoveSubscriber(string) error

	// Users.
	SetUser(*UserProfile) error
	GetUser(string) (*UserProfile, error)
	GetUsers() ([]*UserProfile, error)
	RemoveUser(string) error

	// Aliases and their reverse lookups.
	SetAlias(*Alias, string) error
	GetAlias(string, bool, string) (*Alias, error)
	RemoveAlias(string, string) error
	SetReverseAlias(*Alias, string) error
	GetReverseAlias(string, bool, string) ([]string, error)
	UpdateReverseAlias(*Alias, *Alias, string) error

	// Resource limits.
	GetResourceLimit(string, bool, string) (*ResourceLimit, error)
	SetResourceLimit(*ResourceLimit, string) error
	RemoveResourceLimit(string, string) error

	// Load history.
	GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
	AddLoadHistory(*utils.LoadInstance, int, string) error

	// Struct versions.
	GetStructVersion() (*StructVersion, error)
	SetStructVersion(*StructVersion) error

	// Request filter indexes, addressed by dbKey.
	GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
	SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
	MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error)

	// CacheDataFromDB loads data to cache, prefix represents the cache prefix,
	// IDs should be nil if all available data should be loaded.
	CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error
}

// DataDB contains methods to use for administering online data.
// Its method list repeats the methods declared on RatingStorage followed by
// those declared on AccountingStorage.
type DataDB interface {
	Storage
	HasData(string, string) (bool, error)
	// LoadRatingCache receives one ID list per cached item category.
	LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error

	// Rating plans and rating profiles.
	GetRatingPlan(string, bool, string) (*RatingPlan, error)
	SetRatingPlan(*RatingPlan, string) error
	GetRatingProfile(string, bool, string) (*RatingProfile, error)
	SetRatingProfile(*RatingProfile, string) error
	RemoveRatingProfile(string, string) error

	// Destinations and their reverse lookups.
	GetDestination(string, bool, string) (*Destination, error)
	SetDestination(*Destination, string) error
	RemoveDestination(string, string) error
	SetReverseDestination(*Destination, string) error
	GetReverseDestination(string, bool, string) ([]string, error)
	UpdateReverseDestination(*Destination, *Destination, string) error

	// LCR rules.
	GetLCR(string, bool, string) (*LCR, error)
	SetLCR(*LCR, string) error

	// CDR stats.
	SetCdrStats(*CdrStats) error
	GetCdrStats(string) (*CdrStats, error)
	GetAllCdrStats() ([]*CdrStats, error)

	// Derived chargers.
	GetDerivedChargers(string, bool, string) (*utils.DerivedChargers, error)
	SetDerivedChargers(string, *utils.DerivedChargers, string) error

	// Actions.
	GetActions(string, bool, string) (Actions, error)
	SetActions(string, Actions, string) error
	RemoveActions(string, string) error

	// Shared groups.
	GetSharedGroup(string, bool, string) (*SharedGroup, error)
	SetSharedGroup(*SharedGroup, string) error

	// Action triggers.
	GetActionTriggers(string, bool, string) (ActionTriggers, error)
	SetActionTriggers(string, ActionTriggers, string) error
	RemoveActionTriggers(string, string) error

	// Action plans and the per-account list of action plan IDs.
	GetActionPlan(string, bool, string) (*ActionPlan, error)
	SetActionPlan(string, *ActionPlan, bool, string) error
	GetAllActionPlans() (map[string]*ActionPlan, error)
	GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error)
	SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error)
	RemAccountActionPlans(acntID string, apIDs []string) (err error)

	// Tasks.
	PushTask(*Task) error
	PopTask() (*Task, error)

	// LoadAccountingCache receives one ID list per cached item category.
	LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error

	// Accounts.
	GetAccount(string) (*Account, error)
	SetAccount(*Account) error
	RemoveAccount(string) error

	// CDR stats queues.
	GetCdrStatsQueue(string) (*StatsQueue, error)
	SetCdrStatsQueue(*StatsQueue) error

	// Subscribers.
	GetSubscribers() (map[string]*SubscriberData, error)
	SetSubscriber(string, *SubscriberData) error
	RemoveSubscriber(string) error

	// Users.
	SetUser(*UserProfile) error
	GetUser(string) (*UserProfile, error)
	GetUsers() ([]*UserProfile, error)
	RemoveUser(string) error

	// Aliases and their reverse lookups.
	SetAlias(*Alias, string) error
	GetAlias(string, bool, string) (*Alias, error)
	RemoveAlias(string, string) error
	SetReverseAlias(*Alias, string) error
	GetReverseAlias(string, bool, string) ([]string, error)
	UpdateReverseAlias(*Alias, *Alias, string) error

	// Resource limits.
	GetResourceLimit(string, bool, string) (*ResourceLimit, error)
	SetResourceLimit(*ResourceLimit, string) error
	RemoveResourceLimit(string, string) error

	// Load history.
	GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
	AddLoadHistory(*utils.LoadInstance, int, string) error

	// Struct versions.
	GetStructVersion() (*StructVersion, error)
	SetStructVersion(*StructVersion) error

	// Request filter indexes, addressed by dbKey.
	GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
	SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
	MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error)

	// CacheDataFromDB loads data to cache, prefix represents the cache prefix,
	// IDs should be nil if all available data should be loaded.
	CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error // ToDo: Move this to dataManager
}

// StorDB combines CdrStorage with LoadReader and LoadWriter.
type StorDB interface {
	CdrStorage
	LoadReader
	LoadWriter
}

// CdrStorage extends Storage with the methods for CDRs and SMCosts.
type CdrStorage interface {
	Storage
	SetCDR(*CDR, bool) error
	SetSMCost(smc *SMCost) error
	GetSMCosts(cgrid, runid, originHost, originIDPrfx string) ([]*SMCost, error)
	// GetCDRs returns the matching CDRs together with an int64.
	// NOTE(review): presumably the int64 is a count and the bool selects
	// count-only or removal mode - confirm against the implementations.
	GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error)
}

// LoadStorage combines Storage with LoadReader and LoadWriter.
type LoadStorage interface {
	Storage
	LoadReader
	LoadWriter
}

// LoadReader reads from .csv or TP tables and provides the data ready for the tp_db or data_db.
type LoadReader interface { GetTpIds() ([]string, error) GetTpTableIds(string, string, utils.TPDistinctIds, map[string]string, *utils.Paginator) ([]string, error) GetTpTimings(string, string) ([]TpTiming, error) GetTPDestinations(string, string) ([]*utils.TPDestination, error) GetTpRates(string, string) ([]TpRate, error) GetTpDestinationRates(string, string, *utils.Paginator) ([]TpDestinationRate, error) GetTpRatingPlans(string, string, *utils.Paginator) ([]TpRatingPlan, error) GetTpRatingProfiles(*TpRatingProfile) ([]TpRatingProfile, error) GetTpSharedGroups(string, string) ([]TpSharedGroup, error) GetTpCdrStats(string, string) ([]TpCdrstat, error) GetTpLCRs(*TpLcrRule) ([]TpLcrRule, error) GetTpUsers(*TpUser) ([]TpUser, error) GetTpAliases(*TpAlias) ([]TpAlias, error) GetTpDerivedChargers(*TpDerivedCharger) ([]TpDerivedCharger, error) GetTpActions(string, string) ([]TpAction, error) GetTpActionPlans(string, string) ([]TpActionPlan, error) GetTpActionTriggers(string, string) ([]TpActionTrigger, error) GetTpAccountActions(*TpAccountAction) ([]TpAccountAction, error) GetTpResourceLimits(string, string) (TpResourceLimits, error) } type LoadWriter interface { RemTpData(string, string, map[string]string) error SetTpTimings([]TpTiming) error SetTPDestinations([]*utils.TPDestination) error SetTpRates([]TpRate) error SetTpDestinationRates([]TpDestinationRate) error SetTpRatingPlans([]TpRatingPlan) error SetTpRatingProfiles([]TpRatingProfile) error SetTpSharedGroups([]TpSharedGroup) error SetTpCdrStats([]TpCdrstat) error SetTpUsers([]TpUser) error SetTpAliases([]TpAlias) error SetTpDerivedChargers([]TpDerivedCharger) error SetTpLCRs([]TpLcrRule) error SetTpActions([]TpAction) error SetTpActionPlans([]TpActionPlan) error SetTpActionTriggers([]TpActionTrigger) error SetTpAccountActions([]TpAccountAction) error } type Marshaler interface { Marshal(v interface{}) ([]byte, error) Unmarshal(data []byte, v interface{}) error } type JSONMarshaler struct{} func (jm *JSONMarshaler) 
Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) } func (jm *JSONMarshaler) Unmarshal(data []byte, v interface{}) error { return json.Unmarshal(data, v) } type BSONMarshaler struct{} func (jm *BSONMarshaler) Marshal(v interface{}) ([]byte, error) { return bson.Marshal(v) } func (jm *BSONMarshaler) Unmarshal(data []byte, v interface{}) error { return bson.Unmarshal(data, v) } type JSONBufMarshaler struct{} func (jbm *JSONBufMarshaler) Marshal(v interface{}) (data []byte, err error) { buf := new(bytes.Buffer) err = json.NewEncoder(buf).Encode(v) data = buf.Bytes() return } func (jbm *JSONBufMarshaler) Unmarshal(data []byte, v interface{}) error { return json.NewDecoder(bytes.NewBuffer(data)).Decode(v) } type CodecMsgpackMarshaler struct { mh *codec.MsgpackHandle } func NewCodecMsgpackMarshaler() *CodecMsgpackMarshaler { cmm := &CodecMsgpackMarshaler{new(codec.MsgpackHandle)} mh := cmm.mh mh.MapType = reflect.TypeOf(map[string]interface{}(nil)) mh.RawToString = true return cmm } func (cmm *CodecMsgpackMarshaler) Marshal(v interface{}) (b []byte, err error) { enc := codec.NewEncoderBytes(&b, cmm.mh) err = enc.Encode(v) return } func (cmm *CodecMsgpackMarshaler) Unmarshal(data []byte, v interface{}) error { dec := codec.NewDecoderBytes(data, cmm.mh) return dec.Decode(&v) } type BincMarshaler struct { bh *codec.BincHandle } func NewBincMarshaler() *BincMarshaler { return &BincMarshaler{new(codec.BincHandle)} } func (bm *BincMarshaler) Marshal(v interface{}) (b []byte, err error) { enc := codec.NewEncoderBytes(&b, bm.bh) err = enc.Encode(v) return } func (bm *BincMarshaler) Unmarshal(data []byte, v interface{}) error { dec := codec.NewDecoderBytes(data, bm.bh) return dec.Decode(&v) } type GOBMarshaler struct{} func (gm *GOBMarshaler) Marshal(v interface{}) (data []byte, err error) { buf := new(bytes.Buffer) err = gob.NewEncoder(buf).Encode(v) data = buf.Bytes() return } func (gm *GOBMarshaler) Unmarshal(data []byte, v interface{}) error { return 
gob.NewDecoder(bytes.NewBuffer(data)).Decode(v) } // Decide the value of cacheCommit parameter based on transactionID func cacheCommit(transactionID string) bool { if transactionID == utils.NonTransactional { return true } return false }