me own pihole

2020-07-23 17:18:21 +02:00
parent 74b727b62b
commit 1c51a6604b
39 changed files with 2769 additions and 30 deletions

View File

@@ -0,0 +1,10 @@
package s3
// Config holds values to configure the driver
type Config struct {
AccessKeyID string
SecretAccessKey string
Region string
Endpoint string
Mounter string
}
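
Config carries the S3 connection settings plus the default mounter. As a minimal sketch (not part of this commit; the helper and environment variable names are assumptions), this is how such a Config could be assembled for local testing — the driver itself builds it from Kubernetes secrets via newS3ClientFromSecrets further down.

package s3

import "os"

// configFromEnv is a hypothetical helper for local testing; the driver itself
// fills these fields from Kubernetes secrets, not from the environment.
func configFromEnv() *Config {
	return &Config{
		AccessKeyID:     os.Getenv("AWS_ACCESS_KEY_ID"),
		SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
		Region:          os.Getenv("AWS_REGION"),
		Endpoint:        os.Getenv("S3_ENDPOINT"),
		Mounter:         "s3fs", // one of: s3fs, goofys, s3backer, rclone
	}
}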

View File

@@ -0,0 +1,201 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3
import (
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"strings"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
volumeID := sanitizeVolumeID(req.GetName())
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("invalid create volume req: %v", req)
return nil, err
}
// Check arguments
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
}
if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
}
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
params := req.GetParameters()
mounter := params[mounterTypeKey]
glog.V(4).Infof("Got a request to create volume %s", volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.bucketExists(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
}
if exists {
var b *bucket
b, err = s3.getBucket(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to get bucket metadata of bucket %s: %v", volumeID, err)
}
// Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > b.CapacityBytes {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name %s but with smaller size already exists", volumeID))
}
} else {
if err = s3.createBucket(volumeID); err != nil {
return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err)
}
if err = s3.createPrefix(volumeID, fsPrefix); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", fsPrefix, err)
}
}
b := &bucket{
Name: volumeID,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: fsPrefix,
}
if err := s3.setBucket(b); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
}
glog.V(4).Infof("create volume %s", volumeID)
s3Vol := s3Volume{}
s3Vol.VolName = volumeID
s3Vol.VolID = volumeID
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: capacityBytes,
VolumeContext: req.GetParameters(),
},
}, nil
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
// Check arguments
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("Invalid delete volume req: %v", req)
return nil, err
}
glog.V(4).Infof("Deleting volume %s", volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.bucketExists(volumeID)
if err != nil {
return nil, err
}
if exists {
if err := s3.removeBucket(volumeID); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
}
} else {
glog.V(5).Infof("Bucket %s does not exist, ignoring request", volumeID)
}
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.bucketExists(req.GetVolumeId())
if err != nil {
return nil, err
}
if !exists {
// return an error if the volume requested does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
}
// We currently only support RWO
supportedAccessMode := &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
}
for _, cap := range req.VolumeCapabilities {
if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
}
}
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessMode: supportedAccessMode,
},
},
},
}, nil
}
func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Unimplemented, "ControllerExpandVolume is not implemented")
}
func sanitizeVolumeID(volumeID string) string {
volumeID = strings.ToLower(volumeID)
if len(volumeID) > 63 {
h := sha1.New()
io.WriteString(h, volumeID)
volumeID = hex.EncodeToString(h.Sum(nil))
}
return volumeID
}
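
CreateVolume derives the bucket name from the request name via sanitizeVolumeID: names are lower-cased, and anything longer than 63 characters is replaced by its 40-character hex SHA-1 digest so it still fits bucket-name limits. A small test-style sketch of that behaviour (not part of this commit):

package s3

import (
	"strings"
	"testing"
)

// TestSanitizeVolumeIDSketch is a hypothetical test illustrating the
// sanitization rules used by the controller server.
func TestSanitizeVolumeIDSketch(t *testing.T) {
	if got := sanitizeVolumeID("PVC-Example"); got != "pvc-example" {
		t.Errorf("expected lower-cased name, got %q", got)
	}
	long := strings.Repeat("a", 64)
	if got := sanitizeVolumeID(long); len(got) != 40 {
		t.Errorf("expected a 40-character sha1 hex digest, got %d characters", len(got))
	}
}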

View File

@@ -0,0 +1,25 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3
import (
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type identityServer struct {
*csicommon.DefaultIdentityServer
}

View File

@@ -0,0 +1,82 @@
package s3
import (
"fmt"
"os/exec"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount"
)
// Mounter interface which can be implemented
// by the different mounter types
type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source string, target string) error
}
const (
s3fsMounterType = "s3fs"
goofysMounterType = "goofys"
s3backerMounterType = "s3backer"
rcloneMounterType = "rclone"
mounterTypeKey = "mounter"
)
// newMounter returns a new mounter depending on the mounterType parameter
func newMounter(bucket *bucket, cfg *Config) (Mounter, error) {
mounter := bucket.Mounter
// Fall back to mounterType in cfg
if len(bucket.Mounter) == 0 {
mounter = cfg.Mounter
}
switch mounter {
case s3fsMounterType:
return newS3fsMounter(bucket, cfg)
case goofysMounterType:
return newGoofysMounter(bucket, cfg)
case s3backerMounterType:
return newS3backerMounter(bucket, cfg)
case rcloneMounterType:
return newRcloneMounter(bucket, cfg)
default:
// default to s3backer
return newS3backerMounter(bucket, cfg)
}
}
func fuseMount(path string, command string, args []string) error {
cmd := exec.Command(command, args...)
glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
}
return waitForMount(path, 10*time.Second)
}
func fuseUnmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path)
if err != nil {
glog.Errorf("Error getting PID of fuse mount: %s", err)
return nil
}
if process == nil {
glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
return nil
}
glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
return waitForProcess(process, 1)
}
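
newMounter resolves the mounter in order: the per-bucket value wins, an empty value falls back to the driver-wide Config.Mounter, and anything unrecognized defaults to s3backer. A minimal sketch of the fallback (hypothetical function, not part of this commit):

package s3

// exampleMounterSelection illustrates the fallback: the bucket carries no
// mounter of its own, so the Config value decides which one is built.
func exampleMounterSelection() (Mounter, error) {
	cfg := &Config{Mounter: s3fsMounterType}
	b := &bucket{Name: "pvc-1234", FSPath: fsPrefix} // Mounter left empty on purpose
	return newMounter(b, cfg)                        // returns an s3fsMounter
}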

View File

@@ -0,0 +1,71 @@
package s3
import (
"fmt"
"os"
"context"
goofysApi "github.com/kahing/goofys/api"
)
const (
goofysCmd = "goofys"
defaultRegion = "us-east-1"
)
// Implements Mounter
type goofysMounter struct {
bucket *bucket
endpoint string
region string
accessKeyID string
secretAccessKey string
}
func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) {
region := cfg.Region
// if endpoint is set we need a default region
if region == "" && cfg.Endpoint != "" {
region = defaultRegion
}
return &goofysMounter{
bucket: b,
endpoint: cfg.Endpoint,
region: region,
accessKeyID: cfg.AccessKeyID,
secretAccessKey: cfg.SecretAccessKey,
}, nil
}
func (goofys *goofysMounter) Stage(stageTarget string) error {
return nil
}
func (goofys *goofysMounter) Unstage(stageTarget string) error {
return nil
}
func (goofys *goofysMounter) Mount(source string, target string) error {
goofysCfg := &goofysApi.Config{
MountPoint: target,
Endpoint: goofys.endpoint,
Region: goofys.region,
DirMode: 0755,
FileMode: 0644,
MountOptions: map[string]string{
"allow_other": "",
},
}
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
fullPath := fmt.Sprintf("%s:%s", goofys.bucket.Name, goofys.bucket.FSPath)
_, _, err := goofysApi.Mount(context.Background(), fullPath, goofysCfg)
if err != nil {
return fmt.Errorf("Error mounting via goofys: %s", err)
}
return nil
}

View File

@@ -0,0 +1,56 @@
package s3
import (
"fmt"
"os"
)
// Implements Mounter
type rcloneMounter struct {
bucket *bucket
url string
region string
accessKeyID string
secretAccessKey string
}
const (
rcloneCmd = "rclone"
)
func newRcloneMounter(b *bucket, cfg *Config) (Mounter, error) {
return &rcloneMounter{
bucket: b,
url: cfg.Endpoint,
region: cfg.Region,
accessKeyID: cfg.AccessKeyID,
secretAccessKey: cfg.SecretAccessKey,
}, nil
}
func (rclone *rcloneMounter) Stage(stageTarget string) error {
return nil
}
func (rclone *rcloneMounter) Unstage(stageTarget string) error {
return nil
}
func (rclone *rcloneMounter) Mount(source string, target string) error {
args := []string{
"mount",
fmt.Sprintf(":s3:%s/%s", rclone.bucket.Name, rclone.bucket.FSPath),
fmt.Sprintf("%s", target),
"--daemon",
"--s3-provider=AWS",
"--s3-env-auth=true",
fmt.Sprintf("--s3-region=%s", rclone.region),
fmt.Sprintf("--s3-endpoint=%s", rclone.url),
"--allow-other",
// TODO: make this configurable
"--vfs-cache-mode=writes",
}
os.Setenv("AWS_ACCESS_KEY_ID", rclone.accessKeyID)
os.Setenv("AWS_SECRET_ACCESS_KEY", rclone.secretAccessKey)
return fuseMount(target, rcloneCmd, args)
}

View File

@@ -0,0 +1,154 @@
package s3
import (
"fmt"
"net/url"
"os"
"os/exec"
"path"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount"
)
// Implements Mounter
type s3backerMounter struct {
bucket *bucket
url string
region string
accessKeyID string
secretAccessKey string
ssl bool
}
const (
s3backerCmd = "s3backer"
s3backerFsType = "xfs"
s3backerDevice = "file"
// block size passed to s3backer (the "k" suffix means KiB)
s3backerBlockSize = "128k"
s3backerDefaultSize = 1024 * 1024 * 1024 // 1GiB
// S3backerLoopDevice the loop device required by s3backer
S3backerLoopDevice = "/dev/loop0"
)
func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) {
url, err := url.Parse(cfg.Endpoint)
if err != nil {
return nil, err
}
url.Path = path.Join(url.Path, bucket.Name, bucket.FSPath)
// s3backer cannot work with 0 size volumes
if bucket.CapacityBytes == 0 {
bucket.CapacityBytes = s3backerDefaultSize
}
s3backer := &s3backerMounter{
bucket: bucket,
url: cfg.Endpoint,
region: cfg.Region,
accessKeyID: cfg.AccessKeyID,
secretAccessKey: cfg.SecretAccessKey,
ssl: url.Scheme == "https",
}
return s3backer, s3backer.writePasswd()
}
func (s3backer *s3backerMounter) String() string {
return s3backer.bucket.Name
}
func (s3backer *s3backerMounter) Stage(stageTarget string) error {
// s3backer uses the loop device
if err := createLoopDevice(S3backerLoopDevice); err != nil {
return err
}
// s3backer requires two mounts
// first mount will fuse mount the bucket to a single 'file'
if err := s3backer.mountInit(stageTarget); err != nil {
return err
}
// ensure 'file' device is formatted
err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice))
if err != nil {
fuseUnmount(stageTarget)
}
return err
}
func (s3backer *s3backerMounter) Unstage(stageTarget string) error {
// Unmount the s3backer fuse mount
return fuseUnmount(stageTarget)
}
func (s3backer *s3backerMounter) Mount(source string, target string) error {
device := path.Join(source, s3backerDevice)
// second mount will mount the 'file' as a filesystem
err := mount.New("").Mount(device, target, s3backerFsType, []string{})
if err != nil {
// cleanup fuse mount
fuseUnmount(target)
return err
}
return nil
}
func (s3backer *s3backerMounter) mountInit(path string) error {
args := []string{
fmt.Sprintf("--blockSize=%s", s3backerBlockSize),
fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes),
fmt.Sprintf("--prefix=%s/", s3backer.bucket.FSPath),
"--listBlocks",
s3backer.bucket.Name,
path,
}
if s3backer.region != "" {
args = append(args, fmt.Sprintf("--region=%s", s3backer.region))
} else {
// only set baseURL if not on AWS (region is not set)
// baseURL must end with /
args = append(args, fmt.Sprintf("--baseURL=%s/", s3backer.url))
}
if s3backer.ssl {
args = append(args, "--ssl")
}
return fuseMount(path, s3backerCmd, args)
}
func (s3backer *s3backerMounter) writePasswd() error {
pwFileName := fmt.Sprintf("%s/.s3backer_passwd", os.Getenv("HOME"))
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
// close the credentials file on every return path
defer pwFile.Close()
_, err = pwFile.WriteString(s3backer.accessKeyID + ":" + s3backer.secretAccessKey)
return err
}
func formatFs(fsType string, device string) error {
diskMounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()}
format, err := diskMounter.GetDiskFormat(device)
if err != nil {
return err
}
if format != "" {
glog.Infof("Disk %s is already formatted with format %s", device, format)
return nil
}
args := []string{
device,
}
cmd := exec.Command("mkfs."+fsType, args...)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error formatting disk: %s", out)
}
glog.Infof("Formatting fs with type %s", fsType)
return nil
}
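
s3backer is the only two-phase mounter here: Stage fuse-mounts the bucket as a single backing file under the staging path and formats it with xfs (which is why the loop device must exist), and Mount then mounts that file as a filesystem at the publish target. A minimal sketch of that sequence as the node server drives it (hypothetical helper, not part of this commit):

package s3

// exampleS3backerFlow shows the order of operations for the s3backer mounter:
// Stage prepares the fuse-backed block file, Mount exposes it as a filesystem.
func exampleS3backerFlow(m Mounter, stagingPath, targetPath string) error {
	if err := m.Stage(stagingPath); err != nil {
		return err
	}
	return m.Mount(stagingPath, targetPath)
}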

View File

@@ -0,0 +1,65 @@
package s3
import (
"fmt"
"os"
)
// Implements Mounter
type s3fsMounter struct {
bucket *bucket
url string
region string
pwFileContent string
}
const (
s3fsCmd = "s3fs"
)
func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) {
return &s3fsMounter{
bucket: b,
url: cfg.Endpoint,
region: cfg.Region,
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
}, nil
}
func (s3fs *s3fsMounter) Stage(stageTarget string) error {
return nil
}
func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
return nil
}
func (s3fs *s3fsMounter) Mount(source string, target string) error {
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
return err
}
args := []string{
fmt.Sprintf("%s:/%s", s3fs.bucket.Name, s3fs.bucket.FSPath),
fmt.Sprintf("%s", target),
"-o", "use_path_request_style",
"-o", fmt.Sprintf("url=%s", s3fs.url),
"-o", fmt.Sprintf("endpoint=%s", s3fs.region),
"-o", "allow_other",
"-o", "mp_umask=000",
}
return fuseMount(target, s3fsCmd, args)
}
func writes3fsPass(pwFileContent string) error {
pwFileName := fmt.Sprintf("%s/.passwd-s3fs", os.Getenv("HOME"))
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
// close the password file on every return path
defer pwFile.Close()
_, err = pwFile.WriteString(pwFileContent)
return err
}

View File

@@ -0,0 +1,214 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3
import (
"fmt"
"os"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/mount"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type nodeServer struct {
*csicommon.DefaultNodeServer
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
// Check arguments
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(stagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request")
}
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
notMnt, err := checkMount(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMnt {
return &csi.NodePublishVolumeResponse{}, nil
}
deviceID := ""
if req.GetPublishContext() != nil {
deviceID = req.GetPublishContext()[deviceID]
}
// TODO: Implement readOnly & mountFlags
readOnly := req.GetReadonly()
// TODO: check if attrib is correct with context.
attrib := req.GetVolumeContext()
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := s3.getBucket(volumeID)
if err != nil {
return nil, err
}
mounter, err := newMounter(b, s3.cfg)
if err != nil {
return nil, err
}
if err := mounter.Mount(stagingTargetPath, targetPath); err != nil {
return nil, err
}
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
// Check arguments
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
if err := fuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
// Check arguments
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(stagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
}
notMnt, err := checkMount(stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMnt {
return &csi.NodeStageVolumeResponse{}, nil
}
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := s3.getBucket(volumeID)
if err != nil {
return nil, err
}
mounter, err := newMounter(b, s3.cfg)
if err != nil {
return nil, err
}
if err := mounter.Stage(stagingTargetPath); err != nil {
return nil, err
}
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
// Check arguments
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(stagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
// currently there is a single NodeServer capability according to the spec
nscap := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
}
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
nscap,
},
}, nil
}
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}
func checkMount(targetPath string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(targetPath, 0750); err != nil {
return false, err
}
notMnt = true
} else {
return false, err
}
}
return notMnt, nil
}
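
Taken together, the node server implements the usual CSI node call sequence: NodeStageVolume once per node, NodePublishVolume once per pod consuming the volume, and the reverse on teardown. A minimal sketch of that sequence against the csi.NodeServer interface (hypothetical helper, not part of this commit):

package s3

import (
	"golang.org/x/net/context"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

// exampleNodeVolumeLifecycle drives a single volume through stage, publish,
// unpublish and unstage, mirroring the order kubelet uses.
func exampleNodeVolumeLifecycle(ctx context.Context, ns csi.NodeServer, volID, staging, target string, capability *csi.VolumeCapability, secrets map[string]string) error {
	if _, err := ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
		VolumeId:          volID,
		StagingTargetPath: staging,
		VolumeCapability:  capability,
		Secrets:           secrets,
	}); err != nil {
		return err
	}
	if _, err := ns.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
		VolumeId:          volID,
		StagingTargetPath: staging,
		TargetPath:        target,
		VolumeCapability:  capability,
		Secrets:           secrets,
	}); err != nil {
		return err
	}
	if _, err := ns.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
		VolumeId:   volID,
		TargetPath: target,
	}); err != nil {
		return err
	}
	_, err := ns.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
		VolumeId:          volID,
		StagingTargetPath: staging,
	})
	return err
}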

View File

@@ -0,0 +1,153 @@
package s3
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/url"
"github.com/golang/glog"
"github.com/minio/minio-go"
)
const (
metadataName = ".metadata.json"
fsPrefix = "csi-fs"
)
type s3Client struct {
cfg *Config
minio *minio.Client
}
type bucket struct {
Name string
Mounter string
FSPath string
CapacityBytes int64
}
func newS3Client(cfg *Config) (*s3Client, error) {
var client = &s3Client{}
client.cfg = cfg
u, err := url.Parse(client.cfg.Endpoint)
if err != nil {
return nil, err
}
ssl := u.Scheme == "https"
endpoint := u.Hostname()
if u.Port() != "" {
endpoint = u.Hostname() + ":" + u.Port()
}
minioClient, err := minio.NewWithRegion(endpoint, client.cfg.AccessKeyID, client.cfg.SecretAccessKey, ssl, client.cfg.Region)
if err != nil {
return nil, err
}
client.minio = minioClient
return client, nil
}
func newS3ClientFromSecrets(secrets map[string]string) (*s3Client, error) {
return newS3Client(&Config{
AccessKeyID: secrets["accessKeyID"],
SecretAccessKey: secrets["secretAccessKey"],
Region: secrets["region"],
Endpoint: secrets["endpoint"],
// Mounter is set in the volume preferences, not secrets
Mounter: "",
})
}
func (client *s3Client) bucketExists(bucketName string) (bool, error) {
return client.minio.BucketExists(bucketName)
}
func (client *s3Client) createBucket(bucketName string) error {
return client.minio.MakeBucket(bucketName, client.cfg.Region)
}
func (client *s3Client) createPrefix(bucketName string, prefix string) error {
_, err := client.minio.PutObject(bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{})
if err != nil {
return err
}
return nil
}
func (client *s3Client) removeBucket(bucketName string) error {
if err := client.emptyBucket(bucketName); err != nil {
return err
}
return client.minio.RemoveBucket(bucketName)
}
func (client *s3Client) emptyBucket(bucketName string) error {
objectsCh := make(chan string)
var listErr error
go func() {
defer close(objectsCh)
doneCh := make(chan struct{})
defer close(doneCh)
for object := range client.minio.ListObjects(bucketName, "", true, doneCh) {
if object.Err != nil {
listErr = object.Err
return
}
objectsCh <- object.Key
}
}()
removeFailed := false
for e := range client.minio.RemoveObjects(bucketName, objectsCh) {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
removeFailed = true
}
if removeFailed {
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
}
// RemoveObjects only closes its error channel after the listing goroutine
// has finished, so listErr can safely be checked here
if listErr != nil {
glog.Error("Error listing objects ", listErr)
return listErr
}
// ensure our prefix is also removed
return client.minio.RemoveObject(bucketName, fsPrefix)
}
func (client *s3Client) setBucket(bucket *bucket) error {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(bucket)
opts := minio.PutObjectOptions{ContentType: "application/json"}
_, err := client.minio.PutObject(bucket.Name, metadataName, b, int64(b.Len()), opts)
return err
}
func (client *s3Client) getBucket(bucketName string) (*bucket, error) {
opts := minio.GetObjectOptions{}
obj, err := client.minio.GetObject(bucketName, metadataName, opts)
if err != nil {
return &bucket{}, err
}
objInfo, err := obj.Stat()
if err != nil {
return &bucket{}, err
}
b := make([]byte, objInfo.Size)
_, err = obj.Read(b)
if err != nil && err != io.EOF {
return &bucket{}, err
}
var meta bucket
err = json.Unmarshal(b, &meta)
return &meta, err
}
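
Volume metadata lives inside the bucket itself: setBucket serializes the bucket struct into the .metadata.json object at the bucket root and getBucket reads it back on every node operation. A small sketch of what that object contains (hypothetical helper, not part of this commit):

package s3

import (
	"encoding/json"
	"fmt"
)

// exampleBucketMetadata prints the JSON document that setBucket stores as
// .metadata.json and that getBucket parses back into a bucket struct.
func exampleBucketMetadata() {
	b := &bucket{
		Name:          "pvc-1234",
		Mounter:       "goofys",
		FSPath:        fsPrefix, // "csi-fs"
		CapacityBytes: 1 << 30,
	}
	out, _ := json.Marshal(b)
	fmt.Println(string(out))
	// {"Name":"pvc-1234","Mounter":"goofys","FSPath":"csi-fs","CapacityBytes":1073741824}
}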

View File

@@ -0,0 +1,95 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type s3 struct {
driver *csicommon.CSIDriver
endpoint string
ids *identityServer
ns *nodeServer
cs *controllerServer
}
type s3Volume struct {
VolName string `json:"volName"`
VolID string `json:"volID"`
VolSize int64 `json:"volSize"`
VolPath string `json:"volPath"`
}
var (
vendorVersion = "v1.1.1"
driverName = "ch.ctrox.csi.s3-driver"
)
// NewS3 initializes the driver
func NewS3(nodeID string, endpoint string) (*s3, error) {
driver := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID)
if driver == nil {
glog.Fatalln("Failed to initialize CSI Driver.")
}
s3Driver := &s3{
endpoint: endpoint,
driver: driver,
}
return s3Driver, nil
}
func (s3 *s3) newIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
func (s3 *s3) newControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
}
}
func (s3 *s3) newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
}
}
func (s3 *s3) Run() {
glog.Infof("Driver: %v ", driverName)
glog.Infof("Version: %v ", vendorVersion)
// Initialize default library driver
s3.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
s3.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER})
// Create GRPC servers
s3.ids = s3.newIdentityServer(s3.driver)
s3.ns = s3.newNodeServer(s3.driver)
s3.cs = s3.newControllerServer(s3.driver)
s := csicommon.NewNonBlockingGRPCServer()
s.Start(s3.endpoint, s3.ids, s3.cs, s3.ns)
s.Wait()
}
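
Run registers the capabilities, instantiates the three gRPC servers and blocks on the shared non-blocking server. The command-line entrypoint itself is not part of this excerpt; a minimal sketch of what it could look like follows (the flag names and default socket path are assumptions, only s3.NewS3 and Run come from the code above):

package main

import (
	"flag"
	"log"

	"github.com/ctrox/csi-s3/pkg/s3"
)

func main() {
	// hypothetical flags; the real cmd package may name or default these differently
	endpoint := flag.String("endpoint", "unix:///csi/csi.sock", "CSI endpoint to listen on")
	nodeID := flag.String("nodeid", "", "node id")
	flag.Parse()

	driver, err := s3.NewS3(*nodeID, *endpoint)
	if err != nil {
		log.Fatal(err)
	}
	driver.Run()
}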

View File

@@ -0,0 +1,123 @@
package s3_test
import (
"log"
"os"
"github.com/ctrox/csi-s3/pkg/s3"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/kubernetes-csi/csi-test/pkg/sanity"
)
var _ = Describe("S3Driver", func() {
Context("goofys", func() {
socket := "/tmp/csi-goofys.sock"
csiEndpoint := "unix://" + socket
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
go driver.Run()
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: os.TempDir() + "/goofys-target",
StagingPath: os.TempDir() + "/goofys-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "goofys",
},
}
sanity.GinkgoTest(sanityCfg)
})
})
Context("s3fs", func() {
socket := "/tmp/csi-s3fs.sock"
csiEndpoint := "unix://" + socket
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
go driver.Run()
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: os.TempDir() + "/s3fs-target",
StagingPath: os.TempDir() + "/s3fs-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3fs",
},
}
sanity.GinkgoTest(sanityCfg)
})
})
Context("s3backer", func() {
socket := "/tmp/csi-s3backer.sock"
csiEndpoint := "unix://" + socket
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
// Clear loop device so we cover the creation of it
os.Remove(s3.S3backerLoopDevice)
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
go driver.Run()
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: os.TempDir() + "/s3backer-target",
StagingPath: os.TempDir() + "/s3backer-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3backer",
},
}
sanity.GinkgoTest(sanityCfg)
})
})
Context("rclone", func() {
socket := "/tmp/csi-rclone.sock"
csiEndpoint := "unix://" + socket
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
go driver.Run()
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: os.TempDir() + "/rclone-target",
StagingPath: os.TempDir() + "/rclone-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "rclone",
},
}
sanity.GinkgoTest(sanityCfg)
})
})
})

View File

@@ -0,0 +1,29 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestS3Driver(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "S3Driver")
}

View File

@@ -0,0 +1,106 @@
package s3
import (
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"strings"
"syscall"
"time"
"github.com/mitchellh/go-ps"
"k8s.io/kubernetes/pkg/util/mount"
"github.com/golang/glog"
)
func waitForProcess(p *os.Process, backoff int) error {
if backoff == 20 {
return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid)
}
cmdLine, err := getCmdLine(p.Pid)
if err != nil {
glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err)
return nil
}
if cmdLine == "" {
// ignore defunct processes
// TODO: debug why this happens in the first place
// seems to only happen on k8s, not on local docker
glog.Warning("Fuse process seems dead, returning")
return nil
}
if err := p.Signal(syscall.Signal(0)); err != nil {
glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err)
return nil
}
glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
time.Sleep(time.Duration(backoff*100) * time.Millisecond)
return waitForProcess(p, backoff+1)
}
func waitForMount(path string, timeout time.Duration) error {
var elapsed time.Duration
var interval = 10 * time.Millisecond
for {
notMount, err := mount.New("").IsNotMountPoint(path)
if err != nil {
return err
}
if !notMount {
return nil
}
time.Sleep(interval)
elapsed = elapsed + interval
if elapsed >= timeout {
return errors.New("Timeout waiting for mount")
}
}
}
func findFuseMountProcess(path string) (*os.Process, error) {
processes, err := ps.Processes()
if err != nil {
return nil, err
}
for _, p := range processes {
cmdLine, err := getCmdLine(p.Pid())
if err != nil {
glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
continue
}
if strings.Contains(cmdLine, path) {
glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
return os.FindProcess(p.Pid())
}
}
return nil, nil
}
func getCmdLine(pid int) (string, error) {
cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid)
cmdLine, err := ioutil.ReadFile(cmdLineFile)
if err != nil {
return "", err
}
return string(cmdLine), nil
}
func createLoopDevice(device string) error {
if _, err := os.Stat(device); !os.IsNotExist(err) {
return nil
}
args := []string{
device,
"b", "7", "0",
}
cmd := exec.Command("mknod", args...)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error configuring loop device: %s", out)
}
return nil
}
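
createLoopDevice is the programmatic equivalent of mknod /dev/loop0 b 7 0: it creates a block special file with major number 7 (the Linux loop driver) and minor 0, and skips the mknod when the node already exists. A one-line sketch of how the s3backer staging path relies on it (hypothetical wrapper, not part of this commit):

package s3

// ensureS3backerLoopDevice is a hypothetical wrapper; s3backerMounter.Stage
// calls createLoopDevice directly with the same constant.
func ensureS3backerLoopDevice() error {
	return createLoopDevice(S3backerLoopDevice)
}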