initial commit [ci skip]

This commit is contained in:
Brad Rydzewski
2019-10-10 19:01:58 -07:00
parent 56c135e4ae
commit 43bbf6e78c
95 changed files with 6579 additions and 1 deletions

242
runtime/execer.go Normal file
View File

@@ -0,0 +1,242 @@
// Code generated automatically. DO NOT EDIT.
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
package runtime
import (
"context"
"sync"
"github.com/drone-runners/drone-runner-docker/engine"
"github.com/drone-runners/drone-runner-docker/engine/replacer"
"github.com/drone/drone-go/drone"
"github.com/drone/runner-go/environ"
"github.com/drone/runner-go/logger"
"github.com/drone/runner-go/pipeline"
"github.com/hashicorp/go-multierror"
"github.com/natessilva/dag"
"golang.org/x/sync/semaphore"
)
// Execer is the execution context for executing the intermediate
// representation of a pipeline.
type Execer interface {
Exec(context.Context, *engine.Spec, *pipeline.State) error
}
type execer struct {
mu sync.Mutex
engine engine.Engine
reporter pipeline.Reporter
streamer pipeline.Streamer
sem *semaphore.Weighted
}
// NewExecer returns a new execer used
func NewExecer(
reporter pipeline.Reporter,
streamer pipeline.Streamer,
engine engine.Engine,
procs int64,
) Execer {
exec := &execer{
reporter: reporter,
streamer: streamer,
engine: engine,
}
if procs > 0 {
// optional semaphor that limits the number of steps
// that can execute concurrently.
exec.sem = semaphore.NewWeighted(procs)
}
return exec
}
// Exec executes the intermediate representation of the pipeline
// and returns an error if execution fails.
func (e *execer) Exec(ctx context.Context, spec *engine.Spec, state *pipeline.State) error {
defer e.engine.Destroy(noContext, spec)
if err := e.engine.Setup(noContext, spec); err != nil {
state.FailAll(err)
return e.reporter.ReportStage(noContext, state)
}
// create a directed graph, where each vertex in the graph
// is a pipeline step.
var d dag.Runner
for _, s := range spec.Steps {
step := s
d.AddVertex(step.Name, func() error {
return e.exec(ctx, state, spec, step)
})
}
// create the vertex edges from the values configured in the
// depends_on attribute.
for _, s := range spec.Steps {
for _, dep := range s.DependsOn {
d.AddEdge(dep, s.Name)
}
}
var result error
if err := d.Run(); err != nil {
multierror.Append(result, err)
}
// once pipeline execution completes, notify the state
// manageer that all steps are finished.
state.FinishAll()
if err := e.reporter.ReportStage(noContext, state); err != nil {
multierror.Append(result, err)
}
return result
}
func (e *execer) exec(ctx context.Context, state *pipeline.State, spec *engine.Spec, step *engine.Step) error {
var result error
select {
case <-ctx.Done():
state.Cancel()
return nil
default:
}
log := logger.FromContext(ctx)
log = log.WithField("step.name", step.Name)
ctx = logger.WithContext(ctx, log)
if e.sem != nil {
// the semaphore limits the number of steps that can run
// concurrently. acquire the semaphore and release when
// the pipeline completes.
if err := e.sem.Acquire(ctx, 1); err != nil {
return nil
}
defer func() {
// recover from a panic to ensure the semaphore is
// released to prevent deadlock. we do not expect a
// panic, however, we are being overly cautious.
if r := recover(); r != nil {
// TODO(bradrydzewsi) log the panic.
}
// release the semaphore
e.sem.Release(1)
}()
}
switch {
case state.Skipped():
return nil
case state.Cancelled():
return nil
case step.RunPolicy == engine.RunNever:
return nil
case step.RunPolicy == engine.RunAlways:
break
case step.RunPolicy == engine.RunOnFailure && state.Failed() == false:
state.Skip(step.Name)
return e.reporter.ReportStep(noContext, state, step.Name)
case step.RunPolicy == engine.RunOnSuccess && state.Failed():
state.Skip(step.Name)
return e.reporter.ReportStep(noContext, state, step.Name)
}
state.Start(step.Name)
err := e.reporter.ReportStep(noContext, state, step.Name)
if err != nil {
return err
}
copy := cloneStep(step)
// the pipeline environment variables need to be updated to
// reflect the current state of the build and stage.
state.Lock()
copy.Envs = environ.Combine(
copy.Envs,
environ.Build(state.Build),
environ.Stage(state.Stage),
environ.Step(findStep(state, step.Name)),
)
state.Unlock()
// writer used to stream build logs.
wc := e.streamer.Stream(noContext, state, step.Name)
wc = replacer.New(wc, step.Secrets)
// if the step is configured as a daemon, it is detached
// from the main process and executed separately.
// todo(bradrydzewski) this code is still experimental.
if step.Detach {
go func() {
e.engine.Run(ctx, spec, copy, wc)
wc.Close()
}()
return nil
}
exited, err := e.engine.Run(ctx, spec, copy, wc)
// close the stream. If the session is a remote session, the
// full log buffer is uploaded to the remote server.
if err := wc.Close(); err != nil {
multierror.Append(result, err)
}
if exited != nil {
state.Finish(step.Name, exited.ExitCode)
err := e.reporter.ReportStep(noContext, state, step.Name)
if err != nil {
multierror.Append(result, err)
}
// if the exit code is 78 the system will skip all
// subsequent pending steps in the pipeline.
if exited.ExitCode == 78 {
state.SkipAll()
}
return result
}
switch err {
case context.Canceled, context.DeadlineExceeded:
state.Cancel()
return nil
}
// if the step failed with an internal error (as oppsed to a
// runtime error) the step is failed.
state.Fail(step.Name, err)
err = e.reporter.ReportStep(noContext, state, step.Name)
if err != nil {
multierror.Append(result, err)
}
return result
}
// helper function to clone a step. The runner mutates a step to
// update the environment variables to reflect the current
// pipeline state.
func cloneStep(src *engine.Step) *engine.Step {
dst := new(engine.Step)
*dst = *src
dst.Envs = environ.Combine(src.Envs)
return dst
}
// helper function returns the named step from the state.
func findStep(state *pipeline.State, name string) *drone.Step {
for _, step := range state.Stage.Steps {
if step.Name == name {
return step
}
}
panic("step not found: " + name)
}

