var m = &promMetadata{
	promURL: conf.prometheus.url,

	// Start out with the full time range. The shipper will constrain it later.
	// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
	mint: conf.limitMinTime.PrometheusTimestamp(),
	maxt: math.MaxInt64,

	limitMinTime: conf.limitMinTime,
	client:       promclient.NewWithTracingClient(logger, "thanos-sidecar"),
}
Whether uploads are enabled depends on whether object storage is configured.
The object store configuration is supplied via the command-line flag --objstore.config-file.
confContentYaml, err := conf.objStore.Content()
if err != nil {
	return errors.Wrap(err, "getting object store config")
}

var uploads = true
if len(confContentYaml) == 0 {
	level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
	uploads = false
}
func validatePrometheus(ctx context.Context, client *promclient.Client, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
	var (
		flagErr error
		flags   promclient.Flags
	)

	if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
		if flags, flagErr = client.ConfiguredFlags(ctx, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
			level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", flagErr)
			return errors.Wrapf(flagErr, "fetch Prometheus flags")
		}
		return nil
	}); err != nil {
		return errors.Wrapf(err, "fetch Prometheus flags")
	}

	if flagErr != nil {
		level.Warn(logger).Log("msg", "failed to check Prometheus flags, due to potentially older Prometheus. No extra validation is done.", "err", flagErr)
		return nil
	}

	// Check if compaction is disabled.
	if flags.TSDBMinTime != flags.TSDBMaxTime {
		if !ignoreBlockSize {
			return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
				"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime)
		}
		level.Warn(logger).Log("msg", "flag to ignore Prometheus min/max block duration flags differing is being used. If the upload of a 2h block fails and a Prometheus compaction happens that block may be missing from your Thanos bucket storage.")
	}

	// Check if block time is 2h.
	if flags.TSDBMinTime != model.Duration(2*time.Hour) {
		level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime)
	}

	return nil
}
Fetch the Prometheus version information.
This is done by calling the Prometheus endpoint /api/v1/status/buildinfo.
// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
	if err := m.BuildVersion(ctx); err != nil {
		level.Warn(logger).Log(
			"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
			"err", err,
		)
		return err
	}

	level.Info(logger).Log(
		"msg", "successfully loaded prometheus version",
	)
	return nil
})
if err != nil {
	return errors.Wrap(err, "failed to get prometheus version")
}
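BuildVersion ultimately hits the /api/v1/status/buildinfo endpoint mentioned above. For reference, the endpoint can also be queried directly; a minimal standalone sketch, with the Prometheus address as an assumption (the sidecar itself goes through its promclient helper):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// buildInfoResponse mirrors the relevant part of the /api/v1/status/buildinfo payload.
type buildInfoResponse struct {
	Status string `json:"status"`
	Data   struct {
		Version string `json:"version"`
	} `json:"data"`
}

func main() {
	// Assumed Prometheus address; the sidecar uses --prometheus.url instead.
	resp, err := http.Get("http://localhost:9090/api/v1/status/buildinfo")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var bi buildInfoResponse
	if err := json.NewDecoder(resp.Body).Decode(&bi); err != nil {
		panic(err)
	}
	fmt.Println("prometheus version:", bi.Data.Version)
}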
Fetch the external labels configured on Prometheus.
This is done by calling the Prometheus endpoint /api/v1/status/config.
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
	if err := m.UpdateLabels(ctx); err != nil {
		level.Warn(logger).Log(
			"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
			"err", err,
		)
		promUp.Set(0)
		statusProber.NotReady(err)
		return err
	}

	level.Info(logger).Log(
		"msg", "successfully loaded prometheus external labels",
		"external_labels", m.Labels().String(),
	)
	promUp.Set(1)
	statusProber.Ready()
	lastHeartbeat.SetToCurrentTime()
	return nil
})
if err != nil {
	return errors.Wrap(err, "initial external labels query")
}
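UpdateLabels ultimately reads the external labels out of the /api/v1/status/config response mentioned above (the config is returned as a YAML string embedded in JSON). A minimal standalone sketch of that lookup, assuming a local Prometheus and using gopkg.in/yaml.v2 to parse the embedded config; the sidecar itself goes through its promclient helper:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	"gopkg.in/yaml.v2"
)

// configResponse mirrors the /api/v1/status/config payload: the full Prometheus
// configuration is returned as a YAML string under data.yaml.
type configResponse struct {
	Data struct {
		YAML string `json:"yaml"`
	} `json:"data"`
}

// promConfig picks out only global.external_labels from that YAML.
type promConfig struct {
	Global struct {
		ExternalLabels map[string]string `yaml:"external_labels"`
	} `yaml:"global"`
}

func main() {
	// Assumed Prometheus address; the sidecar resolves this from --prometheus.url.
	resp, err := http.Get("http://localhost:9090/api/v1/status/config")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var cr configResponse
	if err := json.NewDecoder(resp.Body).Decode(&cr); err != nil {
		panic(err)
	}

	var cfg promConfig
	if err := yaml.Unmarshal([]byte(cr.Data.YAML), &cfg); err != nil {
		panic(err)
	}
	fmt.Println("external_labels:", cfg.Global.ExternalLabels)
}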
The sidecar requires the scraped Prometheus instance to have external labels configured.
if len(m.Labels()) == 0 {
	return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}
// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
// the external labels we apply.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
	iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer iterCancel()

	if err := m.UpdateLabels(iterCtx); err != nil {
		level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
		promUp.Set(0)
	} else {
		promUp.Set(1)
		lastHeartbeat.SetToCurrentTime()
	}

	return nil
})
// NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client
// to talk to Prometheus.
// It attaches the provided external labels to all results. Provided external labels has to be sorted.
func NewPrometheusStore(
	logger log.Logger,
	reg prometheus.Registerer,
	client *promclient.Client,
	baseURL *url.URL,
	component component.StoreAPI,
	externalLabelsFn func() labels.Labels,
	timestamps func() (mint int64, maxt int64),
	promVersion func() string,
) (*PrometheusStore, error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	p := &PrometheusStore{
		logger:                        logger,
		base:                          baseURL,
		client:                        client,
		component:                     component,
		externalLabelsFn:              externalLabelsFn,
		timestamps:                    timestamps,
		promVersion:                   promVersion,
		remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
		buffers: sync.Pool{New: func() interface{} {
			b := make([]byte, 0, initialBufSize)
			return &b
		}},
		framesRead: promauto.With(reg).NewHistogram(
			prometheus.HistogramOpts{
				Name:    "prometheus_store_received_frames",
				Help:    "Number of frames received per streamed response.",
				Buckets: prometheus.ExponentialBuckets(10, 10, 5),
			},
		),
	}
	return p, nil
}
var _Rules_serviceDesc = grpc.ServiceDesc{
	ServiceName: "thanos.Rules",
	HandlerType: (*RulesServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "Rules",
			Handler:       _Rules_Rules_Handler,
			ServerStreams: true,
		},
	},
	Metadata: "rules/rulespb/rpc.proto",
}
The alerting and recording rules are fetched from Prometheus via /api/v1/rules.
Register the third gRPC service, targets.Prometheus, which serves Prometheus's scrape targets (see the registration sketch after the service descriptor below).
var _Targets_serviceDesc = grpc.ServiceDesc{
	ServiceName: "thanos.Targets",
	HandlerType: (*TargetsServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "Targets",
			Handler:       _Targets_Targets_Handler,
			ServerStreams: true,
		},
	},
	Metadata: "targets/targetspb/rpc.proto",
}
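A minimal sketch of how a streaming service described by such a grpc.ServiceDesc gets wired up. The listen address is an assumption, and targetSrv stands for whatever implements targetspb.TargetsServer (in the sidecar, the targets.Prometheus implementation):

package example

import (
	"net"

	"google.golang.org/grpc"

	"github.com/thanos-io/thanos/pkg/targets/targetspb"
)

// serveTargets registers a TargetsServer implementation and starts serving it.
// RegisterTargetsServer is the generated helper that binds the implementation
// to _Targets_serviceDesc shown above.
func serveTargets(targetSrv targetspb.TargetsServer) error {
	lis, err := net.Listen("tcp", ":10901") // assumed gRPC listen address
	if err != nil {
		return err
	}
	s := grpc.NewServer()
	targetspb.RegisterTargetsServer(s, targetSrv)
	return s.Serve(lis)
}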
// Exemplars returns all specified exemplars from Prometheus.
func (p *Prometheus) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Exemplars_ExemplarsServer) error {
	exemplars, err := p.client.ExemplarsInGRPC(s.Context(), p.base, r.Query, r.Start, r.End)
	if err != nil {
		return err
	}

	// Prometheus does not add external labels, so we need to add on our own.
	extLset := p.extLabels()
	for _, e := range exemplars {
		// Make sure the returned series labels are sorted.
		e.SetSeriesLabels(labelpb.ExtendSortedLabels(e.SeriesLabels.PromLabels(), extLset))

		var err error
		tracing.DoInSpan(s.Context(), "send_exemplars_response", func(_ context.Context) {
			err = s.Send(&exemplarspb.ExemplarsResponse{Result: &exemplarspb.ExemplarsResponse_Data{Data: e}})
		})
		if err != nil {
			return err
		}
	}
	return nil
}
promReadyTimeout := conf.prometheus.readyTimeout
extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout)
defer cancel()

if err := runutil.Retry(2*time.Second, extLabelsCtx.Done(), func() error {
	if len(m.Labels()) == 0 {
		return errors.New("not uploading as no external labels are configured yet - is Prometheus healthy/reachable?")
	}
	return nil
}); err != nil {
	return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}
Create the shipper from bkt (the object store bucket client).
As the code comment below says, the shipper continuously scans the data directory and uploads new blocks.
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
	conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
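The shipper itself does nothing until Sync is called; in the sidecar it is driven by a periodic loop, roughly like the sketch below (the 30-second interval and log wording are assumptions):

// Periodically call Sync: each iteration scans the TSDB directory and uploads
// any blocks that are not yet present in the bucket.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
	if uploaded, err := s.Sync(ctx); err != nil {
		level.Warn(logger).Log("msg", "shipper sync failed", "uploaded", uploaded, "err", err)
	}
	return nil
})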
Sync first reads thanos.shipper.json under tsdb.path to learn which block IDs have already been uploaded.
This file records the IDs of blocks that the sidecar has already shipped.
meta, err := ReadMetaFile(s.dir)
if err != nil {
	// If we encounter any error, proceed with an empty meta file and overwrite it later.
	// The meta file is only used to avoid unnecessary bucket.Exists call,
	// which are properly handled by the system if their occur anyway.
	if !os.IsNotExist(err) {
		level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
	}
	meta = &Meta{Version: MetaVersion1}
}

// Build a map of blocks we already uploaded.
hasUploaded := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
for _, id := range meta.Uploaded {
	hasUploaded[id] = struct{}{}
}

// Reset the uploaded slice so we can rebuild it only with blocks that still exist locally.
meta.Uploaded = nil
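thanos.shipper.json itself is plain JSON: a format version plus the ULIDs of the blocks already uploaded. A minimal standalone sketch of inspecting it directly, with the data path as an assumption (the shipper uses ReadMetaFile as shown above):

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// shipperMeta mirrors the structure of thanos.shipper.json: a format version
// plus the ULIDs of blocks that have already been uploaded.
type shipperMeta struct {
	Version  int      `json:"version"`
	Uploaded []string `json:"uploaded"`
}

func main() {
	// Assumed TSDB data directory.
	b, err := os.ReadFile("/prometheus/data/thanos.shipper.json")
	if err != nil {
		panic(err)
	}
	var meta shipperMeta
	if err := json.Unmarshal(b, &meta); err != nil {
		panic(err)
	}
	fmt.Println("already uploaded blocks:", meta.Uploaded)
}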
if err := s.upload(ctx, m); err != nil {
	if !s.allowOutOfOrderUploads {
		return 0, errors.Wrapf(err, "upload %v", m.ULID)
	}

	// No error returned, just log line. This is because we want other blocks to be uploaded even
	// though this one failed. It will be retried on second Sync iteration.
	level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
	uploadErrs++
	continue
}
The upload function:
It creates a thanos/upload directory under the TSDB data directory,
then a subdirectory named after the block directory,
and hard-links the block files into it, so the data being uploaded cannot be deleted or otherwise touched by concurrent TSDB operations.
level.Info(s.logger).Log("msg", "upload new block", "id", meta.ULID)

// We hard-link the files into a temporary upload directory so we are not affected
// by other operations happening against the TSDB directory.
updir := filepath.Join(s.dir, "thanos", "upload", meta.ULID.String())

// Remove updir just in case.
if err := os.RemoveAll(updir); err != nil {
	return errors.Wrap(err, "clean upload directory")
}
if err := os.MkdirAll(updir, 0750); err != nil {
	return errors.Wrap(err, "create upload dir")
}
defer func() {
	if err := os.RemoveAll(updir); err != nil {
		level.Error(s.logger).Log("msg", "failed to clean upload directory", "err", err)
	}
}()

dir := filepath.Join(s.dir, meta.ULID.String())
if err := hardlinkBlock(dir, updir); err != nil {
	return errors.Wrap(err, "hard link block")
}

// Attach current labels and write a new meta file with Thanos extensions.
if lset := s.labels(); lset != nil {
	meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)
if err := meta.WriteToDir(s.logger, updir); err != nil {
	return errors.Wrap(err, "write meta file")
}

return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)
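The hard-linking itself boils down to os.Link per file. A minimal sketch of the technique hardlinkBlock relies on (the real helper in the shipper handles the block's chunks layout and more edge cases):

package example

import (
	"os"
	"path/filepath"
)

// hardlinkDir recreates dst and hard-links every regular file from src into it,
// preserving the relative layout. Hard links give the upload a stable view of
// the block files even if the TSDB removes the originals in the meantime.
func hardlinkDir(src, dst string) error {
	return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		rel, err := filepath.Rel(src, path)
		if err != nil {
			return err
		}
		target := filepath.Join(dst, rel)
		if info.IsDir() {
			return os.MkdirAll(target, 0750)
		}
		return os.Link(path, target)
	})
}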
if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil {
	return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
}

if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
	return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}

if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {
	return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads.
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {
	// Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases,