03. InfluxDB Systematic Study: MetaStore
Published: 2019-05-25


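Overview

《》 described the services that are loaded while InfluxDB starts up. When the Server object is built by NewServer in cmd/influxd/run/server.go, the following field is set:

MetaClient: meta.NewClient(c.Meta)

This line initializes everything related to Meta. This article looks at Meta in detail.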

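What Meta does

Meta information is stored in a data file named meta.db; see the "Configuration file" section for the relevant settings.

meta is InfluxDB's metadata service and manages the metadata of its databases. InfluxDB wraps the meta database in a MetaClient that serves this data, and every module that needs meta holds a reference to this MetaClient. meta.db is persisted directly as protobuf-encoded data. After the file is loaded, its entire content is cached in memory. Whenever meta is modified, MetaClient serializes the updated data and writes it to disk. Part of MetaClient's data is stored as slices, and many APIs return those slices to the caller. Once returned, a slice is no longer protected by the client's lock, so concurrent access can race. meta.db stores the metadata of each database (name, retention policies, continuous queries) and the user information.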
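The slice hazard mentioned above can be illustrated with a minimal sketch. This is not InfluxDB code: the store type and its Shared, Copied and Rename methods are hypothetical names invented for the illustration. It only shows that a slice returned from under a lock still shares its backing array with the guarded data, while a copy does not.

```go
package main

import (
	"fmt"
	"sync"
)

// DatabaseInfo is a trimmed-down stand-in for the meta package's type.
type DatabaseInfo struct {
	Name string
}

// store is a hypothetical cache guarded by a read-write mutex.
type store struct {
	mu        sync.RWMutex
	databases []DatabaseInfo
}

// Shared returns the internal slice itself. The lock is released on return,
// but the caller keeps a view onto the same backing array.
func (s *store) Shared() []DatabaseInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.databases
}

// Copied returns an independent copy made while the lock is held.
func (s *store) Copied() []DatabaseInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]DatabaseInfo, len(s.databases))
	copy(out, s.databases)
	return out
}

// Rename mutates an element in place under the write lock.
func (s *store) Rename(i int, name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.databases[i].Name = name
}

func main() {
	s := &store{databases: []DatabaseInfo{{Name: "db0"}}}
	shared, copied := s.Shared(), s.Copied()
	s.Rename(0, "renamed")
	fmt.Println(shared[0].Name, copied[0].Name) // prints "renamed db0"
}
```

If another goroutine read shared[0] while Rename was running, the read would happen without any lock held. Copying before returning avoids that at the cost of an allocation per call.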

Configuration file

For the complete configuration file see 《》. The meta-related settings are:

###
### [meta]
###
### Controls the parameters for the Raft consensus group that stores metadata
### about the InfluxDB cluster.
###

[meta]
  # Where the metadata/raft database is stored (the meta directory)
  dir = "/var/lib/influxdb/meta"

  # Automatically create a default retention policy when creating a database.
  # retention-autocreate = true

  # If log messages are printed for the meta service
  # logging-enabled = true

Code analysis

As explained in 《》, MetaClient is initialized during server startup and then opens and loads its files. The code is:

// NewServer returns a new instance of Server built from a config.
// It builds the Server object and the components it manages from the config.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
    // ......

    bind := c.BindAddress

    s := &Server{
        buildInfo: *buildInfo,
        err:       make(chan error),
        closing:   make(chan struct{}),

        BindAddress: bind,

        Logger: logger.New(os.Stderr),

        MetaClient: meta.NewClient(c.Meta),

        reportingDisabled: c.ReportingDisabled,

        httpAPIAddr: c.HTTPD.BindAddress,
        httpUseTLS:  c.HTTPD.HTTPSEnabled,
        tcpAddr:     bind,

        config: c,
    }
    s.Monitor = monitor.New(s, c.Monitor)
    s.config.registerDiagnostics(s.Monitor)

    if err := s.MetaClient.Open(); err != nil {
        return nil, err
    }

    // ......

    return s, nil
}

MetaClient definition

Defined in services/meta/client.go. It handles all operations and requests that involve meta data (the core field is cacheData *Data):

// Client is used to execute commands on and read data from
// a meta service cluster.
// It handles all operations and requests that involve meta data.
type Client struct {
    logger *zap.Logger

    mu        sync.RWMutex
    closing   chan struct{}
    changed   chan struct{}
    cacheData *Data // the main data being managed (create, delete, read, update of meta information)

    // Authentication cache.
    authCache map[string]authUser

    path string

    retentionAutoCreate bool
}

type authUser struct {
    bhash string
    salt  []byte
    hash  []byte
}

// NewClient returns a new *Client.
func NewClient(config *Config) *Client {
    return &Client{
        cacheData: &Data{ // Data defines the information that meta manages
            ClusterID: uint64(rand.Int63()),
            Index:     1,
        },
        closing:             make(chan struct{}),
        changed:             make(chan struct{}),
        logger:              zap.NewNop(),
        authCache:           make(map[string]authUser),
        path:                config.Dir,
        retentionAutoCreate: config.RetentionAutoCreate,
    }
}

Data definition

Defined in services/meta/data.go. It records in detail the information stored in meta:

// Data represents the top level collection of all metadata.
type Data struct {
    Term      uint64 // associated raft term
    Index     uint64 // associated raft index
    ClusterID uint64
    Databases []DatabaseInfo
    Users     []UserInfo

    // adminUserExists provides a constant time mechanism for determining
    // if there is at least one admin user.
    adminUserExists bool

    MaxShardGroupID uint64
    MaxShardID      uint64
}

DatabaseInfo

Defines a database's name, its continuous queries, and its retention policies.

// DatabaseInfo represents information about a database in the system.
type DatabaseInfo struct { // manages RetentionPolicies and ContinuousQueries
    Name                   string
    DefaultRetentionPolicy string
    RetentionPolicies      []RetentionPolicyInfo
    ContinuousQueries      []ContinuousQueryInfo
}

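RetentionPolicyInfo

InfluxDB writes data by time. Every database has its own retention policies, and a retention policy specifies the time span covered by each ShardGroup, the ShardGroup Duration. Each time one ShardGroup Duration has elapsed, writes switch to the next, newly created ShardGroup.

1. It encapsulates the retention policy: the number of replicas, how long data is kept, the ShardGroup duration, and the information of all ShardGroups on the current node.

2. It defines methods that look up the matching ShardGroup by a point in time or by a time range.

// RetentionPolicyInfo represents metadata about a retention policy.
type RetentionPolicyInfo struct {
    Name               string
    ReplicaN           int
    Duration           time.Duration
    ShardGroupDuration time.Duration
    ShardGroups        []ShardGroupInfo
    Subscriptions      []SubscriptionInfo
}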

ShardGroupInfo records the information of one ShardGroup. Its StartTime and EndTime make it possible to find the matching ShardGroup by a point in time or by a time range.

The code is:

// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important
// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
// safely delete any associated shards.
type ShardGroupInfo struct {
    ID        uint64
    StartTime time.Time // earliest time covered by this group
    EndTime   time.Time // latest time covered by this group
    DeletedAt time.Time
    // All shards in this ShardGroup. Within one ShardGroup, points are hashed by
    // series key (point key) and written to different shards.
    Shards      []ShardInfo
    TruncatedAt time.Time
}

// ShardInfo represents metadata about a shard.
type ShardInfo struct {
    ID     uint64
    Owners []ShardOwner
}

// ShardOwner represents a node that owns a shard.
type ShardOwner struct {
    NodeID uint64
}
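To make the lookup by StartTime and EndTime concrete, here is a minimal sketch. The group type, the helpers day and weeklyGroups, and the half-open interval [StartTime, EndTime) are assumptions made for this illustration. It is not the code from services/meta.

```go
package main

import (
	"fmt"
	"time"
)

// group is a hypothetical, trimmed-down stand-in for ShardGroupInfo.
type group struct {
	ID        uint64
	StartTime time.Time // inclusive (assumption)
	EndTime   time.Time // exclusive (assumption)
}

// contains reports whether t falls inside [StartTime, EndTime).
func (g group) contains(t time.Time) bool {
	return !t.Before(g.StartTime) && t.Before(g.EndTime)
}

// overlaps reports whether the group intersects the range [tmin, tmax].
func (g group) overlaps(tmin, tmax time.Time) bool {
	return !g.StartTime.After(tmax) && g.EndTime.After(tmin)
}

// groupsByTimeRange keeps every group that may hold data in [tmin, tmax].
func groupsByTimeRange(groups []group, tmin, tmax time.Time) []group {
	var out []group
	for _, g := range groups {
		if g.overlaps(tmin, tmax) {
			out = append(out, g)
		}
	}
	return out
}

// day returns midnight UTC on the n-th day of January 2019.
func day(n int) time.Time {
	return time.Date(2019, time.January, n, 0, 0, 0, 0, time.UTC)
}

// weeklyGroups builds n consecutive groups, each spanning seven days.
func weeklyGroups(n int) []group {
	groups := make([]group, n)
	for i := range groups {
		start := day(1 + 7*i)
		groups[i] = group{
			ID:        uint64(i + 1),
			StartTime: start,
			EndTime:   start.Add(7 * 24 * time.Hour),
		}
	}
	return groups
}

func main() {
	groups := weeklyGroups(3)
	for _, g := range groupsByTimeRange(groups, day(5), day(9)) {
		fmt.Println("matched group", g.ID)
	}
}
```

With three weekly groups starting on 1 January, a query from 5 January to 9 January touches the first two groups and skips the third.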

ContinuousQueryInfo

// ContinuousQueryInfo represents metadata about a continuous query.
type ContinuousQueryInfo struct {
    Name  string
    Query string
}

UserInfo

// UserInfo represents metadata about a user in the system.
type UserInfo struct { // encapsulates user information: user name, password, per-database privileges
    // User's name.
    Name string

    // Hashed password.
    Hash string

    // Whether the user is an admin, i.e. allowed to do everything.
    Admin bool

    // Map of database name to granted privilege.
    Privileges map[string]influxql.Privilege
}

Data loading

After MetaClient is initialized, NewServer() in cmd/influxd/run/server.go calls the following to load and process the meta file:

if err := s.MetaClient.Open(); err != nil {
    return nil, err
}

This calls the Open() method in services/meta/client.go, whose core calls are Load() and snapshot():

// Open a connection to a meta service cluster.
func (c *Client) Open() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Try to load from disk.
    // meta data is persisted to disk, and InfluxDB reads it back from disk at startup.
    if err := c.Load(); err != nil {
        return err
    }

    // If this is a brand new instance, persist to disk immediately.
    if c.cacheData.Index == 1 {
        if err := snapshot(c.path, c.cacheData); err != nil {
            return err
        }
    }

    return nil
}

Load()

meta data is stored on disk in the meta.db file. When InfluxDB starts, it loads the file from disk into memory. The code is:

// Load loads the current meta data from disk.
func (c *Client) Load() error {
    file := filepath.Join(c.path, metaFile)

    f, err := os.Open(file)
    if err != nil {
        if os.IsNotExist(err) {
            return nil
        }
        return err
    }
    defer f.Close()

    data, err := ioutil.ReadAll(f)
    if err != nil {
        return err
    }

    // Deserialize with protocol buffers and store the file's data in the cache.
    if err := c.cacheData.UnmarshalBinary(data); err != nil { // services/meta/data.go
        return err
    }
    return nil
}

In services/meta/data.go:

// unmarshal deserializes from a protobuf representation.
func (data *Data) unmarshal(pb *internal.Data) {
    data.Term = pb.GetTerm()
    data.Index = pb.GetIndex()
    data.ClusterID = pb.GetClusterID()

    data.MaxShardGroupID = pb.GetMaxShardGroupID()
    data.MaxShardID = pb.GetMaxShardID()

    data.Databases = make([]DatabaseInfo, len(pb.GetDatabases()))
    for i, x := range pb.GetDatabases() {
        data.Databases[i].unmarshal(x)
    }

    data.Users = make([]UserInfo, len(pb.GetUsers()))
    for i, x := range pb.GetUsers() {
        data.Users[i].unmarshal(x)
    }

    // Exhaustively determine if there is an admin user. The marshalled cache
    // value may not be correct.
    data.adminUserExists = data.hasAdminUser()
}

snapshot()

Writes meta data to disk. Every piece of meta information has a corresponding protocol buffer structure, and protocol buffers are used for serialization and deserialization.

// snapshot saves the current meta data to disk.
func snapshot(path string, data *Data) error {
    filename := filepath.Join(path, metaFile)
    tmpFile := filename + "tmp"

    f, err := os.Create(tmpFile)
    if err != nil {
        return err
    }
    defer f.Close()

    var d []byte
    // Serialize to binary with protocol buffers.
    if b, err := data.MarshalBinary(); err != nil {
        return err
    } else {
        d = b
    }

    // Write to the file.
    if _, err := f.Write(d); err != nil {
        return err
    }

    if err = f.Sync(); err != nil {
        return err
    }

    // close file handle before renaming to support Windows
    if err = f.Close(); err != nil {
        return err
    }

    return file.RenameFile(tmpFile, filename)
}
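The write-to-temporary-file-then-rename pattern used by snapshot(), together with the "a missing file is not an error" behavior of Load(), can be tried in isolation with the standard library. The sketch below is an illustration under stated assumptions. It swaps protobuf for encoding/json so that it runs without generated code, and the names data, save, load and roundTrip are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

const metaFile = "meta.db"

// data is a tiny stand-in for the meta Data struct.
type data struct {
	ClusterID uint64
	Index     uint64
}

// save writes d to a temporary file, syncs it, closes it, and only then
// renames it over the final name, so readers never see a half-written file.
func save(dir string, d data) error {
	b, err := json.Marshal(d) // JSON instead of protobuf (assumption of this sketch)
	if err != nil {
		return err
	}
	final := filepath.Join(dir, metaFile)
	tmp := final + ".tmp"
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := f.Write(b); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	// Close before renaming; some platforms refuse to rename an open file.
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, final)
}

// load reads the file into d. A missing file leaves d untouched and is not an error.
func load(dir string, d *data) error {
	b, err := os.ReadFile(filepath.Join(dir, metaFile))
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}
	return json.Unmarshal(b, d)
}

// roundTrip saves in to a fresh temporary directory and loads it back.
func roundTrip(in data) (data, error) {
	dir, err := os.MkdirTemp("", "meta-sketch")
	if err != nil {
		return data{}, err
	}
	defer os.RemoveAll(dir)
	if err := save(dir, in); err != nil {
		return data{}, err
	}
	var out data
	err = load(dir, &out)
	return out, err
}

func main() {
	out, err := roundTrip(data{ClusterID: 42, Index: 1})
	fmt.Println(out, err)
}
```

Because the rename happens only after a successful sync and close, a crash in the middle of a write leaves the previous meta.db in place.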

Other methods

commit

While InfluxDB is running, a copy of all meta information is cached in memory. When meta information changes, this method writes it to disk immediately and then updates the in-memory cache.

// commit writes data to the underlying store.
// This method assumes c's mutex is already locked.
func (c *Client) commit(data *Data) error {
    data.Index++

    // try to write to disk before updating in memory
    if err := snapshot(c.path, data); err != nil {
        return err
    }

    // update in memory
    c.cacheData = data

    // close channels to signal changes
    close(c.changed)
    c.changed = make(chan struct{})

    return nil
}
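The last step of commit, closing c.changed and replacing it with a fresh channel, is a common Go broadcast idiom: closing a channel wakes every goroutine waiting on it. The sketch below shows the idiom on its own. The notifier type and its methods are hypothetical names, and the disk write is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical, minimal version of the "changed" mechanism.
type notifier struct {
	mu      sync.RWMutex
	index   uint64
	changed chan struct{}
}

func newNotifier() *notifier {
	return &notifier{index: 1, changed: make(chan struct{})}
}

// waitCh returns the channel that the next commit will close.
func (n *notifier) waitCh() <-chan struct{} {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.changed
}

// Index returns the current index under the read lock.
func (n *notifier) Index() uint64 {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.index
}

// commit bumps the index, wakes all current waiters by closing the channel,
// and installs a fresh channel for the next round of waiters.
func (n *notifier) commit() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.index++
	close(n.changed)
	n.changed = make(chan struct{})
}

func main() {
	n := newNotifier()
	ch := n.waitCh()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ch // unblocks when commit closes the channel
		}()
	}

	n.commit()
	wg.Wait()
	fmt.Println("index after commit:", n.Index())
}
```

A waiter must fetch the channel before the change it wants to observe. A channel fetched after the commit belongs to the next round and stays open until the following commit.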

ShardGroupsByTimeRange and ShardsByTimeRange

These look up existing ShardGroups and Shards for a given time range. They are closely tied to query processing, which will be covered later.

// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data
// for the specified time range. Shard groups are sorted by start time.
func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []ShardGroupInfo, err error) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    // Find retention policy.
    rpi, err := c.cacheData.RetentionPolicy(database, policy)
    if err != nil {
        return nil, err
    } else if rpi == nil {
        return nil, influxdb.ErrRetentionPolicyNotFound(policy)
    }
    groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
    // Iterate over all ShardGroups of the retention policy.
    for _, g := range rpi.ShardGroups {
        if g.Deleted() || !g.Overlaps(min, max) {
            continue
        }
        groups = append(groups, g)
    }
    return groups, nil
}

// ShardsByTimeRange returns a slice of shards that may contain data in the time range.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) {
    m := make(map[*ShardInfo]struct{})
    for _, mm := range sources.Measurements() {
        groups, err := c.ShardGroupsByTimeRange(mm.Database, mm.RetentionPolicy, tmin, tmax)
        if err != nil {
            return nil, err
        }
        for _, g := range groups {
            for i := range g.Shards {
                m[&g.Shards[i]] = struct{}{}
            }
        }
    }

    a = make([]ShardInfo, 0, len(m))
    for sh := range m {
        a = append(a, *sh)
    }

    return a, nil
}

PrecreateShardGroups

Creates ShardGroups in advance so that they do not have to be created only when data for the corresponding time range arrives.

// InfluxDB defines a service for this, the Precreator Service (services/precreator/service.go).
// Its implementation is fairly simple: it periodically calls PrecreateShardGroups to check
// whether a ShardGroup needs to be created.
//
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    data := c.cacheData.Clone()
    var changed bool

    // Iterate over all DatabaseInfo entries.
    for _, di := range data.Databases {
        for _, rp := range di.RetentionPolicies {
            if len(rp.ShardGroups) == 0 {
                // No data was ever written to this group, or all groups have been deleted.
                continue
            }
            // The ShardGroups are already sorted by time, so the last one is the newest.
            g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
            if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
                // Group is not deleted, will end before the future time, but is still yet to expire.
                // This last check is important, so the system doesn't create shards groups wholly
                // in the past.

                // Create successive shard group.
                // Compute the start time of the ShardGroup to create.
                nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
                // if it already exists, continue
                if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
                    c.logger.Info("Shard group already exists",
                        logger.ShardGroup(sg.ID),
                        logger.Database(di.Name),
                        logger.RetentionPolicy(rp.Name))
                    continue
                }
                newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
                if err != nil {
                    c.logger.Info("Failed to precreate successive shard group",
                        zap.Uint64("group_id", g.ID), zap.Error(err))
                    continue
                }
                changed = true
                c.logger.Info("New shard group successfully precreated",
                    logger.ShardGroup(newGroup.ID),
                    logger.Database(di.Name),
                    logger.RetentionPolicy(rp.Name))
            }
        }
    }

    if changed {
        if err := c.commit(data); err != nil {
            return err
        }
    }

    return nil
}

InfluxDB defines a service for this, the Precreator Service (services/precreator/service.go). Its implementation is fairly simple: it periodically calls PrecreateShardGroups to check whether a ShardGroup needs to be created.

// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {
    defer s.wg.Done()

    for {
        select {
        case <-time.After(s.checkInterval):
            if err := s.precreate(time.Now().UTC()); err != nil {
                s.Logger.Info("Failed to precreate shards", zap.Error(err))
            }
        case <-s.done:
            s.Logger.Info("Terminating precreation service")
            return
        }
    }
}
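The decision made inside PrecreateShardGroups comes down to one condition on the newest group's end time. The sketch below isolates that condition so it can be tried with concrete times. The function names, the helper at, and the 30-minute look-ahead window are assumptions chosen for the illustration, not values taken from the service.

```go
package main

import (
	"fmt"
	"time"
)

// shouldPrecreate repeats the condition from PrecreateShardGroups: the newest
// group is not deleted, ends before the look-ahead time 'to', and has not
// already ended before 'from'.
func shouldPrecreate(deleted bool, end, from, to time.Time) bool {
	return !deleted && end.Before(to) && end.After(from)
}

// nextStart returns the first instant after the current group's end, which
// becomes the timestamp used to create the successor group.
func nextStart(end time.Time) time.Time {
	return end.Add(1 * time.Nanosecond)
}

// at returns a fixed base time plus the given number of minutes.
func at(minutes int) time.Time {
	base := time.Date(2019, time.May, 25, 12, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(minutes) * time.Minute)
}

func main() {
	now := at(0)
	cutoff := at(30) // hypothetical look-ahead window of 30 minutes
	for _, endsIn := range []int{-5, 20, 45} {
		fmt.Printf("group ending at %+d min: precreate=%v\n",
			endsIn, shouldPrecreate(false, at(endsIn), now, cutoff))
	}
}
```

Only the group ending 20 minutes from now qualifies. The one that ended 5 minutes ago is wholly in the past, and the one ending in 45 minutes is still outside the look-ahead window.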

Meta in the cluster version

Overall architecture


Reposted from: http://hfasi.baihongyu.com/
