fix logging by merging channels into reader

This commit is contained in:
zerodoctor
2023-10-05 21:03:50 -05:00
parent 04332e5527
commit 7603b84fa2
3 changed files with 60 additions and 16 deletions

View File

@@ -14,7 +14,6 @@ import (
"github.com/drone-runners/drone-runner-podman/internal/podman/errors" "github.com/drone-runners/drone-runner-podman/internal/podman/errors"
"github.com/drone-runners/drone-runner-podman/internal/podman/image" "github.com/drone-runners/drone-runner-podman/internal/podman/image"
"github.com/drone-runners/drone-runner-podman/internal/podman/jsonmessage" "github.com/drone-runners/drone-runner-podman/internal/podman/jsonmessage"
"github.com/drone-runners/drone-runner-podman/internal/podman/stdcopy"
"github.com/drone/runner-go/logger" "github.com/drone/runner-go/logger"
"github.com/drone/runner-go/pipeline/runtime" "github.com/drone/runner-go/pipeline/runtime"
@@ -353,8 +352,8 @@ func (e *Podman) deferTail(ctx context.Context, id string, output io.Writer) (lo
Timestamps: toPtr(false), Timestamps: toPtr(false),
} }
out := make(chan string, 100) out := make(chan string, 512)
error := make(chan string, 100) error := make(chan string, 512)
err = containers.Logs(ctx, id, &opts, out, error) err = containers.Logs(ctx, id, &opts, out, error)
if err != nil { if err != nil {
@@ -365,7 +364,8 @@ func (e *Podman) deferTail(ctx context.Context, id string, output io.Writer) (lo
return nil, err return nil, err
} }
stdcopy.StdCopy(output, output, logs) logs = NewChansReadClose(ctx, out, error)
io.Copy(output, logs)
return logs, nil return logs, nil
} }
@@ -387,9 +387,11 @@ func (e *Podman) tail(ctx context.Context, id string, output io.Writer) error {
return err return err
} }
// go func() { go func() {
// stdcopy.StdCopy(output, output, logs) logs := NewChansReadClose(ctx, out, error)
// logs.Close() io.Copy(output, logs)
// }() logs.Close()
}()
return nil return nil
} }

View File

@@ -5,7 +5,6 @@
package engine package engine
import ( import (
"bytes"
"context" "context"
"io" "io"
"reflect" "reflect"
@@ -38,14 +37,14 @@ type ReaderClose struct {
channels []chan string channels []chan string
} }
func NewChansReadClose(ctx context.Context, channels ...chan string) ReaderClose { func NewChansReadClose(ctx context.Context, channels ...chan string) *ReaderClose {
return ReaderClose{ return &ReaderClose{
ctx: ctx, ctx: ctx,
channels: channels, channels: channels,
} }
} }
func (c *ReaderClose) Read(p []byte) (int, error) { func (c *ReaderClose) Read(p []byte) (n int, err error) {
cases := make([]reflect.SelectCase, len(c.channels)) cases := make([]reflect.SelectCase, len(c.channels))
for i := range c.channels { for i := range c.channels {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.channels[i])} cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.channels[i])}
@@ -56,7 +55,7 @@ func (c *ReaderClose) Read(p []byte) (int, error) {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
c.Close() c.Close()
return len(p), nil return n, io.EOF
default: default:
} }
@@ -67,10 +66,10 @@ func (c *ReaderClose) Read(p []byte) (int, error) {
continue continue
} }
io.WriteString(bytes.NewBuffer(p), value.String()) n += copy(p, []byte(value.String()))
} }
return len(p), io.EOF return n, io.EOF
} }
func (c *ReaderClose) Close() error { func (c *ReaderClose) Close() error {

View File

@@ -4,8 +4,51 @@
package engine package engine
import "testing" import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
)
func TestChansToReader(t *testing.T) { func TestChansToReader(t *testing.T) {
ctx := context.Background()
stdout := make(chan string, 1000)
logs := NewChansReadClose(ctx, stdout)
mockLogs := []string{
"\u0008",
"this is a log 1\n",
"this is a log 2\n",
"this is a log 3\n",
"this is a log 4\n",
"this is a log 5\n",
"this is a log 6\n",
"this is a log 7\n",
"this is a log 8\n",
"this is a log 9\n",
"this is a log 10\n",
"this is a log 11\n",
"this is a log 12\n",
}
go func(mock []string) {
for i := range mock {
time.Sleep(2)
fmt.Printf("sending [mock=%s]\n", mock[i])
stdout <- mock[i]
}
fmt.Println("closing channel...")
logs.Close()
}(mockLogs)
fmt.Println("starting std copy...")
output := bytes.NewBuffer([]byte{})
io.Copy(output, logs)
fmt.Print(output.String())
} }