diff --git a/engine/engine.go b/engine/engine.go index 8a67128..b2812b6 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -14,7 +14,6 @@ import ( "github.com/drone-runners/drone-runner-podman/internal/podman/errors" "github.com/drone-runners/drone-runner-podman/internal/podman/image" "github.com/drone-runners/drone-runner-podman/internal/podman/jsonmessage" - "github.com/drone-runners/drone-runner-podman/internal/podman/stdcopy" "github.com/drone/runner-go/logger" "github.com/drone/runner-go/pipeline/runtime" @@ -353,8 +352,8 @@ func (e *Podman) deferTail(ctx context.Context, id string, output io.Writer) (lo Timestamps: toPtr(false), } - out := make(chan string, 100) - error := make(chan string, 100) + out := make(chan string, 512) + error := make(chan string, 512) err = containers.Logs(ctx, id, &opts, out, error) if err != nil { @@ -365,7 +364,8 @@ func (e *Podman) deferTail(ctx context.Context, id string, output io.Writer) (lo return nil, err } - stdcopy.StdCopy(output, output, logs) + logs = NewChansReadClose(ctx, out, error) + io.Copy(output, logs) return logs, nil } @@ -387,9 +387,11 @@ func (e *Podman) tail(ctx context.Context, id string, output io.Writer) error { return err } - // go func() { - // stdcopy.StdCopy(output, output, logs) - // logs.Close() - // }() + go func() { + logs := NewChansReadClose(ctx, out, error) + io.Copy(output, logs) + logs.Close() + }() + return nil } diff --git a/engine/util.go b/engine/util.go index ef13ce0..9d869dc 100644 --- a/engine/util.go +++ b/engine/util.go @@ -5,7 +5,6 @@ package engine import ( - "bytes" "context" "io" "reflect" @@ -38,14 +37,14 @@ type ReaderClose struct { channels []chan string } -func NewChansReadClose(ctx context.Context, channels ...chan string) ReaderClose { - return ReaderClose{ +func NewChansReadClose(ctx context.Context, channels ...chan string) *ReaderClose { + return &ReaderClose{ ctx: ctx, channels: channels, } } -func (c *ReaderClose) Read(p []byte) (int, error) { +func (c *ReaderClose) Read(p []byte) (n int, err error) { cases := make([]reflect.SelectCase, len(c.channels)) for i := range c.channels { cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.channels[i])} @@ -56,7 +55,7 @@ func (c *ReaderClose) Read(p []byte) (int, error) { select { case <-c.ctx.Done(): c.Close() - return len(p), nil + return n, io.EOF default: } @@ -67,10 +66,10 @@ func (c *ReaderClose) Read(p []byte) (int, error) { continue } - io.WriteString(bytes.NewBuffer(p), value.String()) + n += copy(p, []byte(value.String())) } - return len(p), io.EOF + return n, io.EOF } func (c *ReaderClose) Close() error { diff --git a/engine/util_test.go b/engine/util_test.go index b5dd010..f81958d 100644 --- a/engine/util_test.go +++ b/engine/util_test.go @@ -4,8 +4,51 @@ package engine -import "testing" +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + "time" +) func TestChansToReader(t *testing.T) { + ctx := context.Background() + stdout := make(chan string, 1000) + + logs := NewChansReadClose(ctx, stdout) + + mockLogs := []string{ + "\u0008", + "this is a log 1\n", + "this is a log 2\n", + "this is a log 3\n", + "this is a log 4\n", + "this is a log 5\n", + "this is a log 6\n", + "this is a log 7\n", + "this is a log 8\n", + "this is a log 9\n", + "this is a log 10\n", + "this is a log 11\n", + "this is a log 12\n", + } + + go func(mock []string) { + for i := range mock { + time.Sleep(2) + fmt.Printf("sending [mock=%s]\n", mock[i]) + stdout <- mock[i] + } + fmt.Println("closing channel...") + logs.Close() + }(mockLogs) + + fmt.Println("starting std copy...") + output := bytes.NewBuffer([]byte{}) + io.Copy(output, logs) + + fmt.Print(output.String()) }