Fixed a bug in ParallelMixMultipartUpload

This commit is contained in:
Takeshi Nakatani
2020-06-21 18:04:49 +00:00
committed by Andrew Gaul
parent 7d0c66e08a
commit 35006e318f
5 changed files with 241 additions and 276 deletions

View File

@ -40,11 +40,11 @@
#include <vector>
#include "common.h"
#include "fdcache.h"
#include "s3fs.h"
#include "s3fs_util.h"
#include "string_util.h"
#include "curl.h"
#include "fdcache.h"
using namespace std;
@ -276,6 +276,131 @@ bool CacheFileStat::Release()
return true;
}
//------------------------------------------------
// fdpage_list_t utility
//------------------------------------------------
// Inline helper for the compression loop: appends "page" to "pagelist"
// after applying the requested flag overrides.  Empty(0 byte) areas are
// never registered.
// [NOTE]
// The page argument may be modified by this call.
//
inline void raw_add_compress_fdpage_list(fdpage_list_t& pagelist, fdpage& page, bool ignore_load, bool ignore_modify, bool default_load, bool default_modify)
{
    if(page.bytes <= 0){
        // nothing to register for an empty area
        return;
    }
    if(ignore_load){
        page.loaded = default_load;
    }
    if(ignore_modify){
        page.modified = default_modify;
    }
    pagelist.push_back(page);
}
// Compress the page list by merging adjacent, contiguous pages.
//
// ignore_load:    ignore the loaded flag when deciding whether two
//                 neighboring pages can be merged
// ignore_modify:  ignore the modified flag when deciding whether two
//                 neighboring pages can be merged
// default_load:   loaded flag stored in the result when ignore_load is true
// default_modify: modified flag stored in the result when ignore_modify is true
//
// [NOTE]
// ignore_load and ignore_modify cannot both be true.
//
static fdpage_list_t raw_compress_fdpage_list(const fdpage_list_t& pages, bool ignore_load, bool ignore_modify, bool default_load, bool default_modify)
{
    fdpage_list_t compressed_pages;
    fdpage        current;                  // the run of pages being accumulated
    bool          has_current = false;
    for(fdpage_list_t::const_iterator it = pages.begin(); it != pages.end(); ++it){
        if(has_current){
            // the run can absorb this page only if the relevant flags match
            // and the areas are contiguous.
            bool same_state = (ignore_load   || current.loaded   == it->loaded  ) &&
                              (ignore_modify || current.modified == it->modified);
            if(same_state && current.next() == it->offset){
                // contiguous area with the same state: extend the run
                current.bytes += it->bytes;
                continue;
            }
            // the run ends here, flush it to the result
            raw_add_compress_fdpage_list(compressed_pages, current, ignore_load, ignore_modify, default_load, default_modify);
        }
        // start a new run from the current page
        has_current = true;
        current     = fdpage(it->offset, it->bytes, (ignore_load ? default_load : it->loaded), (ignore_modify ? default_modify : it->modified));
    }
    // flush the last run
    if(has_current){
        raw_add_compress_fdpage_list(compressed_pages, current, ignore_load, ignore_modify, default_load, default_modify);
    }
    return compressed_pages;
}
// Compress while ignoring the modified flag; every page in the result
// carries default_modify as its modified flag.
static fdpage_list_t compress_fdpage_list_ignore_modify(const fdpage_list_t& pages, bool default_modify)
{
    const bool ignore_load   = false;
    const bool ignore_modify = true;
    const bool default_load  = false;
    return raw_compress_fdpage_list(pages, ignore_load, ignore_modify, default_load, default_modify);
}
// Compress while ignoring the loaded flag; every page in the result
// carries default_load as its loaded flag.
static fdpage_list_t compress_fdpage_list_ignore_load(const fdpage_list_t& pages, bool default_load)
{
    const bool ignore_load    = true;
    const bool ignore_modify  = false;
    const bool default_modify = false;
    return raw_compress_fdpage_list(pages, ignore_load, ignore_modify, default_load, default_modify);
}
// Compress honoring both the loaded and modified flags.
static fdpage_list_t compress_fdpage_list(const fdpage_list_t& pages)
{
    // Neither flag is ignored, so both default values are unused.
    return raw_compress_fdpage_list(pages, false, false, false, false);
}
// Split every modified page into chunks no larger than max_partsize.
//
// Unmodified pages are passed through untouched.  The tail of a modified
// page is emitted as a single chunk whenever the remainder is at most
// twice max_partsize, so that no chunk ends up smaller than max_partsize.
//
static fdpage_list_t parse_partsize_fdpage_list(const fdpage_list_t& pages, off_t max_partsize)
{
    fdpage_list_t parsed_pages;
    for(fdpage_list_t::const_iterator it = pages.begin(); it != pages.end(); ++it){
        if(!it->modified){
            // not modified page is not parsed
            parsed_pages.push_back(*it);
            continue;
        }
        // modified page: carve max_partsize chunks off the front
        fdpage chunk = *it;
        off_t  pos   = it->offset;
        off_t  rest  = it->bytes;
        while(0 < rest){
            if(rest <= (max_partsize * 2)){
                // Since the number of remaining bytes is less than twice
                // max_partsize, splitting further would produce a chunk
                // smaller than max_partsize, so emit the tail whole.
                chunk.offset = pos;
                chunk.bytes  = rest;
                parsed_pages.push_back(chunk);
                pos += rest;
                rest = 0;
            }else{
                // emit one full-size chunk
                chunk.offset = pos;
                chunk.bytes  = max_partsize;
                parsed_pages.push_back(chunk);
                pos  += max_partsize;
                rest -= max_partsize;
            }
        }
    }
    return parsed_pages;
}
//------------------------------------------------
// PageList methods
//------------------------------------------------
@ -325,37 +450,9 @@ off_t PageList::Size() const
return riter->next();
}
bool PageList::Compress(bool force_modified)
bool PageList::Compress()
{
bool is_first = true;
bool is_last_loaded = false;
bool is_last_modified = false;
for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); ){
if(is_first){
is_first = false;
is_last_loaded = force_modified ? true : iter->loaded;
is_last_modified = iter->modified;
++iter;
}else{
if(is_last_modified == iter->modified){
if(force_modified || is_last_loaded == iter->loaded){
fdpage_list_t::iterator biter = iter;
--biter;
biter->bytes += iter->bytes;
iter = pages.erase(iter);
}else{
is_last_loaded = iter->loaded;
is_last_modified = iter->modified;
++iter;
}
}else{
is_last_loaded = force_modified ? true : iter->loaded;
is_last_modified = iter->modified;
++iter;
}
}
}
pages = compress_fdpage_list(pages);
return true;
}
@ -558,253 +655,104 @@ int PageList::GetUnloadedPages(fdpage_list_t& unloaded_list, off_t start, off_t
// This method checks the current PageList status and returns the area that needs
// to be downloaded so that each part is at least 5 MB.
//
bool PageList::GetLoadPageListForMultipartUpload(fdpage_list_t& dlpages)
bool PageList::GetPageListsForMultipartUpload(fdpage_list_t& dlpages, fdpage_list_t& mixuppages, off_t max_partsize)
{
// compress before this processing
if(!Compress()){
return false;
}
bool is_prev_modified_page = false;
off_t accumulated_bytes = 0;
off_t last_modified_bytes = 0;
for(fdpage_list_t::const_iterator iter = pages.begin(); iter != pages.end(); ++iter){
// make a list by modified flag
fdpage_list_t modified_pages = compress_fdpage_list_ignore_load(pages, false);
fdpage_list_t download_pages; // A non-contiguous page list showing the areas that need to be downloaded
fdpage_list_t mixupload_pages; // A continuous page list showing only modified flags for mixupload
fdpage prev_page;
for(fdpage_list_t::const_iterator iter = modified_pages.begin(); iter != modified_pages.end(); ++iter){
if(iter->modified){
// this is modified page
if(is_prev_modified_page){
// in case of continuous modified page
accumulated_bytes += iter->bytes;
}else{
// previous page is unmodified page
// check unmodified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
accumulated_bytes = iter->bytes; // reset accumulated size
// current is modified area
if(!prev_page.modified){
// previous is not modified area
if(prev_page.bytes < static_cast<const off_t>(MIN_MULTIPART_SIZE)){
// previous(not modified) area is too small for one multipart size,
// then all of previous area is needed to download.
download_pages.push_back(prev_page);
// previous(not modified) area is set upload area.
prev_page.modified = true;
mixupload_pages.push_back(prev_page);
}else{
// less than minimum size(5MB)
// the previous unmodified page needs to load, if it is not loaded.
// and that page will be included in consecutive modified page.
PageList::RawGetUnloadPageList(dlpages, (iter->offset - accumulated_bytes), accumulated_bytes);
accumulated_bytes += last_modified_bytes + iter->bytes; // this page size and last modified page size are accumulated
last_modified_bytes = 0;
// previous(not modified) area is set copy area.
prev_page.modified = false;
mixupload_pages.push_back(prev_page);
}
is_prev_modified_page = true;
// set current to previous
prev_page = *iter;
}else{
// previous is modified area, too
prev_page.bytes += iter->bytes;
}
}else{
// this is unmodified page
if(!is_prev_modified_page){
// in case of continuous unmodified page
accumulated_bytes += iter->bytes;
// current is not modified area
if(!prev_page.modified){
// previous is not modified area, too
prev_page.bytes += iter->bytes;
}else{
// previous page is modified page
// check modified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
last_modified_bytes = accumulated_bytes; // backup last modified page size
accumulated_bytes = iter->bytes; // set new accumulated size(this page size)
is_prev_modified_page = false;
// previous is modified area
if(prev_page.bytes < static_cast<const off_t>(MIN_MULTIPART_SIZE)){
// previous(modified) area is too small for one multipart size,
// then part or all of current area is needed to download.
off_t missing_bytes = static_cast<const off_t>(MIN_MULTIPART_SIZE) - prev_page.bytes;
}else{
// less than minimum size(5MB)
// this unmodified page needs to load, if it is not loaded.
// and this page will be included in consecutive modified page.
if((static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes) <= iter->bytes){
// Split the missing size from this page size for just before modified page.
if(!iter->loaded){
// because this page is not loaded
fdpage dlpage(iter->offset, (iter->bytes - (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes))); // don't care for loaded/modified flag
dlpages.push_back(dlpage);
}
last_modified_bytes = static_cast<const off_t>(MIN_MULTIPART_SIZE); // backup last modified page size
accumulated_bytes = iter->bytes - (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes); // set rest bytes to accumulated size
is_prev_modified_page = false;
if((missing_bytes + static_cast<const off_t>(MIN_MULTIPART_SIZE)) < iter-> bytes){
// The current size is larger than the missing size, and the remainder
// after deducting the missing size is larger than the minimum size.
fdpage missing_page(iter->offset, missing_bytes, false, false);
download_pages.push_back(missing_page);
// previous(not modified) area is set upload area.
prev_page.bytes = static_cast<const off_t>(MIN_MULTIPART_SIZE);
mixupload_pages.push_back(prev_page);
// set current to previous
prev_page = *iter;
prev_page.offset += missing_bytes;
prev_page.bytes -= missing_bytes;
}else{
// assign all this page sizes to just before modified page.
// but still it is not enough for the minimum size.
if(!iter->loaded){
// because this page is not loaded
fdpage dlpage(iter->offset, iter->bytes); // don't care for loaded/modified flag
dlpages.push_back(dlpage);
}
accumulated_bytes += iter->bytes; // add all bytes to accumulated size
// The current size is less than the missing size, or the remaining
// size less the missing size is less than the minimum size.
download_pages.push_back(*iter);
// add current to previous
prev_page.bytes += iter->bytes;
}
}
}
}
}
// compress dlpages
bool is_first = true;
for(fdpage_list_t::iterator dliter = dlpages.begin(); dliter != dlpages.end(); ){
if(is_first){
is_first = false;
++dliter;
continue;
}
fdpage_list_t::iterator biter = dliter;
--biter;
if((biter->offset + biter->bytes) == dliter->offset){
biter->bytes += dliter->bytes;
dliter = dlpages.erase(dliter);
}else{
++dliter;
}
}
return true;
}
// [NOTE]
// This static method assumes that it is called only from GetLoadPageListForMultipartUpload.
// If you want to exclusive control, please do with GetLoadPageListForMultipartUpload,
// not with this method.
//
bool PageList::RawGetUnloadPageList(fdpage_list_t& dlpages, off_t offset, off_t size)
{
for(fdpage_list_t::const_iterator iter = pages.begin(); iter != pages.end(); ++iter){
if((iter->offset + iter->bytes) <= offset){
continue;
}else if((offset + size) <= iter->offset){
break;
}else{
if(!iter->loaded && !iter->modified){
fdpage dlpage(iter->offset, iter->bytes); // don't care for loaded/modified flag
dlpages.push_back(dlpage);
}
}
}
return true;
}
bool PageList::GetMultipartSizeList(fdpage_list_t& mplist, off_t partsize) const
{
if(!mplist.empty()){
return false;
}
// temporary page list
PageList tmpPageObj(*this);
if(!tmpPageObj.Compress(true)){ // compress by modified flag
return false;
}
// [NOTE]
// Set the modified flag in page list to the minimum size.
// This process needs to match the GetLoadPageListForMultipartUpload method exactly.
//
// [FIXME]
// Make the common processing of GetLoadPageListForMultipartUpload and this method
// to one method.
//
bool is_first = true;
bool is_prev_modified_page = false;
off_t accumulated_bytes = 0;
off_t last_modified_bytes = 0;
fdpage_list_t::iterator iter;
for(iter = tmpPageObj.pages.begin(); iter != tmpPageObj.pages.end(); ++iter){
if(is_first){
is_prev_modified_page = iter->modified;
is_first = false;
}
if(iter->modified){
// this is modified page
if(is_prev_modified_page){
// in case of continuous modified page
accumulated_bytes += iter->bytes;
}else{
// previous page is unmodified page
// check unmodified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
accumulated_bytes = iter->bytes; // reset accumulated size
}else{
// less than minimum size(5MB)
// the previous unmodified page is set modified flag.
fdpage_list_t::iterator biter = iter;
--biter;
biter->loaded = true;
biter->modified = true;
accumulated_bytes += last_modified_bytes + iter->bytes; // this page size and last modified page size are accumulated
last_modified_bytes = 0;
}
is_prev_modified_page = true;
}
// previous(modified) area is enough size for one multipart size.
mixupload_pages.push_back(prev_page);
}else{
// this is unmodified page
if(!is_prev_modified_page){
// in case of continuous unmodified page
accumulated_bytes += iter->bytes;
}else{
// previous page is modified page
// check modified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
last_modified_bytes = accumulated_bytes; // backup last modified page size
accumulated_bytes = iter->bytes; // set new accumulated size(this page size)
is_prev_modified_page = false;
}else{
// less than minimum size(5MB)
// this unmodified page is set modified flag.
if((static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes) <= iter->bytes){
// Split the missing size from this page size for just before modified page.
fdpage newpage(iter->offset, (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes), true, true);
iter->bytes -= (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes);
iter->offset += (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes);
tmpPageObj.pages.insert(iter, newpage);
last_modified_bytes = static_cast<const off_t>(MIN_MULTIPART_SIZE); // backup last modified page size
accumulated_bytes = iter->bytes; // set rest bytes to accumulated size
is_prev_modified_page = false;
}else{
// assign all this page sizes to just before modified page.
// but still it is not enough for the minimum size.
accumulated_bytes += iter->bytes; // add all bytes to accumulated size
}
// set current to previous
prev_page = *iter;
}
}
}
}
// recompress
if(!tmpPageObj.Compress(true)){ // compress by modified flag
return false;
// lastest area
if(0 < prev_page.bytes){
mixupload_pages.push_back(prev_page);
}
// normalization for uploading parts
for(iter = tmpPageObj.pages.begin(); iter != tmpPageObj.pages.end(); ++iter){
off_t start = iter->offset;
off_t remains = iter->bytes;
// compress
dlpages = compress_fdpage_list_ignore_modify(download_pages, false);
mixuppages = compress_fdpage_list_ignore_load(mixupload_pages, false);
while(0 < remains){
off_t onesize;
if(iter->modified){
// Uploading parts, this page must be 5MB - partsize
onesize = std::min(remains, partsize);
}else{
// Not uploading parts, this page must be 5MB - 5GB
onesize = std::min(remains, static_cast<off_t>(FIVE_GB));
}
fdpage page(start, onesize, iter->loaded, iter->modified);
mplist.push_back(page);
// parse by max pagesize
dlpages = parse_partsize_fdpage_list(dlpages, max_partsize);
mixuppages = parse_partsize_fdpage_list(mixuppages, max_partsize);
start += onesize;
remains -= onesize;
}
}
return true;
}
@ -978,12 +926,12 @@ bool PageList::Serialize(CacheFileStat& file, bool is_output, ino_t inode)
return true;
}
void PageList::Dump()
void PageList::Dump() const
{
int cnt = 0;
S3FS_PRN_DBG("pages = {");
for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); ++iter, ++cnt){
for(fdpage_list_t::const_iterator iter = pages.begin(); iter != pages.end(); ++iter, ++cnt){
S3FS_PRN_DBG(" [%08d] -> {%014lld - %014lld : %s / %s}", cnt, static_cast<long long int>(iter->offset), static_cast<long long int>(iter->bytes), iter->loaded ? "loaded" : "unloaded", iter->modified ? "modified" : "not modified");
}
S3FS_PRN_DBG("}");
@ -2045,7 +1993,6 @@ int FdEntity::RowFlush(const char* tpath, bool force_sync)
if(0 == upload_id.length()){
// normal uploading
/*
* Make decision to do multi upload (or not) based upon file size
*
@ -2085,10 +2032,14 @@ int FdEntity::RowFlush(const char* tpath, bool force_sync)
// This is to ensure that each part is 5MB or more.
// If the part is less than 5MB, download it.
fdpage_list_t dlpages;
if(!pagelist.GetLoadPageListForMultipartUpload(dlpages)){
fdpage_list_t mixuppages;
if(!pagelist.GetPageListsForMultipartUpload(dlpages, mixuppages, S3fsCurl::GetMultipartSize())){
S3FS_PRN_ERR("something error occurred during getting download pagelist.");
return -1;
}
// [TODO] should use parallel downloading
//
for(fdpage_list_t::const_iterator iter = dlpages.begin(); iter != dlpages.end(); ++iter){
if(0 != (result = Load(iter->offset, iter->bytes, /*lock_already_held=*/ true, /*is_modified_flag=*/ true))){ // set loaded and modified flag
S3FS_PRN_ERR("failed to get parts(start=%lld, size=%lld) before uploading.", static_cast<long long int>(iter->offset), static_cast<long long int>(iter->bytes));
@ -2097,7 +2048,7 @@ int FdEntity::RowFlush(const char* tpath, bool force_sync)
}
// multipart uploading with copy api
result = S3fsCurl::ParallelMixMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, fd, pagelist);
result = S3fsCurl::ParallelMixMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, fd, mixuppages);
}else{
// multipart uploading not using copy api
@ -2122,7 +2073,6 @@ int FdEntity::RowFlush(const char* tpath, bool force_sync)
// reset uploaded file size
size_orgmeta = st.st_size;
}else{
// upload rest data
if(0 < mp_size){