91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Loki 的作用是什么

發布時間:2021-06-16 09:54:18 來源:億速云 閱讀:228 作者:chen 欄目:web開發

本篇內容介紹了“Loki 的作用是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

前面我們介紹了 Loki  的一些基本使用配置,但是對 Loki 還是了解不夠深入,官方文檔寫得較為凌亂,而且沒有跟上新版本,為了能夠對 Loki  有一個更深入的認識,做到有的放矢,這里面我們嘗試對 Loki  的源碼進行一些簡單的分析,由于有很多模塊和實現細節,這里我們主要是對核心功能進行分析,希望對大家有所幫助。本文首先對日志的寫入過程進行簡單分析。

Loki 的作用是什么

Distributor Push API

Promtail 通過 Loki 的 Push API 接口推送日志數據,該接口在初始化 Distributor  的時候進行初始化,在控制器基礎上包裝了兩個中間件,其中的 HTTPAuthMiddleware 就是獲取租戶 ID,如果開啟了認證配置,則從  X-Scope-OrgID 這個請求 Header 頭里面獲取,如果沒有配置則用默認的 fake 代替。

// pkg/loki/modules.go func (t *Loki) initDistributor() (services.Service, error) {  ......  if t.cfg.Target != All {   logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)  }   pushHandler := middleware.Merge(   serverutil.RecoveryHTTPMiddleware,   t.HTTPAuthMiddleware,  ).Wrap(http.HandlerFunc(t.distributor.PushHandler))   t.Server.HTTP.Handle("/api/prom/push", pushHandler)  t.Server.HTTP.Handle("/loki/api/v1/push", pushHandler)  return t.distributor, nil }

Push API 處理器實現如下所示,首先通過 ParseRequest 函數將 Http 請求轉換成  logproto.PushRequest,然后直接調用 Distributor 下面的 Push 函數來推送日志數據:

// pkg/distributor/http.go  // PushHandler 從 HTTP body 中讀取一個 snappy 壓縮的 proto func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {  logger := util_log.WithContext(r.Context(), util_log.Logger)  userID, _ := user.ExtractOrgID(r.Context())  req, err := ParseRequest(logger, userID, r)  ......  _, err = d.Push(r.Context(), req)  ...... }  func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) {  var body lokiutil.SizeReader  contentEncoding := r.Header.Get(contentEnc)  switch contentEncoding {  case "":   body = lokiutil.NewSizeReader(r.Body)  case "snappy":   body = lokiutil.NewSizeReader(r.Body)  case "gzip":   gzipReader, err := gzip.NewReader(r.Body)   if err != nil {    return nil, err   }   defer gzipReader.Close()   body = lokiutil.NewSizeReader(gzipReader)  default:   return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding)  }   contentType := r.Header.Get(contentType)  var req logproto.PushRequest  ......  switch contentType {  case applicationJSON:   var err error   if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {    err = unmarshal.DecodePushRequest(body, &req)   } else {    err = unmarshal_legacy.DecodePushRequest(body, &req)   }   if err != nil {    return nil, err   }  default:   // When no content-type header is set or when it is set to   // `application/x-protobuf`: expect snappy compression.   if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {    return nil, err   }  }  return &req, nil }

首先我們先了解下 PushRequest 的結構,PushRequest 就是一個 Stream 集合:

// pkg/logproto/logproto.pb.go type PushRequest struct {  Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"` }  // pkg/logproto/types.go // Stream 流包含一個唯一的標簽集,作為一個字符串,然后還包含一組日志條目 type Stream struct {  Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`  Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` }  // Entry 是一個帶有時間戳的日志條目 type Entry struct {  Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`  Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` }

 Loki 的作用是什么

Loki 的作用是什么

然后查看 Distributor 下的 Push 函數實現:

