#pragma once
#include <curl/curl.h>
#include <glog/logging.h>

#include <cstdio>
#include <exception>
#include <memory>
#include <sstream>
#include <string>

#include "./file.hpp"

using std::string;
using std::stringstream;
namespace backtest::utils
{
class WebHdfs
{
private:
string cmd;
string username;
static size_t filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream)
{
size_t written = fwrite(ptr, size, nmemb, (FILE *)stream);
return written;
}
public:
/**
* @brief Construct a new Web Hdfs object
* the root path like: http://10.60.2.114:9870/webhdfs/v1/user/username
* full op to see: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
*
* @param hostaddr as 10.60.2.114:9870
* @param username
*/
WebHdfs(const string &hostaddr, const string &username)
{
this->cmd = "http://" + hostaddr + "/webhdfs/v1/user/" + username;
this->username = username;
curl_global_init(CURL_GLOBAL_ALL);
}
/**
* @brief upload file to webhdfs
*
* @param local_file
* @param hdfs_file
* @param force
* @return true
* @return false
*/
bool Upload(const string &local_file, const string &hdfs_file = "", bool force = false)
{
try
{
stringstream url;
url << this->cmd;
if (hdfs_file == "")
{
string file_name = file::GetFileName(local_file);
url << "/" << file_name;
}
else
{
url << "/" << hdfs_file;
}
url << "?op=CREATE&user.name=" << username;
if (force)
{
url << "&overwrite=true";
}
// init curl
CURL *curl = curl_easy_init();
if (!curl)
{
LOG(ERROR) << "curl init fail.";
return false;
}
// setting
curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
curl_easy_setopt(curl, CURLOPT_PUT, 1L);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); // auto redirect
FILE *fd = fopen(local_file.c_str(), "rb");
curl_easy_setopt(curl, CURLOPT_READDATA, fd);
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)file::GetSize(local_file));
// get result
CURLcode res = curl_easy_perform(curl);
fclose(fd);
if (res != CURLE_OK)
{
LOG(ERROR) << "upload file to hdfs failed: " << curl_easy_strerror(res);
return false;
}
curl_easy_cleanup(curl);
}
catch (const std::exception &e)
{
LOG(ERROR) << e.what();
return false;
}
return true;
}
/**
* @brief download file from webhdfs
*
* @param hdfs_file
* @param local_file
* @param overwrite
* @return true
* @return false
*/
bool Download(const string &hdfs_file, const string &local_file = "", bool overwrite = false)
{
try
{
stringstream url;
url << cmd << "/" << hdfs_file << "?op=OPEN";
// init
CURL *curl = curl_easy_init();
if (!curl)
{
LOG(ERROR) << "curl init fail.";
return false;
}
// setting
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, filewrite_data);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); // auto redirect
FILE *fd{nullptr};
string temp_file = local_file;
if (local_file == "")
{
temp_file = file::GetFileName(hdfs_file);
}
string flag = "wb";
if (overwrite)
{
flag = "wb+";
}
fd = fopen(local_file.c_str(), flag.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, fd);
// get result
auto res = curl_easy_perform(curl);
fclose(fd);
if (res != CURLE_OK)
{
LOG(ERROR) << "get file from hdfs failed: " << curl_easy_strerror(res);
return false;
}
curl_easy_cleanup(curl);
}
catch (const std::exception &e)
{
LOG(ERROR) << e.what();
return false;
}
return true;
}
/**
* @brief delete file
*
* @param hdfs_file
* @return true
* @return false
*/
bool Delete(const string &hdfs_file, bool recursive = false)
{
try
{
stringstream url;
url << cmd << "/" << hdfs_file << "?op=DELETE";
if (recursive)
{
url << "&op=DELETE&recursive=true";
}
// init curl
CURL *curl = curl_easy_init();
if (!curl)
{
LOG(ERROR) << "curl init fail.";
return false;
}
// setting
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);
CURLcode res = curl_easy_perform(curl);
if (res != CURLE_OK)
{
LOG(ERROR) << "hdfs del failed: " << curl_easy_strerror(res);
return false;
}
curl_easy_cleanup(curl);
}
catch (const std::exception &e)
{
LOG(ERROR) << e.what();
return false;
}
return true;
}
/**
* @brief rename file
*
* @param old_name
* @param new_name
* @return true
* @return false
*/
bool Rename(const string &old_name, const string &new_name)
{
try
{
stringstream url;
url << cmd << "/" << old_name << "?op=RENAME&destination=/user/" << username << "/" << new_name;
// init
CURL *curl = curl_easy_init();
if (!curl)
{
LOG(ERROR) << "curl init fail.";
return false;
}
// setting
curl_easy_setopt(curl, CURLOPT_PUT, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);
CURLcode res = curl_easy_perform(curl);
if (res != CURLE_OK)
{
LOG(ERROR) << "hdfs rename failed: " << curl_easy_strerror(res);
return false;
}
curl_easy_cleanup(curl);
}
catch (const std::exception &e)
{
LOG(ERROR) << e.what();
return false;
}
return true;
}
/**
* @brief move file
*
* @param src_name
* @param target_name
* @return true
* @return false
*/
bool Move(const string &src_name, const string &target_name)
{
return Rename(src_name, target_name);
}
};
}