gosdk/metric/service.go

165 lines
3.1 KiB
Go

package metric
import (
"fmt"
influxdb "github.com/influxdata/influxdb1-client"
"net/url"
"os"
"sort"
"strings"
"time"
)
// metric is one sample handed to the service. The first sample seen for an
// aggregation key is also kept as the accumulator that later samples with
// the same key are merged into (see process).
type metric struct {
	// Type is compared against TypeCount and TypeDuration by getField to
	// decide which fields are reported.
	Type string `json:"type"`

	// Timestamp is in Unix seconds; report converts it with time.Unix(Timestamp, 0).
	Timestamp int64 `json:"timestamp"`

	// Metric is the measurement name and the base of the aggregation key.
	Metric string `json:"metric"`

	// Count is summed across merged samples.
	Count int64 `json:"count"`

	// Value is summed across merged samples.
	Value int64 `json:"value"`

	// Max is maintained by process while merging.
	Max int64 `json:"max"`

	// Min is maintained by process while merging.
	Min int64 `json:"min"`

	// Tags are part of the aggregation key and become the point's tags.
	Tags map[string]string `json:"tags"`
}
// service collects metrics in memory and periodically writes the merged
// results to InfluxDB through client.
type service struct {
	// Identity of the reporting process; defaultTags attaches these to
	// every batch.
	hostname, serviceName, hostIp, podIp string

	// metrics is the buffered queue that add writes to and run drains.
	metrics chan *metric

	// megers maps an aggregation key to its accumulator. process creates
	// it lazily and report resets it to nil after each flush.
	megers map[string]*metric

	// config supplies the flush interval and the target database.
	config *Config

	// client is the InfluxDB client used by report.
	client *influxdb.Client
}
// NewService builds a metric service from c.
//
// The InfluxDB client is configured from c.Address, c.Username and
// c.Password. The hostname comes from os.Hostname; the service name, pod IP
// and host IP are read from the SERVICE_NAME, POD_IP and HOST_IP environment
// variables. The metric queue is buffered to 100000 entries.
//
// The signature has no error result, so setup failures are reported on
// stderr and the service is still returned: a failed hostname lookup leaves
// the hostname empty, and an unparsable address leaves the client URL empty
// rather than dereferencing a nil *url.URL.
func NewService(c *Config) *service {
	// The hostname is only used as a tag, so an empty value is tolerable.
	hostname, err := os.Hostname()
	if err != nil {
		fmt.Fprintf(os.Stderr, "metric: resolving hostname: %v\n", err)
	}

	infCfg := influxdb.NewConfig()
	infCfg.Username = c.Username
	infCfg.Password = c.Password
	if infURL, err := url.Parse(c.Address); err != nil {
		// url.Parse returns a nil URL on error; keep the zero URL so that
		// construction does not panic. Writes will fail until this is fixed.
		fmt.Fprintf(os.Stderr, "metric: parsing influxdb address %q: %v\n", c.Address, err)
	} else {
		infCfg.URL = *infURL
	}

	client, err := influxdb.NewClient(infCfg)
	if err != nil {
		fmt.Fprintf(os.Stderr, "metric: creating influxdb client: %v\n", err)
	}

	return &service{
		metrics:     make(chan *metric, 100000),
		hostname:    hostname,
		serviceName: os.Getenv("SERVICE_NAME"),
		podIp:       os.Getenv("POD_IP"),
		hostIp:      os.Getenv("HOST_IP"),
		config:      c,
		client:      client,
	}
}
// run is the service's event loop and never returns. Every metric received
// on s.metrics is passed to process, and every s.config.Interval seconds the
// accumulated metrics are flushed with report.
func (s *service) run() {
	interval := time.Duration(s.config.Interval) * time.Second
	ticker := time.NewTicker(interval)
	for {
		select {
		case <-ticker.C:
			s.report()
		case sample := <-s.metrics:
			s.process(sample)
		}
	}
}
// sortMap renders tag as "k=v" pairs, sorted lexicographically as whole
// strings and joined with ":". The result does not depend on map iteration
// order; a nil or empty map yields "".
func (s *service) sortMap(tag map[string]string) string {
	pairs := make([]string, len(tag))
	i := 0
	for name, value := range tag {
		pairs[i] = fmt.Sprintf("%s=%s", name, value)
		i++
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ":")
}
// process folds m into the accumulator that shares its aggregation key.
//
// The key is the metric name, followed by "_" and the sorted tag pairs when
// m.Tags is non-nil. The first sample seen for a key is stored as the
// accumulator itself: m is retained and its Max and Min are set to its
// Value. Later samples add to Value and Count, replace Timestamp, and widen
// Max and Min. A nil m is ignored.
func (s *service) process(m *metric) {
	if m == nil {
		return
	}
	if s.megers == nil {
		s.megers = make(map[string]*metric)
	}

	key := m.Metric
	if m.Tags != nil {
		key += "_" + s.sortMap(m.Tags)
	}

	acc, seen := s.megers[key]
	if !seen {
		m.Max = m.Value
		m.Min = m.Value
		s.megers[key] = m
		return
	}

	acc.Value += m.Value
	acc.Count += m.Count
	acc.Timestamp = m.Timestamp
	if m.Value > acc.Max {
		acc.Max = m.Value
	}
	// A new low only updates Min; the running total in Value must stay intact.
	if m.Value < acc.Min {
		acc.Min = m.Value
	}
}
// defaultTags returns a fresh map holding the tags that identify this
// process: hostname, host_ip, pod_ip and service_name.
func (s *service) defaultTags() map[string]string {
	tags := make(map[string]string, 4)
	tags["hostname"] = s.hostname
	tags["host_ip"] = s.hostIp
	tags["pod_ip"] = s.podIp
	tags["service_name"] = s.serviceName
	return tags
}
// report flushes the accumulated metrics to InfluxDB as one batch.
//
// It does nothing when no metric has been processed since the last flush.
// Each accumulator becomes one point: measurement from Metric, tags from
// Tags, fields from getField, and time from Timestamp interpreted as Unix
// seconds. The batch targets s.config.Database and carries defaultTags.
//
// Delivery is best effort: the accumulators are discarded whether or not
// the write succeeds, and a failure is reported on stderr.
func (s *service) report() {
	if s.megers == nil {
		return
	}
	// Start a new aggregation window regardless of the write's outcome.
	pending := s.megers
	s.megers = nil

	if s.client == nil {
		// Calling Write on a nil client would panic inside the run loop.
		fmt.Fprintf(os.Stderr, "metric: no influxdb client, dropping %d metrics\n", len(pending))
		return
	}

	bp := influxdb.BatchPoints{
		Points:   make([]influxdb.Point, 0, len(pending)),
		Database: s.config.Database,
		Tags:     s.defaultTags(),
	}
	for _, v := range pending {
		bp.Points = append(bp.Points, influxdb.Point{
			Measurement: v.Metric,
			Tags:        v.Tags,
			Fields:      s.getField(v),
			Time:        time.Unix(v.Timestamp, 0),
		})
	}

	if _, err := s.client.Write(bp); err != nil {
		fmt.Fprintf(os.Stderr, "metric: writing %d points to influxdb: %v\n", len(bp.Points), err)
	}
}
// add queues m on s.metrics without blocking. If the channel's buffer is
// full, m is dropped and "chan is full" is printed to stdout.
func (s *service) add(m *metric) {
	select {
	case s.metrics <- m:
		return
	default:
	}
	fmt.Println("chan is full")
}
// getField builds the InfluxDB field set reported for m.
//
// A TypeCount metric reports its summed Value as "count". A TypeDuration
// metric reports "avg" (Value divided by Count, truncated toward zero),
// "count", "max" and "min". Any other type yields nil.
func (s *service) getField(m *metric) map[string]interface{} {
	switch m.Type {
	case TypeCount:
		return map[string]interface{}{
			"count": m.Value,
		}
	case TypeDuration:
		// Integer division is exact for every int64 value, unlike a round
		// trip through float64. A zero Count has no meaningful average, so
		// report 0 instead of dividing by zero.
		var avg int64
		if m.Count != 0 {
			avg = m.Value / m.Count
		}
		return map[string]interface{}{
			"avg":   avg,
			"count": m.Count,
			"max":   m.Max,
			"min":   m.Min,
		}
	}
	return nil
}