39
runtime/execer_test.go Normal file
View File

@@ -0,0 +1,39 @@
// Code generated automatically. DO NOT EDIT.
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
package runtime
import (
"testing"
)
func TestExec(t *testing.T) {
t.Skip()
}
func TestExec_NonZeroExit(t *testing.T) {
t.Skip()
}
func TestExec_Exit78(t *testing.T) {
t.Skip()
}
func TestExec_Error(t *testing.T) {
t.Skip()
}
func TestExec_CtxError(t *testing.T) {
t.Skip()
}
func TestExec_ReportError(t *testing.T) {
t.Skip()
}
func TestExec_SkipCtxDone(t *testing.T) {
t.Skip()
}

72
runtime/poller.go Normal file
View File

@@ -0,0 +1,72 @@
// Code generated automatically. DO NOT EDIT.
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
package runtime
import (
"context"
"sync"
"github.com/drone/runner-go/client"
"github.com/drone/runner-go/logger"
)
var noContext = context.Background()
// Poller polls the server for pending stages and dispatches
// for execution by the Runner.
type Poller struct {
Client client.Client
Filter *client.Filter
Runner *Runner
}
// Poll opens N connections to the server to poll for pending
// stages for execution. Pending stages are dispatched to a
// Runner for execution.
func (p *Poller) Poll(ctx context.Context, n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
for {
select {
case <-ctx.Done():
wg.Done()
return
default:
p.poll(ctx, i+1)
}
}
}(i)
}
wg.Wait()
}
// poll requests a stage for execution from the server, and then
// dispatches for execution.
func (p *Poller) poll(ctx context.Context, thread int) error {
log := logger.FromContext(ctx).WithField("thread", thread)
log.WithField("thread", thread).Debug("request stage from remote server")
// request a new build stage for execution from the central
// build server.
stage, err := p.Client.Request(ctx, p.Filter)
if err != nil {
log.WithError(err).Error("cannot request stage")
return err
}
// exit if a nil or empty stage is returned from the system
// and allow the runner to retry.
if stage == nil || stage.ID == 0 {
return nil
}
return p.Runner.Run(
logger.WithContext(noContext, log), stage)
}