// pkg/distributor/distributor.go // Push 日志流集合 func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {  // 獲取租戶ID  userID, err := user.ExtractOrgID(ctx)  ......   // 首先把請求平鋪成一個樣本的列表  streams := make([]streamTracker, 0, len(req.Streams))  keys := make([]uint32, 0, len(req.Streams))  var validationErr error  validatedSamplesSize := 0  validatedSamplesCount := 0   validationContext := d.validator.getValidationContextFor(userID)   for _, stream := range req.Streams {   // 解析日志流標簽   stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)   ......   n := 0   for _, entry := range stream.Entries {    // 校驗一個日志Entry實體    if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {     validationErr = err     continue    }    stream.Entries[n] = entry    n++    // 校驗成功的樣本大小和個數    validatedSamplesSize += len(entry.Line)    validatedSamplesCount++   }   // 去掉校驗失敗的實體   stream.Entries = stream.Entries[:n]    if len(stream.Entries) == 0 {    continue   }   // 為當前日志流生成用于hash換的token值   keys = append(keys, util.TokenFor(userID, stream.Labels))   streams = append(streams, streamTracker{    stream: stream,   })  }   if len(streams) == 0 {   return &logproto.PushResponse{}, validationErr  }   now := time.Now()  // 每個租戶有一個限速器,判斷可以正常傳輸的日志大小是否應該被限制  if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {   // 返回429表明客戶端被限速了   ......   return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)  }   const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck  var descs [maxExpectedReplicationSet]ring.InstanceDesc   samplesByIngester := map[string][]*streamTracker{}  ingesterDescs := map[string]ring.InstanceDesc{}  for i, key := range keys {   // ReplicationSet 描述了一個指定的鍵與哪些 Ingesters 進行對話,以及可以容忍多少個錯誤   // 根據 label hash 到 hash 環上獲取對應的 ingester 節點,一個節點可能有多個對等的 ingester 副本來做 HA   replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil)   ......   // 最小成功的實例樹   streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors   // 可容忍的最大故障實例數   streams[i].maxFailures = replicationSet.MaxErrors   // 將 Stream 按對應的 ingester 進行分組   for _, ingester := range replicationSet.Ingesters {    // 配置每個 ingester 副本對應的日志流數據    samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i])    ingesterDescs[ingester.Addr] = ingester   }  }   tracker := pushTracker{   done: make(chan struct{}),   err:  make(chan error),  }  tracker.samplesPending.Store(int32(len(streams)))  // 循環Ingesters  for ingester, samples := range samplesByIngester {   // 讓ingester并行處理通過hash環對應的日志流列表   go func(ingester ring.InstanceDesc, samples []*streamTracker) {    ......    // 將日志流樣本數據下發給對應的 ingester 節點    d.sendSamples(localCtx, ingester, samples, &tracker)   }(ingesterDescs[ingester], samples)  }  ...... }

Push 函數的核心就是根據日志流的標簽來計算一個 Token 值,根據這個 Token 值去哈希環上獲取對應的處理日志的 Ingester  實例,然后并行通過 Ingester 處理日志流數據,通過 sendSamples 函數為單個 ingester 去發送日志樣本數據:

// pkg/distributor/distributor.go  func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {  err := d.sendSamplesErr(ctx, ingester, streamTrackers)  ...... }  func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {  // 根據 ingester 地址獲取 client  c, err := d.pool.GetClientFor(ingester.Addr)  ......  // 重新構造 PushRequest  req := &logproto.PushRequest{   Streams: make([]logproto.Stream, len(streams)),  }  for i, s := range streams {   req.Streams[i] = s.stream  }  // 通過 Ingester 客戶端請求數據  _, err = c.(logproto.PusherClient).Push(ctx, req)  ...... }

Ingester 寫入日志

Ingester 客戶端中的 Push 函數實際上就是一個 gRPC 服務的客戶端:

// pkg/ingester/ingester.go  // Push 實現 logproto.Pusher. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {  // 獲取租戶ID  instanceID, err := user.ExtractOrgID(ctx)  ......  // 根據租戶ID獲取 instance 對象  instance := i.getOrCreateInstance(instanceID)  // 直接調用 instance 對象 Push 數據  err = instance.Push(ctx, req)  return &logproto.PushResponse{}, err }

instance 下的 Push 函數:

// pkg/ingester/instance.go  func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {  record := recordPool.GetRecord()  record.UserID = i.instanceID  defer recordPool.PutRecord(record)   i.streamsMtx.Lock()  defer i.streamsMtx.Unlock()   var appendErr error  for _, s := range req.Streams {   // 獲取一個 stream 對象   stream, err := i.getOrCreateStream(s, false, record)   if err != nil {    appendErr = err    continue   }   // 真正用于數據處理的是 stream 對象中的 Push 函數   if _, err := stream.Push(ctx, s.Entries, record); err != nil {    appendErr = err    continue   }  }  ......  return appendErr }  func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, record *WALRecord) (*stream, error) {  if lock {   i.streamsMtx.Lock()   defer i.streamsMtx.Unlock()  }  // 如果 streams 中包含當前標簽列表對應的 stream 對象,則直接返回  stream, ok := i.streams[pushReqStream.Labels]  if ok {   return stream, nil  }  // record 只在重放 WAL 時為 nil  // 我們不希望在重放 WAL 后丟掉數據  // 為 instance 降低 stream 流限制  var err error  if record != nil {   // 限流器判斷   // AssertMaxStreamsPerUser 確保與當前輸入的流數量沒有達到限制   err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))  }  ......  // 解析日志流標簽集  labels, err := logql.ParseLabels(pushReqStream.Labels)  ......  // 獲取對應標簽集的指紋  fp := i.getHashForLabels(labels)  // 重新實例化一個 stream 對象,這里還會維護日志流的倒排索引  sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(labels), fp)  stream = newStream(i.cfg, fp, sortedLabels, i.metrics)  // 將stream設置到streams中去  i.streams[pushReqStream.Labels] = stream  i.streamsByFP[fp] = stream   // 當重放 wal 的時候 record 是 nil (我們不希望在重放時重寫 wal entries).  if record != nil {   record.Series = append(record.Series, tsdb_record.RefSeries{    Ref:    uint64(fp),    Labels: sortedLabels,   })  } else {   // 如果 record 為 nil,這就是一個 WAL 恢復   i.metrics.recoveredStreamsTotal.Inc()  }  ......  i.addTailersToNewStream(stream)  return stream, nil }

這個里面涉及到 WAL 這一塊的設計,比較復雜,我們可以先看 stream 下面的 Push 函數實現,主要就是將收到的 []Entry 先 Append  到內存中的 Chunk 流([]chunkDesc) 中:

// pkg/ingester/stream.go func (s *stream) Push(ctx context.Context, entries []logproto.Entry, record *WALRecord) (int, error) {  s.chunkMtx.Lock()  defer s.chunkMtx.Unlock()  var bytesAdded int  prevNumChunks := len(s.chunks)  var lastChunkTimestamp time.Time  // 如果之前的 chunks 列表為空,則創建一個新的 chunk  if prevNumChunks == 0 {   s.chunks = append(s.chunks, chunkDesc{    chunk: s.NewChunk(),   })   chunksCreatedTotal.Inc()  } else {   // 獲取最新一個chunk的日志時間戳   _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds()  }   var storedEntries []logproto.Entry  failedEntriesWithError := []entryWithError{}   for i := range entries {   // 如果這個日志條目與我們最后 append 的一行的時間戳和內容相匹配,則忽略它   if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content {    continue   }    // 最新的一個 chunk   chunk := &s.chunks[len(s.chunks)-1]   // 如果當前chunk已經關閉 或者 已經達到設置的最大 Chunk 大小   if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {    // 如果 chunk 沒有更多的空間,則調用 Close 來以確保 head block 中的數據都被切割和壓縮。    err := chunk.chunk.Close()    ......    chunk.closed = true    ......    // Append 一個新的 Chunk    s.chunks = append(s.chunks, chunkDesc{     chunk: s.NewChunk(),    })    chunk = &s.chunks[len(s.chunks)-1]    lastChunkTimestamp = time.Time{}   }   // 往 chunk 里面 Append 日志數據   if err := chunk.chunk.Append(&entries[i]); err != nil {    failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})   } else {    // 存儲添加到 chunk 中的日志數據    storedEntries = append(storedEntries, entries[i])    // 配置最后日志行的數據    lastChunkTimestamp = entries[i].Timestamp    s.lastLine.ts = lastChunkTimestamp    s.lastLine.content = entries[i].Line    // 累計大小    bytesAdded += len(entries[i].Line)   }   chunk.lastUpdated = time.Now()  }   if len(storedEntries) != 0 {   // 當重放 wal 的時候 record 將為 nil(我們不希望在重放的時候重寫wal日志條目)   if record != nil {    record.AddEntries(uint64(s.fp), storedEntries...)   }   // 后續是用與tail日志的處理   ......  }  ......  // 如果新增了chunks  if len(s.chunks) != prevNumChunks {   memoryChunks.Add(float64(len(s.chunks) - prevNumChunks))  }  return bytesAdded, nil }

