Loading go/file_transfer/src/common/settings.go +6 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,12 @@ import ( ) type serverSettings struct { CatCommand string TestCommand string Auth struct { JwksUrl string UserList []string } Http struct { Enabled bool Host string Loading go/file_transfer/src/go.mod +2 −1 Original line number Diff line number Diff line Loading @@ -3,11 +3,12 @@ module remote_data_broker go 1.16 require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/rabbitmq/amqp091-go v1.8.1 github.com/sirupsen/logrus v1.8.0 github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.8.3 github.com/stretchr/testify v1.8.4 golang.org/x/sys v0.9.0 // indirect ) go/file_transfer/src/go.sum +4 −1 Original line number Diff line number Diff line Loading @@ -656,6 +656,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= Loading Loading @@ -1004,8 +1006,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= Loading go/file_transfer/src/http_server/download.go +12 −132 Original line number Diff line number Diff line package http_server import ( "encoding/json" "errors" "fmt" "github.com/google/uuid" "github.com/gorilla/mux" "io" "net/http" "os" "path" "remote_data_broker/common" log "remote_data_broker/logger" "remote_data_broker/utils" "strconv" "time" ) func checkFtsApiVersion(w http.ResponseWriter, r *http.Request) (utils.VersionNum, bool) { return utils.PrecheckApiVersion(w, r, common.GetRdbApiVersion()) } type fileDownloadRequest struct { FileName string Token string Loading @@ -31,87 +25,6 @@ func checkRequest(r *http.Request, ver utils.VersionNum) (fileDownloadRequest, i return request, http.StatusOK, nil } func serveFile(w http.ResponseWriter, r *http.Request, fullName string) { _, file := path.Split(fullName) w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") log.WithFields(map[string]interface{}{ "name": fullName, }).Debug("transferring file") http.ServeFile(w, r, fullName) } func serveFileSize(w http.ResponseWriter, r *http.Request, fullName string) { var fsize struct { FileSize int64 `json:"file_size"` } fi, err := os.Stat(fullName) if err != nil { utils.WriteServerError(w, err, http.StatusBadRequest) log.Error("error getting file size for " + fullName + ": " + err.Error()) } log.WithFields(map[string]interface{}{ "name": fullName, "size": fi.Size(), }).Debug("sending file size") fsize.FileSize = fi.Size() b, _ := json.Marshal(&fsize) w.Write(b) } func checkFtsApiVersion(w http.ResponseWriter, r *http.Request) (utils.VersionNum, bool) { return utils.PrecheckApiVersion(w, r, common.GetRdbApiVersion()) } func routeFileUpload(w http.ResponseWriter, r *http.Request) { _, ok := checkFtsApiVersion(w, r) if !ok { utils.WriteServerError(w, errors.New("wrong API version"), http.StatusBadRequest) return } vars := mux.Vars(r) id, ok := vars["id"] if !ok { utils.WriteServerError(w, errors.New("no channel id in request"), http.StatusBadRequest) return } log.Debug("Receiving file from RMQ client: ", id) chunkSize := common.Settings.Http.ChunkSize ch := channels[id] if !ok || ch == nil { utils.WriteServerError(w, errors.New("no channel to redirect request found"), http.StatusBadRequest) return } buffer := make([]byte, chunkSize) for { bytesRead, err := r.Body.Read(buffer) if bytesRead > 0 { select { case ch <- buffer[:bytesRead]: default: log.Error("no message sent") return } } if err != nil { if err != io.EOF { fmt.Println(err) } break } } close(ch) } var channels = make(map[string]chan []byte, 0) func routeFileDownload(w http.ResponseWriter, r *http.Request) { ver, ok := checkFtsApiVersion(w, r) if !ok { Loading @@ -126,46 +39,13 @@ func routeFileDownload(w http.ResponseWriter, r *http.Request) { log.Debug("File download request: ", downloadRequest.FileName) channel := make(chan []byte) id := uuid.New().String() channels[id] = channel msg := common.FileMessage{ Endpoint: common.Settings.Http.Host + ":" + strconv.Itoa(common.Settings.Http.Port) + "/" + common.GetRdbApiVersion() + "/upload/" + id, Filename: downloadRequest.FileName, Token: downloadRequest.Token, remoteQueue := r.URL.Query().Get("remotequeue") asUser := r.URL.Query().Get("asuser") == "true" if remoteQueue != "" { downloadFromRemote(w, downloadRequest) } else if asUser { downloadFromLocalAsUser(w, downloadRequest) } else { downloadFromLocal(w, r, downloadRequest) } msgb, _ := json.Marshal(&msg) err = rmqClient.Publish(msgb) if err != nil { utils.WriteServerError(w, nil, http.StatusInternalServerError) log.Fatal("Cannot publish file request to RMQ, will exit now: ", err) return } timeout := time.Second * time.Duration(common.Settings.Http.Timeout) timer := time.NewTimer(timeout) for { select { case data := <-channel: if data == nil { return } w.Write(data) timer.Reset(timeout) case <-timer.C: fmt.Println("Timeout occurred") utils.WriteServerError(w, errors.New("request timeout"), http.StatusInternalServerError) return } } // sizeonly := r.URL.Query().Get("sizeonly") // if sizeonly != "true" { // serveFile(w, r, fullName) // } else { // serveFileSize(w, r, fullName) // } } go/file_transfer/src/http_server/download_local.go 0 → 100644 +109 −0 Original line number Diff line number Diff line package http_server import ( "bytes" "io" "net/http" "os/exec" "path" "remote_data_broker/common" log "remote_data_broker/logger" "remote_data_broker/utils" "strings" ) func downloadFromLocal(w http.ResponseWriter, r *http.Request, downloadRequest fileDownloadRequest) { _, err := auth.CheckAndGetContent(downloadRequest.Token) if err != nil { log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, "error": err, }).Error("authorization failed") http.Error(w, "Authorization failed", http.StatusUnauthorized) return } _, file := path.Split(downloadRequest.FileName) w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, }).Debug("transferring file") http.ServeFile(w, r, downloadRequest.FileName) } func getCommand(cmd string, downloadRequest fileDownloadRequest) (string, []string) { token := downloadRequest.Token cmd = strings.Replace(cmd, "$token", token, -1) cmd = strings.Replace(cmd, "$filename", downloadRequest.FileName, -1) words := utils.FieldsQuoted(cmd) if len(words) > 1 { return words[0], words[1:] } else { return words[0], []string{} } } func executeCheckFileCommand(downloadRequest fileDownloadRequest) ([]byte, error) { name, args := getCommand(common.Settings.TestCommand, downloadRequest) cmd := exec.Command(name, args...) return cmd.CombinedOutput() } func checkFile(w http.ResponseWriter, downloadRequest fileDownloadRequest) bool { out, err := executeCheckFileCommand(downloadRequest) if err != nil { status := http.StatusNotFound if bytes.Contains(out, []byte("token")) { status = http.StatusUnauthorized } http.Error(w, "command execution failed", status) log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, "error": err, "output": string(out), }).Error("test file execution failed") return false } return true } func downloadFromLocalAsUser(w http.ResponseWriter, downloadRequest fileDownloadRequest) { if !checkFile(w, downloadRequest) { return } name, args := getCommand(common.Settings.CatCommand, downloadRequest) cmd := exec.Command(name, args...) stdout, err := cmd.StdoutPipe() if err != nil { http.Error(w, "Failed to create stdout pipe", http.StatusInternalServerError) return } var errbuf strings.Builder cmd.Stderr = &errbuf err = cmd.Start() if err != nil { http.Error(w, "Failed to start command execution", http.StatusInternalServerError) return } io.Copy(w, stdout) err = cmd.Wait() if err != nil { log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, "error": err, "stderr": errbuf.String(), }).Error("cat command execution failed") panic("abort downloadFromLocalAsUser") } _, file := path.Split(downloadRequest.FileName) w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, }).Debug("file sent successfully") } Loading
go/file_transfer/src/common/settings.go +6 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,12 @@ import ( ) type serverSettings struct { CatCommand string TestCommand string Auth struct { JwksUrl string UserList []string } Http struct { Enabled bool Host string Loading
go/file_transfer/src/go.mod +2 −1 Original line number Diff line number Diff line Loading @@ -3,11 +3,12 @@ module remote_data_broker go 1.16 require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/rabbitmq/amqp091-go v1.8.1 github.com/sirupsen/logrus v1.8.0 github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.8.3 github.com/stretchr/testify v1.8.4 golang.org/x/sys v0.9.0 // indirect )
go/file_transfer/src/go.sum +4 −1 Original line number Diff line number Diff line Loading @@ -656,6 +656,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= Loading Loading @@ -1004,8 +1006,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= Loading
go/file_transfer/src/http_server/download.go +12 −132 Original line number Diff line number Diff line package http_server import ( "encoding/json" "errors" "fmt" "github.com/google/uuid" "github.com/gorilla/mux" "io" "net/http" "os" "path" "remote_data_broker/common" log "remote_data_broker/logger" "remote_data_broker/utils" "strconv" "time" ) func checkFtsApiVersion(w http.ResponseWriter, r *http.Request) (utils.VersionNum, bool) { return utils.PrecheckApiVersion(w, r, common.GetRdbApiVersion()) } type fileDownloadRequest struct { FileName string Token string Loading @@ -31,87 +25,6 @@ func checkRequest(r *http.Request, ver utils.VersionNum) (fileDownloadRequest, i return request, http.StatusOK, nil } func serveFile(w http.ResponseWriter, r *http.Request, fullName string) { _, file := path.Split(fullName) w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") log.WithFields(map[string]interface{}{ "name": fullName, }).Debug("transferring file") http.ServeFile(w, r, fullName) } func serveFileSize(w http.ResponseWriter, r *http.Request, fullName string) { var fsize struct { FileSize int64 `json:"file_size"` } fi, err := os.Stat(fullName) if err != nil { utils.WriteServerError(w, err, http.StatusBadRequest) log.Error("error getting file size for " + fullName + ": " + err.Error()) } log.WithFields(map[string]interface{}{ "name": fullName, "size": fi.Size(), }).Debug("sending file size") fsize.FileSize = fi.Size() b, _ := json.Marshal(&fsize) w.Write(b) } func checkFtsApiVersion(w http.ResponseWriter, r *http.Request) (utils.VersionNum, bool) { return utils.PrecheckApiVersion(w, r, common.GetRdbApiVersion()) } func routeFileUpload(w http.ResponseWriter, r *http.Request) { _, ok := checkFtsApiVersion(w, r) if !ok { utils.WriteServerError(w, errors.New("wrong API version"), http.StatusBadRequest) return } vars := mux.Vars(r) id, ok := vars["id"] if !ok { utils.WriteServerError(w, errors.New("no channel id in request"), http.StatusBadRequest) return } log.Debug("Receiving file from RMQ client: ", id) chunkSize := common.Settings.Http.ChunkSize ch := channels[id] if !ok || ch == nil { utils.WriteServerError(w, errors.New("no channel to redirect request found"), http.StatusBadRequest) return } buffer := make([]byte, chunkSize) for { bytesRead, err := r.Body.Read(buffer) if bytesRead > 0 { select { case ch <- buffer[:bytesRead]: default: log.Error("no message sent") return } } if err != nil { if err != io.EOF { fmt.Println(err) } break } } close(ch) } var channels = make(map[string]chan []byte, 0) func routeFileDownload(w http.ResponseWriter, r *http.Request) { ver, ok := checkFtsApiVersion(w, r) if !ok { Loading @@ -126,46 +39,13 @@ func routeFileDownload(w http.ResponseWriter, r *http.Request) { log.Debug("File download request: ", downloadRequest.FileName) channel := make(chan []byte) id := uuid.New().String() channels[id] = channel msg := common.FileMessage{ Endpoint: common.Settings.Http.Host + ":" + strconv.Itoa(common.Settings.Http.Port) + "/" + common.GetRdbApiVersion() + "/upload/" + id, Filename: downloadRequest.FileName, Token: downloadRequest.Token, remoteQueue := r.URL.Query().Get("remotequeue") asUser := r.URL.Query().Get("asuser") == "true" if remoteQueue != "" { downloadFromRemote(w, downloadRequest) } else if asUser { downloadFromLocalAsUser(w, downloadRequest) } else { downloadFromLocal(w, r, downloadRequest) } msgb, _ := json.Marshal(&msg) err = rmqClient.Publish(msgb) if err != nil { utils.WriteServerError(w, nil, http.StatusInternalServerError) log.Fatal("Cannot publish file request to RMQ, will exit now: ", err) return } timeout := time.Second * time.Duration(common.Settings.Http.Timeout) timer := time.NewTimer(timeout) for { select { case data := <-channel: if data == nil { return } w.Write(data) timer.Reset(timeout) case <-timer.C: fmt.Println("Timeout occurred") utils.WriteServerError(w, errors.New("request timeout"), http.StatusInternalServerError) return } } // sizeonly := r.URL.Query().Get("sizeonly") // if sizeonly != "true" { // serveFile(w, r, fullName) // } else { // serveFileSize(w, r, fullName) // } }
go/file_transfer/src/http_server/download_local.go 0 → 100644 +109 −0 Original line number Diff line number Diff line package http_server import ( "bytes" "io" "net/http" "os/exec" "path" "remote_data_broker/common" log "remote_data_broker/logger" "remote_data_broker/utils" "strings" ) func downloadFromLocal(w http.ResponseWriter, r *http.Request, downloadRequest fileDownloadRequest) { _, err := auth.CheckAndGetContent(downloadRequest.Token) if err != nil { log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, "error": err, }).Error("authorization failed") http.Error(w, "Authorization failed", http.StatusUnauthorized) return } _, file := path.Split(downloadRequest.FileName) w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, }).Debug("transferring file") http.ServeFile(w, r, downloadRequest.FileName) } func getCommand(cmd string, downloadRequest fileDownloadRequest) (string, []string) { token := downloadRequest.Token cmd = strings.Replace(cmd, "$token", token, -1) cmd = strings.Replace(cmd, "$filename", downloadRequest.FileName, -1) words := utils.FieldsQuoted(cmd) if len(words) > 1 { return words[0], words[1:] } else { return words[0], []string{} } } func executeCheckFileCommand(downloadRequest fileDownloadRequest) ([]byte, error) { name, args := getCommand(common.Settings.TestCommand, downloadRequest) cmd := exec.Command(name, args...) return cmd.CombinedOutput() } func checkFile(w http.ResponseWriter, downloadRequest fileDownloadRequest) bool { out, err := executeCheckFileCommand(downloadRequest) if err != nil { status := http.StatusNotFound if bytes.Contains(out, []byte("token")) { status = http.StatusUnauthorized } http.Error(w, "command execution failed", status) log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, "error": err, "output": string(out), }).Error("test file execution failed") return false } return true } func downloadFromLocalAsUser(w http.ResponseWriter, downloadRequest fileDownloadRequest) { if !checkFile(w, downloadRequest) { return } name, args := getCommand(common.Settings.CatCommand, downloadRequest) cmd := exec.Command(name, args...) stdout, err := cmd.StdoutPipe() if err != nil { http.Error(w, "Failed to create stdout pipe", http.StatusInternalServerError) return } var errbuf strings.Builder cmd.Stderr = &errbuf err = cmd.Start() if err != nil { http.Error(w, "Failed to start command execution", http.StatusInternalServerError) return } io.Copy(w, stdout) err = cmd.Wait() if err != nil { log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, "error": err, "stderr": errbuf.String(), }).Error("cat command execution failed") panic("abort downloadFromLocalAsUser") } _, file := path.Split(downloadRequest.FileName) w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") log.WithFields(map[string]interface{}{ "name": downloadRequest.FileName, }).Debug("file sent successfully") }