From bc9c6c5ce39044d76d8c47414e57fef40d675bb9 Mon Sep 17 00:00:00 2001 From: jiangyong Date: Sun, 21 Jan 2024 18:49:51 +0800 Subject: [PATCH] influxdb --- adapi/sdk.go | 2 +- adminapi/application.go | 2 +- adminapi/message.go | 2 +- go.mod | 7 ++- metric/config.go | 21 +++++---- metric/metric_test.go | 5 +- metric/service.go | 102 ++++++++++++++++++++++------------------ payapi/pay.go | 2 +- 8 files changed, 82 insertions(+), 61 deletions(-) diff --git a/adapi/sdk.go b/adapi/sdk.go index b7e5ca3..085b5d2 100644 --- a/adapi/sdk.go +++ b/adapi/sdk.go @@ -3,8 +3,8 @@ package adapi import ( "encoding/json" "fmt" - "github.com/smbrave/gosdk/util" "github.com/spf13/cast" + "gitlab.batiao8.com/open/gosdk/util" "net/url" "strconv" "strings" diff --git a/adminapi/application.go b/adminapi/application.go index 08f201d..b846712 100644 --- a/adminapi/application.go +++ b/adminapi/application.go @@ -3,7 +3,7 @@ package adminapi import ( "encoding/json" "fmt" - "github.com/smbrave/gosdk/util" + "gitlab.batiao8.com/open/gosdk/util" ) type Application struct { diff --git a/adminapi/message.go b/adminapi/message.go index 8ebba3c..76d9092 100644 --- a/adminapi/message.go +++ b/adminapi/message.go @@ -3,7 +3,7 @@ package adminapi import ( "encoding/json" "fmt" - "github.com/smbrave/gosdk/util" + "gitlab.batiao8.com/open/gosdk/util" "net/url" ) diff --git a/go.mod b/go.mod index 0faa4ce..4048e98 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,8 @@ -module github.com/smbrave/gosdk +module gitlab.batiao8.com/open/gosdk go 1.18 -require github.com/spf13/cast v1.5.0 +require ( + github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c + github.com/spf13/cast v1.5.0 +) diff --git a/metric/config.go b/metric/config.go index 46c0f92..033ab96 100644 --- a/metric/config.go +++ b/metric/config.go @@ -1,13 +1,14 @@ package metric -import "os" - var ( serv *service ) type Config struct { Address string + Username string + Password string + Database string Interval int } @@ -23,12 +24,16 @@ func Init(c *Config) error { return nil } - hostname, _ := os.Hostname() - serv = &service{ - config: c, - hostname: hostname, - metrics: make(chan *metric, 100000), - } + serv = NewService(c) go serv.run() return nil } + +func (c *Config) Default() { + if c.Database == "" { + c.Database = "telegraf" + } + if c.Interval == 0 { + c.Interval = 10 + } +} diff --git a/metric/metric_test.go b/metric/metric_test.go index 1b10128..4e9a813 100644 --- a/metric/metric_test.go +++ b/metric/metric_test.go @@ -8,10 +8,11 @@ import ( func TestName(t *testing.T) { c := NewConfg() - c.Address = "https://monitor.batiao8.com" + c.Address = "http://14.22.116.197:9305" + c.Interval = 1 Init(c) for i := 0; i < 100; i++ { - Metric("test", float64(rand.Int()%100), map[string]string{ + Metric("test.test.b", float64(rand.Int()%100), map[string]string{ "a": "b", "c": "d", }) diff --git a/metric/service.go b/metric/service.go index a76038d..2a876bd 100644 --- a/metric/service.go +++ b/metric/service.go @@ -1,13 +1,10 @@ package metric import ( - "bytes" - "encoding/json" "fmt" - "github.com/spf13/cast" - "io/ioutil" - "log" - "net/http" + influxdb "github.com/influxdata/influxdb1-client" + "net/url" + "os" "sort" "strings" "time" @@ -21,10 +18,37 @@ type metric struct { } type service struct { - hostname string - metrics chan *metric - megers map[string]*metric - config *Config + hostname string + serviceName string + hostIp string + podIp string + metrics chan *metric + megers map[string]*metric + config *Config + client *influxdb.Client +} + +func NewService(c *Config) *service { + c.Default() + hostname, _ := os.Hostname() + + infUrl, _ := url.Parse(c.Address) + infCfg := influxdb.NewConfig() + infCfg.Username = c.Username + infCfg.Password = c.Password + infCfg.URL = *infUrl + + client, _ := influxdb.NewClient(infCfg) + + return &service{ + metrics: make(chan *metric, 100000), + hostname: hostname, + serviceName: os.Getenv("SERVICE_NAME"), + podIp: os.Getenv("POD_IP"), + hostIp: os.Getenv("HOST_IP"), + config: c, + client: client, + } } func (s *service) run() { @@ -66,51 +90,39 @@ func (s *service) process(m *metric) { s.megers[key] = m } -func (s *service) defaultTags(tags map[string]string) map[string]string { - if tags == nil { - tags = map[string]string{ - "hostname": s.hostname, - } - } else { - tags["hostname"] = s.hostname +func (s *service) defaultTags() map[string]string { + + return map[string]string{ + "hostname": s.hostname, + "host_ip": s.hostIp, + "pod_ip": s.podIp, + "service_name": s.serviceName, } - return tags + } func (s *service) report() { if s.megers == nil { return } - metrics := make([]*metric, 0) + + var bp influxdb.BatchPoints for _, v := range s.megers { - v.Tags = s.defaultTags(v.Tags) - metrics = append(metrics, v) - } - reqUrl := fmt.Sprintf("%s/opentsdb/put", serv.config.Address) - - reqBody, _ := json.Marshal(metrics) - resp, err := http.Post(reqUrl, "application/json", bytes.NewBuffer(reqBody)) - if err != nil { - log.Printf("http.Post error :%s", err.Error()) - return - } - defer resp.Body.Close() - rspBody, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Printf(" ioutil.ReadAll error :%s", err.Error()) - return - } - result := make(map[string]interface{}) - if err := json.Unmarshal(rspBody, &result); err != nil { - log.Printf("json result : %s", string(rspBody)) - return + var p influxdb.Point + p.Measurement = v.Metric + p.Tags = v.Tags + p.Fields = map[string]interface{}{ + "value": v.Value, + } + p.Time = time.Unix(v.Timestamp, 0) + bp.Points = append(bp.Points, p) } - fail := cast.ToInt(result["fail"]) - if fail != 0 { - log.Printf("http result : %s", string(rspBody)) - return - } + bp.Database = s.config.Database + bp.Tags = s.defaultTags() + + s.client.Write(bp) + fmt.Println("ok write") s.megers = nil } diff --git a/payapi/pay.go b/payapi/pay.go index d90b937..0e68c01 100644 --- a/payapi/pay.go +++ b/payapi/pay.go @@ -4,7 +4,7 @@ import ( "encoding/json" "errors" "fmt" - "github.com/smbrave/gosdk/util" + "gitlab.batiao8.com/open/gosdk/util" ) var (