Chunk 其實就是多條日志構成的壓縮包,將日志壓成 Chunk 的可以直接存入對象存儲, 一個 Chunk 到達指定大小之前會不斷 Append  新的日志到里面,而在達到大小之后, Chunk 就會關閉等待持久化(強制持久化也會關閉 Chunk, 比如關閉 ingester 實例時就會關閉所有的  Chunk 并持久化)。Chunk 的大小控制很重要:

  • 假如 Chunk 容量過小: 首先是導致壓縮效率不高,同時也會增加整體的 Chunk 數量, 導致倒排索引過大,最后, 對象存儲的操作次數也會變多,  帶來額外的性能開銷

  • 假如 Chunk 過大: 一個 Chunk 的 open 時間會更長, 占用額外的內存空間, 同時, 也增加了丟數據的風險,Chunk  過大也會導致查詢讀放大

Loki 的作用是什么

在將日志流追加到 Chunk 中過后,在 Ingester 初始化時會啟動兩個循環去處理 Chunk 數據,分別從 chunks  數據取出存入優先級隊列,另外一個循環定期檢查從內存中刪除已經持久化過后的數據。

首先是 Ingester 中定義了一個 flushQueues 屬性,是一個優先級隊列數組,該隊列中存放的是 flushOp:

// pkg/ingester/ingester.go type Ingester struct {  services.Service  ......  // 每個 flush 線程一個隊列,指紋用來選擇隊列  flushQueues     []*util.PriorityQueue  // 優先級隊列數組  flushQueuesDone sync.WaitGroup  ...... }  // pkg/ingester/flush.go // 優先級隊列中存放的數據 type flushOp struct {  from      model.Time  userID    string  fp        model.Fingerprint  immediate bool }

在初始化 Ingester 的時候會根據傳遞的 ConcurrentFlushes 參數來實例化 flushQueues的大小:

// pkg/ingester/ingester.go func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {  ......  i := &Ingester{   ......   flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes),   ......  }  ......  i.Service = services.NewBasicService(i.starting, i.running, i.stopping)  return i, nil }

然后通過 services.NewBasicService 實例化 Service 的時候指定了服務的 Starting、Running、Stopping  3 個狀態,在其中的 staring 狀態函數中會啟動協程去消費優先級隊列中的數據

// pkg/ingester/ingester.go func (i *Ingester) starting(ctx context.Context) error {  // todo,如果開啟了 WAL 的處理  ......  // 初始化 flushQueues  i.InitFlushQueues()  ......  // 啟動循環檢查chunk數據  i.loopDone.Add(1)  go i.loop()  return nil }

初始化 flushQueues 實現如下所示,其中 flushQueuesDone 是一個 WaitGroup,根據配置的并發數量并發執行  flushLoop 操作:

// pkg/ingester/flush.go func (i *Ingester) InitFlushQueues() {  i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)  for j := 0; j < i.cfg.ConcurrentFlushes; j++ {   // 為每個協程構造一個優先級隊列   i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)   go i.flushLoop(j)  } }

