gosdk/metric/service.go

136 lines
2.5 KiB
Go

package metric
import (
"fmt"
influxdb "github.com/influxdata/influxdb1-client"
"net/url"
"os"
"sort"
"strings"
"time"
)
// metric is one measurement sample: a named numeric value with optional tags.
// Samples sharing the same name and tag set are summed together by
// service.process before being written out by service.report.
type metric struct {
// Timestamp is handed to time.Unix as the seconds argument when the sample
// is reported.
Timestamp int64 `json:"timestamp"`
// Metric is the metric name; report uses it as the point's measurement.
Metric string `json:"metric"`
// Value is the sample value; process adds later samples onto it.
Value float64 `json:"value"`
// Tags are optional key/value labels. They take part in the aggregation key
// and are attached to the reported point.
Tags map[string]string `json:"tags"`
}
// service collects metrics from a channel, aggregates them in memory and
// writes the aggregates to InfluxDB through client.
type service struct {
// hostname is the value returned by os.Hostname when the service was built.
hostname string
// serviceName is read from the SERVICE_NAME environment variable.
serviceName string
// hostIp is read from the HOST_IP environment variable.
hostIp string
// podIp is read from the POD_IP environment variable.
podIp string
// metrics is the buffered queue that add writes to and run reads from.
metrics chan *metric
// megers holds the current aggregates, keyed by metric name plus sorted
// tags. It is created lazily by process and reset to nil by report.
megers map[string]*metric
// config is the configuration passed to NewService.
config *Config
// client is the InfluxDB client used by report.
client *influxdb.Client
}
// NewService builds a metric service from c.
//
// It calls c.Default, records the local hostname and the SERVICE_NAME,
// POD_IP and HOST_IP environment variables (later used as default tags), and
// creates an InfluxDB client for c.Address with c.Username and c.Password.
//
// The signature has no error return, so set-up failures are printed rather
// than silently discarded, and a service is still returned:
//   - if the hostname cannot be resolved, the hostname is left empty;
//   - if c.Address cannot be parsed (for example "127.0.0.1:8086" without a
//     scheme), the client URL is left empty instead of dereferencing a nil
//     *url.URL, which previously panicked;
//   - if the client cannot be created, the returned service carries whatever
//     client value influxdb.NewClient returned (possibly nil).
func NewService(c *Config) *service {
	c.Default()

	hostname, err := os.Hostname()
	if err != nil {
		fmt.Printf("metric: resolve hostname: %v\n", err)
	}

	infCfg := influxdb.NewConfig()
	infCfg.Username = c.Username
	infCfg.Password = c.Password

	infURL, err := url.Parse(c.Address)
	if err != nil {
		fmt.Printf("metric: parse influxdb address %q: %v\n", c.Address, err)
	} else {
		infCfg.URL = *infURL
	}

	client, err := influxdb.NewClient(infCfg)
	if err != nil {
		fmt.Printf("metric: create influxdb client: %v\n", err)
	}

	return &service{
		// Large buffer so producers calling add rarely hit the "full" path.
		metrics:     make(chan *metric, 100000),
		hostname:    hostname,
		serviceName: os.Getenv("SERVICE_NAME"),
		podIp:       os.Getenv("POD_IP"),
		hostIp:      os.Getenv("HOST_IP"),
		config:      c,
		client:      client,
	}
}
// run is the service's event loop. It never returns: every queued metric is
// folded into the in-memory aggregates via process, and each time the ticker
// fires (every config.Interval seconds) the aggregates are flushed via report.
func (s *service) run() {
	interval := time.Duration(s.config.Interval) * time.Second
	ticker := time.NewTicker(interval)
	for {
		select {
		case <-ticker.C:
			s.report()
		case incoming := <-s.metrics:
			s.process(incoming)
		}
	}
}
// sortMap renders tag as a deterministic string: every entry becomes
// "key=value", the entries are sorted lexicographically and then joined with
// ":". A nil or empty map yields the empty string.
func (s *service) sortMap(tag map[string]string) string {
	pairs := make([]string, len(tag))
	next := 0
	for name, value := range tag {
		pairs[next] = fmt.Sprintf("%s=%s", name, value)
		next++
	}
	// Map iteration order is random, so sort to get a stable result.
	sort.Strings(pairs)
	return strings.Join(pairs, ":")
}
// process folds m into the in-memory aggregates.
//
// The aggregation key is the metric name, followed by "_" and the sorted tag
// string when the metric carries at least one tag. If an aggregate already
// exists for the key, m.Value is added to it and its timestamp is moved to
// m.Timestamp; otherwise m itself is stored (by pointer) as the new aggregate
// and will be mutated by later calls.
//
// A nil metric is ignored. Nil and empty tag maps are treated the same, so a
// metric with no tags always lands in a single aggregate instead of being
// split across the keys "name" and "name_".
func (s *service) process(m *metric) {
	if m == nil {
		return
	}
	if s.megers == nil {
		// Created lazily because report resets the map to nil after a flush.
		s.megers = make(map[string]*metric)
	}

	key := m.Metric
	if len(m.Tags) > 0 {
		key += "_" + s.sortMap(m.Tags)
	}

	if agg, ok := s.megers[key]; ok {
		agg.Value += m.Value
		agg.Timestamp = m.Timestamp
		return
	}
	s.megers[key] = m
}
// defaultTags returns a freshly allocated tag set describing where this
// service runs: hostname, host IP, pod IP and service name. report attaches
// it to every batch.
func (s *service) defaultTags() map[string]string {
	tags := make(map[string]string, 4)
	tags["hostname"] = s.hostname
	tags["host_ip"] = s.hostIp
	tags["pod_ip"] = s.podIp
	tags["service_name"] = s.serviceName
	return tags
}
// report writes the current aggregates to InfluxDB as one batch and then
// clears them.
//
// Each aggregate becomes a point whose measurement is the metric name, whose
// only field is "value", and whose time is the aggregate's timestamp
// interpreted as Unix seconds. The batch targets config.Database and carries
// the default tags.
//
// Nothing is sent when there are no aggregates. The aggregates are discarded
// after the attempt whether or not it succeeded, but a failed write is now
// reported instead of being logged as "ok write". A missing client is
// reported rather than causing a nil-pointer panic.
func (s *service) report() {
	if len(s.megers) == 0 {
		return
	}

	pending := s.megers
	s.megers = nil

	if s.client == nil {
		fmt.Printf("metric: no influxdb client, dropping %d metrics\n", len(pending))
		return
	}

	bp := influxdb.BatchPoints{
		Points:   make([]influxdb.Point, 0, len(pending)),
		Database: s.config.Database,
		Tags:     s.defaultTags(),
	}
	for _, v := range pending {
		bp.Points = append(bp.Points, influxdb.Point{
			Measurement: v.Metric,
			Tags:        v.Tags,
			Fields: map[string]interface{}{
				"value": v.Value,
			},
			Time: time.Unix(v.Timestamp, 0),
		})
	}

	if _, err := s.client.Write(bp); err != nil {
		fmt.Printf("metric: write %d points: %v\n", len(bp.Points), err)
		return
	}
	fmt.Println("ok write")
}
// add queues m for aggregation without ever blocking the caller. When the
// queue cannot accept the metric right now, the metric is dropped and a
// notice is printed.
func (s *service) add(m *metric) {
	select {
	case s.metrics <- m:
		return
	default:
		// Queue not ready to receive; fall through to the notice below.
	}
	fmt.Println("chan is full")
}