From b9843605484d239c3ebb4ac566f93a53f2a84624 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 11 Jun 2015 19:15:04 +0200 Subject: [PATCH 1/4] Adding subscribe_park configuration in sm-freeswitch --- cdrc/cdrc_test.go | 41 ++++++++++++++++-------------- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/libconfig_json.go | 1 + config/smconfig.go | 4 +++ config/smconfig_test.go | 8 +++--- data/conf/cgrates/cgrates.json | 1 + sessionmanager/fssessionmanager.go | 17 ++++++++----- 8 files changed, 45 insertions(+), 29 deletions(-) diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 3df10de4f..fbf467716 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -32,6 +32,8 @@ func TestRecordForkCdr(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}}) + cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "DisconnectCauseTest", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE, + Value: []*utils.RSRField{&utils.RSRField{Id: "16"}}}) cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}} cdrRow := []string{"firstField", "secondField"} _, err := cdrc.recordToStoredCdr(cdrRow, 0) @@ -39,30 +41,31 @@ func TestRecordForkCdr(t *testing.T) { t.Error("Failed to corectly detect missing fields from record") } cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", - "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"} + "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"} rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0) if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } expectedCdr := &engine.StoredCdr{ - CgrId: utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()), - TOR: cdrRow[2], - AccId: cdrRow[3], - CdrHost: "0.0.0.0", // Got it over internal interface - CdrSource: "TEST_CDRC", - ReqType: cdrRow[4], - Direction: cdrRow[5], - Tenant: cdrRow[6], - Category: cdrRow[7], - Account: cdrRow[8], - Subject: cdrRow[9], - Destination: cdrRow[10], - SetupTime: time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC), - AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC), - Usage: time.Duration(62) * time.Second, - Supplier: "supplier1", - ExtraFields: map[string]string{}, - Cost: -1, + CgrId: utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()), + TOR: cdrRow[2], + AccId: cdrRow[3], + CdrHost: "0.0.0.0", // Got it over internal interface + CdrSource: "TEST_CDRC", + ReqType: cdrRow[4], + Direction: cdrRow[5], + Tenant: cdrRow[6], + Category: cdrRow[7], + Account: cdrRow[8], + Subject: cdrRow[9], + Destination: cdrRow[10], + SetupTime: time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC), + AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC), + Usage: time.Duration(62) * time.Second, + Supplier: "supplier1", + DisconnectCause: "NORMAL_DISCONNECT", + ExtraFields: map[string]string{}, + Cost: -1, } if !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) diff --git a/config/config_defaults.go b/config/config_defaults.go index a7092ced9..0162026b8 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -191,6 +191,7 @@ const CGRATES_CFG_JSON = ` "low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) + "subscribe_park": true, // subscribe via fsock to receive park events "connections":[ // instantiate connections to multiple FreeSWITCH servers {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} ], diff --git a/config/config_json_test.go b/config/config_json_test.go index 5f8fc7252..a16e69e4e 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -319,6 +319,7 @@ func TestSmFsJsonCfg(t *testing.T) { Low_balance_ann_file: utils.StringPointer(""), Empty_balance_context: utils.StringPointer(""), Empty_balance_ann_file: utils.StringPointer(""), + Subscribe_park: utils.BoolPointer(true), Connections: &[]*FsConnJsonCfg{ &FsConnJsonCfg{ Server: utils.StringPointer("127.0.0.1:8021"), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 2a4cd0094..9b00ee879 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -151,6 +151,7 @@ type SmFsJsonCfg struct { Low_balance_ann_file *string Empty_balance_context *string Empty_balance_ann_file *string + Subscribe_park *bool Connections *[]*FsConnJsonCfg } diff --git a/config/smconfig.go b/config/smconfig.go index 6da21dce4..62fc30ad9 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -69,6 +69,7 @@ type SmFsConfig struct { LowBalanceAnnFile string EmptyBalanceContext string EmptyBalanceAnnFile string + SubscribePark bool Connections []*FsConnConfig } @@ -126,6 +127,9 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error { if jsnCfg.Empty_balance_ann_file != nil { self.EmptyBalanceAnnFile = *jsnCfg.Empty_balance_ann_file } + if jsnCfg.Subscribe_park != nil { + self.SubscribePark = *jsnCfg.Subscribe_park + } if jsnCfg.Connections != nil { self.Connections = make([]*FsConnConfig, len(*jsnCfg.Connections)) for idx, jsnConnCfg := range *jsnCfg.Connections { diff --git a/config/smconfig_test.go b/config/smconfig_test.go index 1f486867d..4276f9089 100644 --- a/config/smconfig_test.go +++ b/config/smconfig_test.go @@ -26,8 +26,9 @@ import ( func TesSmFsConfigLoadFromJsonCfg(t *testing.T) { smFsJsnCfg := &SmFsJsonCfg{ - Enabled: utils.BoolPointer(true), - Create_cdr: utils.BoolPointer(true), + Enabled: utils.BoolPointer(true), + Create_cdr: utils.BoolPointer(true), + Subscribe_park: utils.BoolPointer(true), Connections: &[]*FsConnJsonCfg{ &FsConnJsonCfg{ Server: utils.StringPointer("1.2.3.4:8021"), @@ -42,7 +43,8 @@ func TesSmFsConfigLoadFromJsonCfg(t *testing.T) { }, } eSmFsConfig := &SmFsConfig{Enabled: true, - CreateCdr: true, + CreateCdr: true, + SubscribePark: true, Connections: []*FsConnConfig{ &FsConnConfig{Server: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 5}, &FsConnConfig{Server: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 5}, diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 328d61044..8486315d2 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -171,6 +171,7 @@ // "low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls // "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance // "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) +// "subscribe_park": true, // subscribe via fsock to receive park events // "connections":[ // instantiate connections to multiple FreeSWITCH servers // {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} // ], diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index c2b65a5f4..6db569dc6 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -74,11 +74,7 @@ func (sm *FSSessionManager) Connect() error { return err } -func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string, string)) { - cp := func(body, connId string) { - ev := new(FSEvent).AsEvent(body) - sm.onChannelPark(ev, connId) - } +func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { ca := func(body, connId string) { ev := new(FSEvent).AsEvent(body) sm.onChannelAnswer(ev, connId) @@ -87,11 +83,18 @@ func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string, ev := new(FSEvent).AsEvent(body) sm.onChannelHangupComplete(ev) } - return map[string][]func(string, string){ - "CHANNEL_PARK": []func(string, string){cp}, + handlers := map[string][]func(string, string){ "CHANNEL_ANSWER": []func(string, string){ca}, "CHANNEL_HANGUP_COMPLETE": []func(string, string){ch}, } + if sm.cfg.SubscribePark { + cp := func(body, connId string) { + ev := new(FSEvent).AsEvent(body) + sm.onChannelPark(ev, connId) + } + handlers["CHANNEL_PARK"] = []func(string, string){cp} + } + return handlers } // Searches and return the session with the specifed uuid From 1dfc6c5b3ec053af6429853c9253fdc9a2e327fa Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 12 Jun 2015 10:28:53 +0200 Subject: [PATCH 2/4] Fix nil pointer error when no database connection on dry_run --- engine/tp_reader.go | 70 +++++++++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 3c5d5f06c..20d04f2a2 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -158,13 +158,14 @@ func (tpr *TpReader) LoadDestinationRates() (err error) { if !destinationExists { _, destinationExists = tpr.destinations[dr.DestinationId] } - if !destinationExists { - if dbExists, err := tpr.ratingStorage.HasData(DESTINATION_PREFIX, dr.DestinationId); err != nil { + if !destinationExists && tpr.ratingStorage != nil { + if destinationExists, err = tpr.ratingStorage.HasData(DESTINATION_PREFIX, dr.DestinationId); err != nil { return err - } else if !dbExists { - return fmt.Errorf("could not get destination for tag %v", dr.DestinationId) } } + if !destinationExists { + return fmt.Errorf("could not get destination for tag %v", dr.DestinationId) + } } } return nil @@ -224,14 +225,19 @@ func (tpr *TpReader) LoadRatingPlansFiltered(tag string) (bool, error) { dms, err := TpDestinations(tpDests).GetDestinations() if err != nil { return false, err - } else if len(dms) == 0 { + } + destsExist := len(dms) != 0 + if !destsExist && tpr.ratingStorage != nil { if dbExists, err := tpr.ratingStorage.HasData(DESTINATION_PREFIX, drate.DestinationId); err != nil { return false, err - } else if !dbExists { - return false, fmt.Errorf("could not get destination for tag %v", drate.DestinationId) + } else if dbExists { + destsExist = true } continue } + if !destsExist { + return false, fmt.Errorf("could not get destination for tag %v", drate.DestinationId) + } for _, destination := range dms { tpr.ratingStorage.SetDestination(destination) } @@ -298,13 +304,14 @@ func (tpr *TpReader) LoadRatingProfilesFiltered(qriedRpf *TpRatingProfile) error return fmt.Errorf("cannot parse activation time from %v", tpRa.ActivationTime) } _, exists := tpr.ratingPlans[tpRa.RatingPlanId] - if !exists { - if dbExists, err := tpr.ratingStorage.HasData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { + if !exists && tpr.ratingStorage != nil { + if exists, err = tpr.ratingStorage.HasData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { return err - } else if !dbExists { - return fmt.Errorf("could not load rating plans for tag: %v", tpRa.RatingPlanId) } } + if !exists { + return fmt.Errorf("could not load rating plans for tag: %v", tpRa.RatingPlanId) + } resultRatingProfile.RatingPlanActivations = append(resultRatingProfile.RatingPlanActivations, &RatingPlanActivation{ ActivationTime: at, @@ -347,13 +354,14 @@ func (tpr *TpReader) LoadRatingProfiles() (err error) { return fmt.Errorf("cannot parse activation time from %v", tpRa.ActivationTime) } _, exists := tpr.ratingPlans[tpRa.RatingPlanId] - if !exists { - if dbExists, err := tpr.ratingStorage.HasData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { + if !exists && tpr.ratingStorage != nil { // Only query if there is a connection, eg on dry run there is none + if exists, err = tpr.ratingStorage.HasData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { return err - } else if !dbExists { - return fmt.Errorf("could not load rating plans for tag: %v", tpRa.RatingPlanId) } } + if !exists { + return fmt.Errorf("could not load rating plans for tag: %v", tpRa.RatingPlanId) + } rpf.RatingPlanActivations = append(rpf.RatingPlanActivations, &RatingPlanActivation{ ActivationTime: at, @@ -422,18 +430,28 @@ func (tpr *TpReader) LoadLCRs() (err error) { break } } - if !found { - if keys, err := tpr.ratingStorage.GetKeysForPrefix(RATING_PROFILE_PREFIX + ratingProfileSearchKey); err != nil || len(keys) == 0 { - return fmt.Errorf("[LCR] could not find ratingProfiles with prefix %s", ratingProfileSearchKey) + if !found && tpr.ratingStorage != nil { + if keys, err := tpr.ratingStorage.GetKeysForPrefix(RATING_PROFILE_PREFIX + ratingProfileSearchKey); err != nil { + return fmt.Errorf("[LCR] error querying ratingDb %s", err.Error()) + } else if len(keys) != 0 { + found = true } } + if !found { + return fmt.Errorf("[LCR] could not find ratingProfiles with prefix %s", ratingProfileSearchKey) + } + // check destination tags if tpLcr.DestinationTag != "" && tpLcr.DestinationTag != utils.ANY { - if _, found := tpr.destinations[tpLcr.DestinationTag]; !found { - if found, err := tpr.ratingStorage.HasData(DESTINATION_PREFIX, tpLcr.DestinationTag); err != nil || !found { - return fmt.Errorf("[LCR] could not find destination with tag %s", tpLcr.DestinationTag) + _, found := tpr.destinations[tpLcr.DestinationTag] + if !found && tpr.ratingStorage != nil { + if found, err = tpr.ratingStorage.HasData(DESTINATION_PREFIX, tpLcr.DestinationTag); err != nil { + return fmt.Errorf("[LCR] error querying ratingDb %s", err.Error()) } } + if !found { + return fmt.Errorf("[LCR] could not find destination with tag %s", tpLcr.DestinationTag) + } } tag := utils.LCRKey(tpLcr.Direction, tpLcr.Tenant, tpLcr.Category, tpLcr.Account, tpLcr.Subject) activationTime, _ := utils.ParseTimeDetectLayout(tpLcr.ActivationTime) @@ -545,12 +563,14 @@ func (tpr *TpReader) LoadActionPlans() (err error) { for _, at := range ats { _, exists := tpr.actions[at.ActionsId] - if !exists { - if dbExists, err := tpr.ratingStorage.HasData(ACTION_PREFIX, at.ActionsId); err != nil || !dbExists { - return fmt.Errorf("[ActionPlans] Could not load the action for tag: %v", - at.ActionsId) + if !exists && tpr.ratingStorage != nil { + if exists, err = tpr.ratingStorage.HasData(ACTION_PREFIX, at.ActionsId); err != nil { + return fmt.Errorf("[ActionPlans] Error querying actions: %v - %s", at.ActionsId, err.Error()) } } + if !exists { + return fmt.Errorf("[ActionPlans] Could not load the action for tag: %v", at.ActionsId) + } t, exists := tpr.timings[at.TimingId] if !exists { return fmt.Errorf("[ActionPlans] Could not load the timing for tag: %v", at.TimingId) From 276695b37d76848ff52080f22b571588878890ca Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 12 Jun 2015 10:42:32 +0200 Subject: [PATCH 3/4] Processing cgr_ignorepark channel variable for SM-FreeSWITCH --- sessionmanager/fsevent.go | 1 + sessionmanager/fssessionmanager.go | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index da11912bd..4ae1e889b 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -69,6 +69,7 @@ const ( HANGUP_CAUSE = "Hangup-Cause" PDD_MEDIA_MS = "variable_progress_mediamsec" PDD_NOMEDIA_MS = "variable_progressmsec" + IGNOREPARK = "variable_cgr_ignorepark" VAR_CGR_DISCONNECT_CAUSE = "variable_" + utils.CGR_DISCONNECT_CAUSE ) diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 6db569dc6..e337ef1f9 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -191,19 +191,9 @@ func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error { return nil } -// Sends the transfer command to unpark the call to freeswitch -func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) { - _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) - if err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) - } - if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not send unpark api call to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) - } -} - func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { - if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request + fsev := ev.(FSEvent) + if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE || fsev[IGNOREPARK] == "true" { // Do not process this request return } var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last @@ -229,6 +219,17 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) } +// Sends the transfer command to unpark the call to freeswitch +func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) { + _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) + if err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + } + if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send unpark api call to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + } +} + func (sm *FSSessionManager) onChannelAnswer(ev engine.Event, connId string) { if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request return From ac228a6bc5776bef58e7db41c379d032076daf22 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 12 Jun 2015 13:09:06 +0300 Subject: [PATCH 4/4] added test for shared groups set/get --- data/docker/devel/start.sh | 5 ++++- engine/sharedgroup_test.go | 29 ++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/data/docker/devel/start.sh b/data/docker/devel/start.sh index 548f3bf3f..975d5831b 100755 --- a/data/docker/devel/start.sh +++ b/data/docker/devel/start.sh @@ -25,9 +25,12 @@ go install github.com/cgrates/cgrates ln -s /root/code/bin/cgr-engine /usr/bin/cgr-engine # expand freeswitch conf -cd /usr/share/cgrates/tutorials/fs_evsock/freeswitch/etc/ && tar xzf freeswitch_conf.tar.gz +cd /usr/share/cgrates/tutorials/fs_evsock/freeswitch/etc/ && tar xzf freeswitch_conf.tar.gz cd /root/cgr echo "for cgradmin run: cgr-engine -config_dir data/conf/samples/cgradmin" echo 'export GOROOT=/root/go; export GOPATH=/root/code; export PATH=$GOROOT/bin:$GOPATH/bin:$PATH'>>/root/.zshrc + +upgrade_oh_my_zsh + zsh diff --git a/engine/sharedgroup_test.go b/engine/sharedgroup_test.go index a521028de..50a231798 100644 --- a/engine/sharedgroup_test.go +++ b/engine/sharedgroup_test.go @@ -18,7 +18,34 @@ along with this program. If not, see package engine -import "testing" +import ( + "reflect" + "testing" +) + +func TestSharedSetGet(t *testing.T) { + id := "TEST_SG100" + sg := &SharedGroup{ + Id: id, + AccountParameters: map[string]*SharingParameters{ + "test": &SharingParameters{Strategy: STRATEGY_HIGHEST}, + }, + MemberIds: []string{"1", "2", "3"}, + } + err := accountingStorage.SetSharedGroup(sg) + if err != nil { + t.Error("Error storing Shared groudp: ", err) + } + received, err := accountingStorage.GetSharedGroup(id, true) + if err != nil || received == nil || !reflect.DeepEqual(sg, received) { + t.Error("Error getting shared group: ", err, received) + } + received, err = accountingStorage.GetSharedGroup(id, false) + if err != nil || received == nil || !reflect.DeepEqual(sg, received) { + t.Error("Error getting cached shared group: ", err, received) + } + +} func TestSharedPopBalanceByStrategyLow(t *testing.T) { bc := BalanceChain{