diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 74cf8033076608de4ccc8759b0de58337b57cbfb..13b4b1149e8837cb2d6c4561afa6b26a48f2728a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -6,7 +6,7 @@ stages: variables: GIT_STRATEGY: clone CONTAINER_RDM_URL: "${CI_REGISTRY_IMAGE}/remote-data-broker" - TAG: 0.2.2 + TAG: 0.3.0 # This import is for the func_rse_docker_* functions before_script: diff --git a/CMakeLists.txt b/CMakeLists.txt index 32dbabf98a31fdabae356ff0dd67d2a26111fa3b..412a785b907c9638d518a78fcea3f35c98daa22b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,8 +22,8 @@ SET(CPACK_DEBIAN_PACKAGE_MAINTAINER "ORNL") SET(CPACK_RPM_PACKAGE_MAINTAINER "ORNL") set(CPACK_PACKAGE_VERSION_MAJOR "0") -set(CPACK_PACKAGE_VERSION_MINOR "2") -set(CPACK_PACKAGE_VERSION_PATCH "2") +set(CPACK_PACKAGE_VERSION_MINOR "3") +set(CPACK_PACKAGE_VERSION_PATCH "0") INCLUDE(CPack) diff --git a/src/common/settings.go b/src/common/settings.go index 009938cd3236d1699cb29f6d63b3f630032475b7..8879418abf0b62ca1710e7775ef377a9fcbdee44 100644 --- a/src/common/settings.go +++ b/src/common/settings.go @@ -7,9 +7,10 @@ import ( ) type serverSettings struct { - CatCommand string - TestCommand string - Auth struct { + CatCommand string + DeleteCommand string + TestCommand string + Auth struct { Enabled bool JwksUrl string UserList []string @@ -36,6 +37,7 @@ var Settings serverSettings func ReadConfig(fname string) (log.Level, error) { // need to set defaults for all nested values, othervise Viper does not read them from environment! viper.SetDefault("CatCommand", "cat $filename") + viper.SetDefault("DeleteCommand", "delete $filename") viper.SetDefault("TestCommand", "test -r $filename") viper.SetDefault("Http.Enabled", true) viper.SetDefault("Http.EndpointUrl", "http://localhost") diff --git a/src/common/structs.go b/src/common/structs.go index 70fb65f08c2408aa81370bea7cb80b113a967ece..b75881fb73f0df6d9b197cc56a72dad589f91935 100644 --- a/src/common/structs.go +++ b/src/common/structs.go @@ -1,7 +1,8 @@ package common type FileMessage struct { - Endpoint string - Filename string - Token string + Endpoint string + Filename string + Operation string + Token string } diff --git a/src/common/version.go b/src/common/version.go index 150b490473b9678ed64e5bbaf200e48bda49368c..0c45b65ce493fcfda7eab5bb719feddeff253e26 100644 --- a/src/common/version.go +++ b/src/common/version.go @@ -9,7 +9,7 @@ import ( var version string func init() { - version = "0.2.2" + version = "0.3.0" rdbApiVersion = "v0.1" } diff --git a/src/http_server/api_test.go b/src/http_server/api_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5725473ed68e1667f9a34ad6334f3b029738a87e --- /dev/null +++ b/src/http_server/api_test.go @@ -0,0 +1,29 @@ +package http_server + +import ( + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "remote_data_broker/utils" + "strings" + "testing" +) + +func makeRequest(request interface{}) string { + buf, _ := utils.MapToJson(request) + return string(buf) +} + +func doPostRequest(path string, buf string) *httptest.ResponseRecorder { + mux := utils.NewRouter(listRoutes) + req, _ := http.NewRequest("POST", path, strings.NewReader(buf)) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + return w +} + +func TestWrongApiVersion(t *testing.T) { + request := makeRequest(fileOpRequest{"folder", "fname"}) + w := doPostRequest("/v100000.2/download?sizeonly=true", request) + assert.Equal(t, http.StatusUnsupportedMediaType, w.Code, "wrong api version") +} diff --git a/src/http_server/common.go b/src/http_server/common.go new file mode 100644 index 0000000000000000000000000000000000000000..e93e5aab13fd21c3e028992a1604e9edd6bd977f --- /dev/null +++ b/src/http_server/common.go @@ -0,0 +1,85 @@ +package http_server + +import ( + "bytes" + "net/http" + "os/exec" + "remote_data_broker/common" + log "remote_data_broker/logger" + "remote_data_broker/utils" + "strings" +) + +var channels = make(map[string]chan []byte, 0) + +func checkFtsApiVersion(w http.ResponseWriter, r *http.Request) (utils.VersionNum, bool) { + return utils.PrecheckApiVersion(w, r, common.GetRdbApiVersion()) +} + +const ( + OpDownload = "download" + OpDelete = "delete" +) + +type fileOpRequest struct { + FileName string + Token string +} + +type FileOpResult struct { + Operation string + StatusCode int +} + +func checkRequest(r *http.Request, ver utils.VersionNum) (fileOpRequest, int, error) { + var request fileOpRequest + err := utils.ExtractRequest(r, &request) + if err != nil { + return fileOpRequest{}, http.StatusBadRequest, err + } + return request, http.StatusOK, nil +} + +type FileOperation interface { + LocalAsServiceUser(w http.ResponseWriter, r *http.Request, request fileOpRequest) + LocalAsUser(w http.ResponseWriter, downloadRequest fileOpRequest) + Remote(w http.ResponseWriter, downloadRequest fileOpRequest, remoteQueue string) + Operation() string +} + +func executeCommand(request fileOpRequest, command string) ([]byte, error) { + name, args := getCommand(command, request) + cmd := exec.Command(name, args...) + return cmd.CombinedOutput() +} + +func executeCommandAsRoot(w http.ResponseWriter, request fileOpRequest, command string) bool { + out, err := executeCommand(request, command) + if err != nil { + status := http.StatusNotFound + if bytes.Contains(out, []byte("token")) { + status = http.StatusUnauthorized + } + http.Error(w, "command execution failed", status) + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "command": command, + "error": err, + "output": string(out), + }).Error("execution failed") + return false + } + return true +} + +func getCommand(cmd string, request fileOpRequest) (string, []string) { + token := request.Token + cmd = strings.Replace(cmd, "$token", token, -1) + cmd = strings.Replace(cmd, "$filename", request.FileName, -1) + words := utils.FieldsQuoted(cmd) + if len(words) > 1 { + return words[0], words[1:] + } else { + return words[0], []string{} + } +} diff --git a/src/http_server/delete.go b/src/http_server/delete.go new file mode 100644 index 0000000000000000000000000000000000000000..f89b3efcec5a4ea05438af464449624efdd051e4 --- /dev/null +++ b/src/http_server/delete.go @@ -0,0 +1,86 @@ +package http_server + +import ( + "bytes" + "encoding/json" + "net/http" + "os" + "remote_data_broker/common" + log "remote_data_broker/logger" + "remote_data_broker/utils" +) + +func routeFileDelete(w http.ResponseWriter, r *http.Request) { + fileOp(w, r, OpDelete) +} + +type FileDelete struct { +} + +func (op FileDelete) Operation() string { + return OpDownload +} + +func (op FileDelete) LocalAsServiceUser(w http.ResponseWriter, r *http.Request, request fileOpRequest) { + if common.Settings.Auth.Enabled { + _, err := auth.CheckAndGetContent(request.Token) + if err != nil { + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "error": err, + }).Error("authorization failed") + http.Error(w, "Authorization failed", http.StatusUnauthorized) + return + } + } + + if _, err := os.Stat(request.FileName); err != nil { + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "error": err, + }).Error("file not found") + http.Error(w, "file not found", http.StatusNotFound) + return + } + + if err := os.Remove(request.FileName); err != nil { + http.Error(w, "", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "mode": "as rdb user", + }).Debug("File deleted") + +} + +func (op FileDelete) LocalAsUser(w http.ResponseWriter, request fileOpRequest) { + if !executeCommandAsRoot(w, request, common.Settings.DeleteCommand) { + return + } + + w.WriteHeader(http.StatusNoContent) + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "mode": "as rdb user", + }).Debug("File deleted") + +} + +func (op FileDelete) Remote(w http.ResponseWriter, request fileOpRequest, remoteQueue string) { + var b bytes.Buffer + err := remoteOperation(&b, request, remoteQueue, op.Operation()) + if err != nil { + utils.WriteServerError(w, err, http.StatusInternalServerError) + return + } + var opResult FileOpResult + err = json.Unmarshal(b.Bytes(), &opResult) + if err != nil || opResult.Operation != OpDelete { + utils.WriteServerError(w, err, http.StatusInternalServerError) + return + } + w.WriteHeader(opResult.StatusCode) +} diff --git a/src/http_server/delete_test.go b/src/http_server/delete_test.go new file mode 100644 index 0000000000000000000000000000000000000000..fcabad061ab6552f6b02c2d9645be41bd1c210fb --- /dev/null +++ b/src/http_server/delete_test.go @@ -0,0 +1,74 @@ +package http_server + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "remote_data_broker/common" + "remote_data_broker/utils" + "testing" + "time" +) + +var deleteFileTests = []struct { + fname string + token string + query string + status int + message string +}{ + {"exists", "token", "&asuser=true", http.StatusNoContent, "file delete as user, ok"}, + {"exists", "token", "", http.StatusNoContent, "file delete, ok"}, + {"exists", "wrong_token", "", http.StatusUnauthorized, "file delete, wrong token"}, + {"not_exists", "token", "", http.StatusNotFound, "file delete, not exists"}, + {"not_exists", "token", "&asuser=true", http.StatusNotFound, "file delete as user, not exist"}, +} + +func TestDeleteFile(t *testing.T) { + os.MkdirAll(filepath.Clean("folder"), os.ModePerm) + defer os.RemoveAll("folder") + auth = &utils.MockAuth{} + common.Settings.Auth.Enabled = true + common.Settings.DeleteCommand = "rm $filename" + common.Settings.TestCommand = "test -r $filename" + for _, test := range deleteFileTests { + ioutil.WriteFile(filepath.Clean("folder/exists"), []byte("hello"), 0644) + request := makeRequest(fileOpRequest{"folder/" + test.fname, test.token}) + w := doPostRequest("/v0.1/delete", request) + if test.status == http.StatusNoContent { + assert.NoFileExists(t, "folder/exists", test.message) + } + assert.Equal(t, test.status, w.Code, test.message) + } +} + +func TestDeleteRemote(t *testing.T) { + common.Settings.Http.ChunkSize = 10000 + common.Settings.Http.Timeout = 3 + rmqClient.Init("amqp://guest:guest@127.0.0.1:5672/") + defer rmqClient.Close() + request := makeRequest(fileOpRequest{"folder", "test"}) + respChan := make(chan *httptest.ResponseRecorder) + go func() { + w := doPostRequest("/v0.1/delete?remotequeue=test_queue", request) + respChan <- w + }() + time.Sleep(time.Millisecond * 10) + var channelId string + for k, _ := range channels { + channelId = k + } + res := FileOpResult{ + Operation: OpDelete, + StatusCode: http.StatusNoContent, + } + resb, _ := json.Marshal(&res) + doPostRequest("/v0.1/uploadresult/"+channelId, string(resb)) + + w := <-respChan + assert.Equal(t, http.StatusNoContent, w.Code, "test.message") +} diff --git a/src/http_server/download.go b/src/http_server/download.go index 867e6996633831c4179a2ac3028e90fa953dec77..94d89b9f0de3f8d55eef739b2e4629a9df061f81 100644 --- a/src/http_server/download.go +++ b/src/http_server/download.go @@ -1,51 +1,95 @@ package http_server import ( + "io" "net/http" + "os/exec" + "path" "remote_data_broker/common" log "remote_data_broker/logger" "remote_data_broker/utils" + "strings" ) -func checkFtsApiVersion(w http.ResponseWriter, r *http.Request) (utils.VersionNum, bool) { - return utils.PrecheckApiVersion(w, r, common.GetRdbApiVersion()) +func routeFileDownload(w http.ResponseWriter, r *http.Request) { + fileOp(w, r, OpDownload) } -type fileDownloadRequest struct { - FileName string - Token string +type FileDownload struct { } -func checkRequest(r *http.Request, ver utils.VersionNum) (fileDownloadRequest, int, error) { - var request fileDownloadRequest - err := utils.ExtractRequest(r, &request) - if err != nil { - return fileDownloadRequest{}, http.StatusBadRequest, err +func (op FileDownload) Operation() string { + return OpDownload +} + +func (op FileDownload) LocalAsServiceUser(w http.ResponseWriter, r *http.Request, request fileOpRequest) { + if common.Settings.Auth.Enabled { + _, err := auth.CheckAndGetContent(request.Token) + if err != nil { + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "error": err, + }).Error("authorization failed") + http.Error(w, "Authorization failed", http.StatusUnauthorized) + return + } } - return request, http.StatusOK, nil + + _, file := path.Split(request.FileName) + w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") + + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "mode": "as rdb user", + }).Debug("Transferring file") + + http.ServeFile(w, r, request.FileName) } -func routeFileDownload(w http.ResponseWriter, r *http.Request) { - ver, ok := checkFtsApiVersion(w, r) - if !ok { +func (op FileDownload) LocalAsUser(w http.ResponseWriter, request fileOpRequest) { + if !executeCommandAsRoot(w, request, common.Settings.TestCommand) { + return + } + + name, args := getCommand(common.Settings.CatCommand, request) + cmd := exec.Command(name, args...) + stdout, err := cmd.StdoutPipe() + if err != nil { + http.Error(w, "Failed to create stdout pipe", http.StatusInternalServerError) return } + var errbuf strings.Builder + cmd.Stderr = &errbuf - downloadRequest, status, err := checkRequest(r, ver) + err = cmd.Start() if err != nil { - utils.WriteServerError(w, err, status) + http.Error(w, "Failed to start command execution", http.StatusInternalServerError) return } + io.Copy(w, stdout) - log.Debug("File download request: ", downloadRequest.FileName) + err = cmd.Wait() + if err != nil { + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "error": err, + "stderr": errbuf.String(), + }).Error("cat command execution failed") + panic("abort downloadFromLocalAsUser") + } - remoteQueue := r.URL.Query().Get("remotequeue") - asUser := r.URL.Query().Get("asuser") == "true" - if remoteQueue != "" { - downloadFromRemote(w, downloadRequest, remoteQueue) - } else if asUser { - downloadFromLocalAsUser(w, downloadRequest) - } else { - downloadFromLocal(w, r, downloadRequest) + _, file := path.Split(request.FileName) + w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") + log.WithFields(map[string]interface{}{ + "name": request.FileName, + "mode": "as user", + }).Debug("File sent successfully") +} + +func (op FileDownload) Remote(w http.ResponseWriter, request fileOpRequest, remoteQueue string) { + err := remoteOperation(w, request, remoteQueue, op.Operation()) + if err != nil { + utils.WriteServerError(w, err, http.StatusInternalServerError) + return } } diff --git a/src/http_server/download_local.go b/src/http_server/download_local.go deleted file mode 100644 index e861887f1d803f13e8f9771780d90867b2ad61ee..0000000000000000000000000000000000000000 --- a/src/http_server/download_local.go +++ /dev/null @@ -1,113 +0,0 @@ -package http_server - -import ( - "bytes" - "io" - "net/http" - "os/exec" - "path" - "remote_data_broker/common" - log "remote_data_broker/logger" - "remote_data_broker/utils" - "strings" -) - -func downloadFromLocal(w http.ResponseWriter, r *http.Request, downloadRequest fileDownloadRequest) { - if common.Settings.Auth.Enabled { - _, err := auth.CheckAndGetContent(downloadRequest.Token) - if err != nil { - log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, - "error": err, - }).Error("authorization failed") - http.Error(w, "Authorization failed", http.StatusUnauthorized) - return - } - } - - _, file := path.Split(downloadRequest.FileName) - w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") - - log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, - "mode": "as rdb user", - }).Debug("Transferring file") - - http.ServeFile(w, r, downloadRequest.FileName) -} - -func getCommand(cmd string, downloadRequest fileDownloadRequest) (string, []string) { - token := downloadRequest.Token - cmd = strings.Replace(cmd, "$token", token, -1) - cmd = strings.Replace(cmd, "$filename", downloadRequest.FileName, -1) - words := utils.FieldsQuoted(cmd) - if len(words) > 1 { - return words[0], words[1:] - } else { - return words[0], []string{} - } -} - -func executeCheckFileCommand(downloadRequest fileDownloadRequest) ([]byte, error) { - name, args := getCommand(common.Settings.TestCommand, downloadRequest) - cmd := exec.Command(name, args...) - return cmd.CombinedOutput() -} - -func checkFile(w http.ResponseWriter, downloadRequest fileDownloadRequest) bool { - out, err := executeCheckFileCommand(downloadRequest) - if err != nil { - status := http.StatusNotFound - if bytes.Contains(out, []byte("token")) { - status = http.StatusUnauthorized - } - http.Error(w, "command execution failed", status) - log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, - "error": err, - "output": string(out), - }).Error("test file execution failed") - return false - } - return true -} - -func downloadFromLocalAsUser(w http.ResponseWriter, downloadRequest fileDownloadRequest) { - if !checkFile(w, downloadRequest) { - return - } - - name, args := getCommand(common.Settings.CatCommand, downloadRequest) - cmd := exec.Command(name, args...) - stdout, err := cmd.StdoutPipe() - if err != nil { - http.Error(w, "Failed to create stdout pipe", http.StatusInternalServerError) - return - } - var errbuf strings.Builder - cmd.Stderr = &errbuf - - err = cmd.Start() - if err != nil { - http.Error(w, "Failed to start command execution", http.StatusInternalServerError) - return - } - io.Copy(w, stdout) - - err = cmd.Wait() - if err != nil { - log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, - "error": err, - "stderr": errbuf.String(), - }).Error("cat command execution failed") - panic("abort downloadFromLocalAsUser") - } - - _, file := path.Split(downloadRequest.FileName) - w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") - log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, - "mode": "as user", - }).Debug("File sent successfully") -} diff --git a/src/http_server/download_local_test.go b/src/http_server/download_local_test.go deleted file mode 100644 index cd20100b36ab23418d715ba34a8c9577eae49c6d..0000000000000000000000000000000000000000 --- a/src/http_server/download_local_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package http_server - -import ( - "github.com/stretchr/testify/assert" - "io/ioutil" - "net/http" - "os" - "path/filepath" - "remote_data_broker/common" - "remote_data_broker/utils" - "testing" -) - -var downloadFileTests = []struct { - fname string - token string - query string - status int - message string -}{ - {"exists", "token", "&asuser=true", http.StatusOK, "file download as user, ok"}, - {"exists", "token", "", http.StatusOK, "file download, ok"}, - {"exists", "wrong_token", "", http.StatusUnauthorized, "file download, wrong token"}, - {"not_exists", "token", "", http.StatusNotFound, "file download, not exists"}, - {"not_exists", "token", "&asuser=true", http.StatusNotFound, "file download as user, not exist"}, -} - -func TestDownloadFile(t *testing.T) { - os.MkdirAll(filepath.Clean("folder"), os.ModePerm) - ioutil.WriteFile(filepath.Clean("folder/exists"), []byte("hello"), 0644) - defer os.RemoveAll("folder") - auth = &utils.MockAuth{} - common.Settings.Auth.Enabled = true - common.Settings.CatCommand = "cat $filename" - common.Settings.TestCommand = "test -r $filename" - for _, test := range downloadFileTests { - request := makeRequest(fileDownloadRequest{"folder/" + test.fname, test.token}) - w := doPostRequest("/v0.1/download", request) - if test.status == http.StatusOK { - body, _ := ioutil.ReadAll(w.Body) - body_str := string(body) - assert.Equal(t, test.status, w.Code, test.message) - assert.Contains(t, w.Header().Get("Content-Disposition"), test.fname, test.message) - assert.Equal(t, "hello", body_str, test.message) - - } - assert.Equal(t, test.status, w.Code, test.message) - } -} diff --git a/src/http_server/download_remote_test.go b/src/http_server/download_remote_test.go deleted file mode 100644 index 94e8d3d2146c1b3351f64ff13e5ebac6272e7c86..0000000000000000000000000000000000000000 --- a/src/http_server/download_remote_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package http_server - -import ( - "github.com/stretchr/testify/assert" - "io/ioutil" - "net/http/httptest" - "remote_data_broker/common" - "testing" - "time" -) - -func TestDownloadRemote(t *testing.T) { - common.Settings.Http.ChunkSize = 10000 - common.Settings.Http.Timeout = 3 - rmqClient.Init("amqp://guest:guest@127.0.0.1:5672/") - defer rmqClient.Close() - request := makeRequest(fileDownloadRequest{"folder", "test"}) - respChan := make(chan *httptest.ResponseRecorder) - go func() { - w := doPostRequest("/v0.1/download?remotequeue=test_queue", request) - respChan <- w - }() - time.Sleep(time.Millisecond * 10) - var channelId string - for k, _ := range channels { - channelId = k - } - doPostRequest("/v0.1/upload/"+channelId, "hello") - - w := <-respChan - - body, _ := ioutil.ReadAll(w.Body) - body_str := string(body) - assert.Equal(t, "hello", body_str, "test.message") -} diff --git a/src/http_server/download_test.go b/src/http_server/download_test.go index 46fe24a44c338143b4454da00754b87762a1f56c..5f91317b034830f68faba9b6a167b95e3d8c6117 100644 --- a/src/http_server/download_test.go +++ b/src/http_server/download_test.go @@ -2,28 +2,75 @@ package http_server import ( "github.com/stretchr/testify/assert" + "io/ioutil" "net/http" "net/http/httptest" + "os" + "path/filepath" + "remote_data_broker/common" "remote_data_broker/utils" - "strings" "testing" + "time" ) -func makeRequest(request interface{}) string { - buf, _ := utils.MapToJson(request) - return string(buf) +var downloadFileTests = []struct { + fname string + token string + query string + status int + message string +}{ + {"exists", "token", "&asuser=true", http.StatusOK, "file download as user, ok"}, + {"exists", "token", "", http.StatusOK, "file download, ok"}, + {"exists", "wrong_token", "", http.StatusUnauthorized, "file download, wrong token"}, + {"not_exists", "token", "", http.StatusNotFound, "file download, not exists"}, + {"not_exists", "token", "&asuser=true", http.StatusNotFound, "file download as user, not exist"}, } -func doPostRequest(path string, buf string) *httptest.ResponseRecorder { - mux := utils.NewRouter(listRoutes) - req, _ := http.NewRequest("POST", path, strings.NewReader(buf)) - w := httptest.NewRecorder() - mux.ServeHTTP(w, req) - return w +func TestDownloadFile(t *testing.T) { + os.MkdirAll(filepath.Clean("folder"), os.ModePerm) + ioutil.WriteFile(filepath.Clean("folder/exists"), []byte("hello"), 0644) + defer os.RemoveAll("folder") + auth = &utils.MockAuth{} + common.Settings.Auth.Enabled = true + common.Settings.CatCommand = "cat $filename" + common.Settings.TestCommand = "test -r $filename" + for _, test := range downloadFileTests { + request := makeRequest(fileOpRequest{"folder/" + test.fname, test.token}) + w := doPostRequest("/v0.1/download", request) + if test.status == http.StatusOK { + body, _ := ioutil.ReadAll(w.Body) + body_str := string(body) + assert.Equal(t, test.status, w.Code, test.message) + assert.Contains(t, w.Header().Get("Content-Disposition"), test.fname, test.message) + assert.Equal(t, "hello", body_str, test.message) + + } + assert.Equal(t, test.status, w.Code, test.message) + } } -func TestDownloadWrongApiVersion(t *testing.T) { - request := makeRequest(fileDownloadRequest{"folder", "fname"}) - w := doPostRequest("/v100000.2/download?sizeonly=true", request) - assert.Equal(t, http.StatusUnsupportedMediaType, w.Code, "wrong api version") +func TestDownloadRemote(t *testing.T) { + common.Settings.Http.ChunkSize = 10000 + common.Settings.Http.Timeout = 3 + rmqClient.Init("amqp://guest:guest@127.0.0.1:5672/") + defer rmqClient.Close() + request := makeRequest(fileOpRequest{"folder", "test"}) + respChan := make(chan *httptest.ResponseRecorder) + go func() { + w := doPostRequest("/v0.1/download?remotequeue=test_queue", request) + respChan <- w + }() + time.Sleep(time.Millisecond * 10) + var channelId string + for k, _ := range channels { + channelId = k + } + doPostRequest("/v0.1/uploadresult/"+channelId, "hello") + + w := <-respChan + + body, _ := ioutil.ReadAll(w.Body) + body_str := string(body) + assert.Equal(t, "hello", body_str, "test.message") } diff --git a/src/http_server/listroutes.go b/src/http_server/listroutes.go index 0f59d3a4d47950b73d469631a92ff60097ffafed..c0fe48b607e95dfaf1d53540773b2d81f51d1a05 100644 --- a/src/http_server/listroutes.go +++ b/src/http_server/listroutes.go @@ -11,11 +11,17 @@ var listRoutes = utils.Routes{ "/{apiver}/download", routeFileDownload, }, + utils.Route{ + "Delete File", + "POST", + "/{apiver}/delete", + routeFileDelete, + }, utils.Route{ "Receive File", "POST", - "/{apiver}/upload/{id}", - routeFileUpload, + "/{apiver}/uploadresult/{id}", + routeFileOpResult, }, utils.Route{ "HealthCheck", diff --git a/src/http_server/download_remote.go b/src/http_server/operation.go similarity index 60% rename from src/http_server/download_remote.go rename to src/http_server/operation.go index a8b2330a54d7e485dbd6c5ec870cd88013111ce1..fb23b48db41076b8fafddd3c6006b8ca3449b0c3 100644 --- a/src/http_server/download_remote.go +++ b/src/http_server/operation.go @@ -13,7 +13,39 @@ import ( "time" ) -func routeFileUpload(w http.ResponseWriter, r *http.Request) { +func fileOp(w http.ResponseWriter, r *http.Request, op string) { + ver, ok := checkFtsApiVersion(w, r) + if !ok { + return + } + + request, status, err := checkRequest(r, ver) + if err != nil { + utils.WriteServerError(w, err, status) + return + } + + log.Debug("File "+op+" request: ", request.FileName) + + var opHandler FileOperation + switch op { + case OpDelete: + opHandler = &FileDelete{} + case OpDownload: + opHandler = FileDownload{} + } + remoteQueue := r.URL.Query().Get("remotequeue") + asUser := r.URL.Query().Get("asuser") == "true" + if remoteQueue != "" { + opHandler.Remote(w, request, remoteQueue) + } else if asUser { + opHandler.LocalAsUser(w, request) + } else { + opHandler.LocalAsServiceUser(w, r, request) + } +} + +func routeFileOpResult(w http.ResponseWriter, r *http.Request) { _, ok := checkFtsApiVersion(w, r) if !ok { utils.WriteServerError(w, errors.New("wrong API version"), http.StatusBadRequest) @@ -67,27 +99,31 @@ func routeFileUpload(w http.ResponseWriter, r *http.Request) { close(ch) } -var channels = make(map[string]chan []byte, 0) - -func downloadFromRemote(w http.ResponseWriter, downloadRequest fileDownloadRequest, remoteQueue string) { - channel := make(chan []byte) +func publishRequest(request fileOpRequest, remoteQueue string, operation string) (chan []byte, error) { id := uuid.New().String() - channels[id] = channel - msg := common.FileMessage{ Endpoint: common.Settings.Http.EndpointUrl + "/" + - common.GetRdbApiVersion() + "/upload/" + id, - Filename: downloadRequest.FileName, - Token: downloadRequest.Token, + common.GetRdbApiVersion() + "/uploadresult/" + id, + Filename: request.FileName, + Operation: operation, + Token: request.Token, } msgb, _ := json.Marshal(&msg) err := rmqClient.Publish(msgb, remoteQueue) if err != nil { - utils.WriteServerError(w, nil, http.StatusInternalServerError) - log.Fatal("Cannot publish file request to RMQ, will exit now: ", err) - return + return nil, err } + channel := make(chan []byte) + channels[id] = channel + return channel, nil +} +func remoteOperation(writer io.Writer, request fileOpRequest, remoteQueue string, operation string) error { + channel, err := publishRequest(request, remoteQueue, operation) + if err != nil { + log.Fatal("Cannot publish file request to RMQ, will exit now: ", err) + return errors.New("") + } timeout := time.Second * time.Duration(common.Settings.Http.Timeout) timer := time.NewTimer(timeout) for { @@ -95,20 +131,19 @@ func downloadFromRemote(w http.ResponseWriter, downloadRequest fileDownloadReque case data := <-channel: if data == nil { log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, + "name": request.FileName, "mode": "as user, from remote", - }).Debug("File sent successfully") - return + }).Debug("remoteOperation " + operation + " finished successfully") + return nil } - w.Write(data) + writer.Write(data) timer.Reset(timeout) case <-timer.C: log.WithFields(map[string]interface{}{ - "name": downloadRequest.FileName, + "name": request.FileName, "mode": "as user, from remote", - }).Error("File sent timeout") - utils.WriteServerError(w, errors.New("request timeout"), http.StatusInternalServerError) - return + }).Error("remoteOperation " + operation + " timeout") + return errors.New("request timeout") } } } diff --git a/src/rmq_client/process_request.go b/src/rmq_client/process_request.go new file mode 100644 index 0000000000000000000000000000000000000000..eea8297f0ccee953b6dff2e5f73f44d2c92d3646 --- /dev/null +++ b/src/rmq_client/process_request.go @@ -0,0 +1,95 @@ +package rmq_client + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "net/http" + "os" + "remote_data_broker/common" + "remote_data_broker/http_server" + log "remote_data_broker/logger" +) + +func processMessage(message []byte) error { + var msg common.FileMessage + err := json.Unmarshal(message, &msg) + if err != nil { + return err + } + + if common.Settings.Auth.Enabled { + user, err := auth.CheckAndGetContent(msg.Token) + if err != nil { + return err + } + + err = auth.Authorize(user) + if err != nil { + return err + } + + } + + var req *http.Request + switch msg.Operation { + case http_server.OpDelete: + req, err = processDeleteRequest(msg) + case http_server.OpDownload: + req, err = processDownloadRequest(msg) + } + if err != nil { + return err + } + + return doRequest(req, msg.Filename, msg.Operation) +} + +func processDeleteRequest(msg common.FileMessage) (*http.Request, error) { + err := os.Remove(msg.Filename) + if err != nil { + return nil, err + } + opRes := http_server.FileOpResult{ + http_server.OpDelete, + http.StatusNoContent, + } + b, _ := json.Marshal(&opRes) + req, err := http.NewRequest("POST", msg.Endpoint, bytes.NewBuffer(b)) + if err != nil { + return nil, err + } + return req, nil +} + +func processDownloadRequest(msg common.FileMessage) (*http.Request, error) { + file, err := os.Open(msg.Filename) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", msg.Endpoint, file) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/octet-stream") + return req, nil +} + +func doRequest(req *http.Request, fname string, operation string) error { + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + log.Debug("Request to "+operation+" processed successfully: ", fname) + } else { + err_msg, _ := io.ReadAll(resp.Body) + return errors.New("Error processing request to " + operation + " : " + string(err_msg)) + } + return nil +} diff --git a/src/rmq_client/process_request_test.go b/src/rmq_client/process_request_test.go new file mode 100644 index 0000000000000000000000000000000000000000..78f6937573f573b6118802202aa0de53defbcd96 --- /dev/null +++ b/src/rmq_client/process_request_test.go @@ -0,0 +1,57 @@ +package rmq_client + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "remote_data_broker/common" + "remote_data_broker/utils" + "testing" +) + +func TestDownloadFileOp(t *testing.T) { + tmpFile, _ := ioutil.TempFile("", "testfile.txt") + defer os.Remove(tmpFile.Name()) + tmpFile.Write([]byte("test data")) + tmpFile.Close() + auth = &utils.MockAuth{} + var res string + uploadHandler := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("File uploaded successfully\n")) + b, _ := io.ReadAll(r.Body) + res = string(b) + } + server := httptest.NewServer(http.HandlerFunc(uploadHandler)) + defer server.Close() + msg := common.FileMessage{server.URL, tmpFile.Name(), "download", "token"} + b, _ := json.Marshal(&msg) + err := processMessage(b) + assert.NoError(t, err) + assert.Equal(t, "test data", res) +} + +func TestDeleteFileOp(t *testing.T) { + tmpFile, _ := ioutil.TempFile("", "testfile.txt") + tmpFile.Write([]byte("test data")) + tmpFile.Close() + auth = &utils.MockAuth{} + var res string + uploadHandler := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + b, _ := io.ReadAll(r.Body) + res = string(b) + } + server := httptest.NewServer(http.HandlerFunc(uploadHandler)) + defer server.Close() + msg := common.FileMessage{server.URL, tmpFile.Name(), "delete", "token"} + b, _ := json.Marshal(&msg) + err := processMessage(b) + assert.NoError(t, err) + assert.Equal(t, "{\"Operation\":\"delete\",\"StatusCode\":204}", res) + assert.NoFileExists(t, tmpFile.Name(), res) +} diff --git a/src/rmq_client/send_file.go b/src/rmq_client/send_file.go deleted file mode 100644 index 89ff9c7fe7b6f0deec5b68a466c11f34b88e70a7..0000000000000000000000000000000000000000 --- a/src/rmq_client/send_file.go +++ /dev/null @@ -1,70 +0,0 @@ -package rmq_client - -import ( - "encoding/json" - "errors" - "io" - "net/http" - "os" - "remote_data_broker/common" - log "remote_data_broker/logger" -) - -func processMessage(message []byte) error { - var msg common.FileMessage - err := json.Unmarshal(message, &msg) - if err != nil { - return err - } - - if common.Settings.Auth.Enabled { - user, err := auth.CheckAndGetContent(msg.Token) - if err != nil { - return err - } - - err = auth.Authorize(user) - if err != nil { - return err - } - - } - - req, err := prepareRequest(msg) - if err != nil { - return err - } - - return doRequest(req, msg.Filename) -} - -func prepareRequest(msg common.FileMessage) (*http.Request, error) { - file, err := os.Open(msg.Filename) - if err != nil { - return nil, err - } - - req, err := http.NewRequest("POST", msg.Endpoint, file) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/octet-stream") - return req, nil -} - -func doRequest(req *http.Request, fname string) error { - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusOK { - log.Debug("File uploaded successfully: ", fname) - } else { - err_msg, _ := io.ReadAll(resp.Body) - return errors.New("Error file upload request: " + string(err_msg)) - } - return nil -} diff --git a/src/rmq_client/send_file_test.go b/src/rmq_client/send_file_test.go deleted file mode 100644 index eb277f4f2730a7b6e0a229e6303cc6aaa4e96aac..0000000000000000000000000000000000000000 --- a/src/rmq_client/send_file_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package rmq_client - -import ( - "encoding/json" - "github.com/stretchr/testify/assert" - "io" - "io/ioutil" - "net/http" - "net/http/httptest" - "os" - "remote_data_broker/common" - "remote_data_broker/utils" - "testing" -) - -func TestUploadFile(t *testing.T) { - tmpFile, _ := ioutil.TempFile("", "testfile.txt") - defer os.Remove(tmpFile.Name()) - tmpFile.Write([]byte("test data")) - tmpFile.Close() - auth = &utils.MockAuth{} - var res string - uploadHandler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte("File uploaded successfully\n")) - b, _ := io.ReadAll(r.Body) - res = string(b) - } - server := httptest.NewServer(http.HandlerFunc(uploadHandler)) - defer server.Close() - msg := common.FileMessage{server.URL, tmpFile.Name(), "token"} - b, _ := json.Marshal(&msg) - err := processMessage(b) - assert.NoError(t, err) - assert.Equal(t, "test data", res) -}