27
runtime/poller_test.go Normal file
View File

@@ -0,0 +1,27 @@
// Code generated automatically. DO NOT EDIT.
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
package runtime
import (
"testing"
)
func TestPoll(t *testing.T) {
t.Skip()
}
func TestPoll_NilStage(t *testing.T) {
t.Skip()
}
func TestPoll_EmptyStage(t *testing.T) {
t.Skip()
}
func TestPoll_RequestError(t *testing.T) {
t.Skip()
}

236
runtime/runner.go Normal file
View File

@@ -0,0 +1,236 @@
// Code generated automatically. DO NOT EDIT.
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
package runtime
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/drone-runners/drone-runner-docker/engine"
"github.com/drone-runners/drone-runner-docker/engine/compiler"
"github.com/drone-runners/drone-runner-docker/engine/resource"
"github.com/drone/drone-go/drone"
"github.com/drone/envsubst"
"github.com/drone/runner-go/client"
"github.com/drone/runner-go/environ"
"github.com/drone/runner-go/logger"
"github.com/drone/runner-go/manifest"
"github.com/drone/runner-go/pipeline"
"github.com/drone/runner-go/secret"
)
// Runnner runs the pipeline.
type Runner struct {
// Client is the remote client responsible for interacting
// with the central server.
Client client.Client
// Execer is responsible for executing intermediate
// representation of the pipeline and returns its results.
Execer Execer
// Reporter reports pipeline status back to the remote
// server.
Reporter pipeline.Reporter
// Environ provides custom, global environment variables
// that are added to every pipeline step.
Environ map[string]string
// Machine provides the runner with the name of the host
// machine executing the pipeline.
Machine string
// Match is an optional function that returns true if the
// repository or build match user-defined criteria. This is
// intended as a security measure to prevent a runner from
// processing an unwanted pipeline.
Match func(*drone.Repo, *drone.Build) bool
// Secret provides the compiler with secrets.
Secret secret.Provider
}
// Run runs the pipeline stage.
func (s *Runner) Run(ctx context.Context, stage *drone.Stage) error {
log := logger.FromContext(ctx).
WithField("stage.id", stage.ID).
WithField("stage.name", stage.Name).
WithField("stage.number", stage.Number)
log.Debug("stage received")
// delivery to a single agent is not guaranteed, which means
// we need confirm receipt. The first agent that confirms
// receipt of the stage can assume ownership.
stage.Machine = s.Machine
err := s.Client.Accept(ctx, stage)
if err != nil {
log.WithError(err).Error("cannot accept stage")
return err
}
log.Debug("stage accepted")
data, err := s.Client.Detail(ctx, stage)
if err != nil {
log.WithError(err).Error("cannot get stage details")
return err
}
log = log.WithField("repo.id", data.Repo.ID).
WithField("repo.namespace", data.Repo.Namespace).
WithField("repo.name", data.Repo.Name).
WithField("build.id", data.Build.ID).
WithField("build.number", data.Build.Number)
log.Debug("stage details fetched")
ctxdone, cancel := context.WithCancel(ctx)
defer cancel()
timeout := time.Duration(data.Repo.Timeout) * time.Minute
ctxtimeout, cancel := context.WithTimeout(ctxdone, timeout)
defer cancel()
ctxcancel, cancel := context.WithCancel(ctxtimeout)
defer cancel()
// next we opens a connection to the server to watch for
// cancellation requests. If a build is cancelled the running
// stage should also be cancelled.
go func() {
done, _ := s.Client.Watch(ctxdone, data.Build.ID)
if done {
cancel()
log.Debugln("received cancellation")
} else {
log.Debugln("done listening for cancellations")
}
}()
envs := environ.Combine(
s.Environ,
environ.System(data.System),
environ.Repo(data.Repo),
environ.Build(data.Build),
environ.Stage(stage),
environ.Link(data.Repo, data.Build, data.System),
data.Build.Params,
)
// string substitution function ensures that string
// replacement variables are escaped and quoted if they
// contain a newline character.
subf := func(k string) string {
v := envs[k]
if strings.Contains(v, "\n") {
v = fmt.Sprintf("%q", v)
}
return v
}
state := &pipeline.State{
Build: data.Build,
Stage: stage,
Repo: data.Repo,
System: data.System,
}
// evaluates whether or not the agent can process the
// pipeline. An agent may choose to reject a repository
// or build for security reasons.
if s.Match != nil && s.Match(data.Repo, data.Build) == false {
log.Error("cannot process stage, access denied")
state.FailAll(errors.New("insufficient permission to run the pipeline"))
return s.Reporter.ReportStage(noContext, state)
}
// evaluates string replacement expressions and returns an
// update configuration file string.
config, err := envsubst.Eval(string(data.Config.Data), subf)
if err != nil {
log.WithError(err).Error("cannot emulate bash substitution")
state.FailAll(err)
return s.Reporter.ReportStage(noContext, state)
}
// parse the yaml configuration file.
manifest, err := manifest.ParseString(config)
if err != nil {
log.WithError(err).Error("cannot parse configuration file")
state.FailAll(err)
return s.Reporter.ReportStage(noContext, state)
}
// find the named stage in the yaml configuration file.
resource, err := resource.Lookup(stage.Name, manifest)
if err != nil {
log.WithError(err).Error("cannot find pipeline resource")
state.FailAll(err)
return s.Reporter.ReportStage(noContext, state)
}
secrets := secret.Combine(
secret.Static(data.Secrets),
secret.Encrypted(),
s.Secret,
)
// compile the yaml configuration file to an intermediate
// representation, and then
comp := &compiler.Compiler{
Pipeline: resource,
Manifest: manifest,
Environ: s.Environ,
Build: data.Build,
Stage: stage,
Repo: data.Repo,
System: data.System,
Netrc: data.Netrc,
Secret: secrets,
}
spec := comp.Compile(ctx)
for _, src := range spec.Steps {
// steps that are skipped are ignored and are not stored
// in the drone database, nor displayed in the UI.
if src.RunPolicy == engine.RunNever {
continue
}
stage.Steps = append(stage.Steps, &drone.Step{
Name: src.Name,
Number: len(stage.Steps) + 1,
StageID: stage.ID,
Status: drone.StatusPending,
ErrIgnore: src.IgnoreErr,
})
}
stage.Started = time.Now().Unix()
stage.Status = drone.StatusRunning
if err := s.Client.Update(ctx, stage); err != nil {
log.WithError(err).Error("cannot update stage")
return err
}
log.Debug("updated stage to running")
ctxcancel = logger.WithContext(ctxcancel, log)
err = s.Execer.Exec(ctxcancel, spec, state)
if err != nil {
log.WithError(err).Debug("stage failed")
return err
}
log.Debug("updated stage to complete")
return nil
}

7
runtime/runner_test.go Normal file
View File

@@ -0,0 +1,7 @@
// Code generated automatically. DO NOT EDIT.
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
package runtime