每一個優先級隊列循環消費數據:

// pkg/ingester/flush.go func (i *Ingester) flushLoop(j int) {  ......  for {   // 從隊列中根據優先級取出數據   o := i.flushQueues[j].Dequeue()   if o == nil {    return   }   op := o.(*flushOp)   // 執行真正的刷新用戶序列數據   err := i.flushUserSeries(op.userID, op.fp, op.immediate)   ......   // 如果退出時刷新失敗了,把失敗的操作放回到隊列中去。   if op.immediate && err != nil {    op.from = op.from.Add(flushBackoff)    i.flushQueues[j].Enqueue(op)   }  } }

刷新用戶的序列操作,也就是要保存到存儲中去:

// pkg/ingester/flush.go // 根據用戶ID刷新用戶日志序列 func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {  instance, ok := i.getInstanceByID(userID)  ......  // 根據instance和fp指紋數據獲取需要刷新的chunks  chunks, labels, chunkMtx := i.collectChunksToFlush(instance, fp, immediate)  ......  // 執行真正的刷新 chunks 操作  err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)  ...... }  // 收集需要刷新的 chunks func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) {  instance.streamsMtx.Lock()  // 根據指紋數據獲取 stream  stream, ok := instance.streamsByFP[fp]  instance.streamsMtx.Unlock()  if !ok {   return nil, nil, nil  }   var result []*chunkDesc  stream.chunkMtx.Lock()  defer stream.chunkMtx.Unlock()  // 循環所有chunks  for j := range stream.chunks {   // 判斷是否應該刷新當前chunk   shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])   if immediate || shouldFlush {    // 確保不再對該塊進行寫操作(如果沒有關閉,則設置為關閉狀態)    if !stream.chunks[j].closed {     stream.chunks[j].closed = true    }    // 如果該 chunk 還沒有被成功刷新,則刷新這個塊    if stream.chunks[j].flushed.IsZero() {     result = append(result, &stream.chunks[j])     ......    }   }  }  return result, stream.labels, &stream.chunkMtx }

下面是判斷一個具體的 chunk 是否應該被刷新的邏輯:

// pkg/ingester/flush.go func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) {  // chunk關閉了也應該刷新了  if chunk.closed {   if chunk.synced {    return true, flushReasonSynced   }   return true, flushReasonFull  }  // chunk最后更新的時間超過了配置的 chunk 空閑時間 MaxChunkIdle  if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {   return true, flushReasonIdle  }   // chunk的邊界時間操過了配置的 chunk  最大時間 MaxChunkAge  if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge {   return true, flushReasonMaxAge  }  return false, "" }

真正將 chunks 數據刷新保存到存儲中是 flushChunks 函數實現的:

// pkg/ingester/flush.go func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error {  ......  wireChunks := make([]chunk.Chunk, len(cs))  // 下面的匿名函數用于生成保存到存儲中的chunk數據  err = func() error {   chunkMtx.Lock()   defer chunkMtx.Unlock()    for j, c := range cs {    if err := c.chunk.Close(); err != nil {     return err    }    firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())    ch := chunk.NewChunk(     userID, fp, metric,     chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),     firstTime,     lastTime,    )     chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header    start := time.Now()    if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil {     return err    }    wireChunks[j] = ch   }   return nil  }()    // 通過 store 接口保存 chunk 數據  if err := i.store.Put(ctx, wireChunks); err != nil {   return err  }   ......   chunkMtx.Lock()  defer chunkMtx.Unlock()  for i, wc := range wireChunks {   // flush 成功,寫入刷新時間   cs[i].flushed = time.Now()   // 下是一些監控數據更新   ......  }   return nil }

chunk 數據被寫入到存儲后,還有有一個協程會去定時清理本地的這些 chunk 數據,在上面的 Ingester 的 staring 函數中最后有一個  go i.loop(),在這個 loop() 函數中會每隔 FlushCheckPeriod(默認 30s,可以通過  --ingester.flush-check-period 進行配置)時間就會去去調用 sweepUsers 函數進行垃圾回收:

// pkg/ingester/ingester.go func (i *Ingester) loop() {  defer i.loopDone.Done()   flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)  defer flushTicker.Stop()   for {   select {   case <-flushTicker.C:    i.sweepUsers(false, true)   case <-i.loopQuit:    return   }  } }

sweepUsers 函數用于執行將日志流數據加入到優先級隊列中,并對沒有序列的用戶進行垃圾回收:

// pkg/ingester/flush.go // sweepUsers 定期執行 flush 操作,并對沒有序列的用戶進行垃圾回收 func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {  instances := i.getInstances()  for _, instance := range instances {   i.sweepInstance(instance, immediate, mayRemoveStreams)  } }  func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) {  instance.streamsMtx.Lock()  defer instance.streamsMtx.Unlock()  for _, stream := range instance.streams {   i.sweepStream(instance, stream, immediate)   i.removeFlushedChunks(instance, stream, mayRemoveStreams)  } }  // must hold streamsMtx func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {  stream.chunkMtx.RLock()  defer stream.chunkMtx.RUnlock()  if len(stream.chunks) == 0 {   return  }  // 最新的chunk  lastChunk := stream.chunks[len(stream.chunks)-1]  // 判斷是否應該被flush  shouldFlush, _ := i.shouldFlushChunk(&lastChunk)  // 如果只有一個chunk并且不是強制持久化切最新的chunk還不應該被flush,則直接返回  if len(stream.chunks) == 1 && !immediate && !shouldFlush {   return  }  // 根據指紋獲取用與處理的優先級隊列索引  flushQueueIndex := int(uint64(stream.fp) % uint64(i.cfg.ConcurrentFlushes))  firstTime, _ := stream.chunks[0].chunk.Bounds()  // 加入到優先級隊列中去  i.flushQueues[flushQueueIndex].Enqueue(&flushOp{   model.TimeFromUnixNano(firstTime.UnixNano()), instance.instanceID,   stream.fp, immediate,  }) }  // 移除已經flush過后的chunks數據 func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) {  now := time.Now()   stream.chunkMtx.Lock()  defer stream.chunkMtx.Unlock()  prevNumChunks := len(stream.chunks)  var subtracted int  for len(stream.chunks) > 0 {   // 如果chunk還沒有被刷新到存儲 或者 chunk被刷新到存儲到現在的時間還沒操過 RetainPeriod(默認15分鐘,可以通過--ingester.chunks-retain-period 進行配置)則忽略   if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {    break   }   subtracted += stream.chunks[0].chunk.UncompressedSize()   // 刪除引用,以便該塊可以被垃圾回收起來   stream.chunks[0].chunk = nil   // 移除chunk   stream.chunks = stream.chunks[1:]  }  ......  // 如果stream中的所有chunk都被清空了,則清空該 stream 的相關數據  if mayRemoveStream && len(stream.chunks) == 0 {   delete(instance.streamsByFP, stream.fp)   delete(instance.streams, stream.labelsString)   instance.index.Delete(stream.labels, stream.fp)   ......  } }

關于存儲或者查詢等模塊的實現在后文再繼續探索,包括 WAL 的實現也較為復雜。

“Loki 的作用是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金阳县| 新绛县| 丰县| 宁波市| 云梦县| 南昌县| 五河县| 常宁市| 芜湖县| 股票| 上栗县| 公主岭市| 阿巴嘎旗| 禄丰县| 四平市| 日土县| 阿荣旗| 临泉县| 汝州市| 揭东县| 泗阳县| 渝中区| 黎川县| 株洲市| 礼泉县| 庆阳市| 馆陶县| 博野县| 湟源县| 赤水市| 丽江市| 乌鲁木齐市| 永顺县| 桂平市| 平远县| 巢湖市| 壶关县| 泸西县| 文化| 涡阳县| 贞丰县|