Fixed to use copy api in multipart upload

This commit is contained in:
Takeshi Nakatani
2019-09-26 11:30:58 +09:00
committed by Andrew Gaul
parent b6349e9428
commit 1db94a0b30
8 changed files with 619 additions and 20 deletions

View File

@ -382,9 +382,11 @@ bool PageList::IsPageLoaded(off_t start, off_t size) const
return true;
}
bool PageList::SetPageLoadedStatus(off_t start, off_t size, bool is_loaded, bool is_modified, bool is_compress)
bool PageList::SetPageLoadedStatus(off_t start, off_t size, PageList::page_status pstatus, bool is_compress)
{
off_t now_size = Size();
off_t now_size = Size();
bool is_loaded = (PAGE_LOAD_MODIFIED == pstatus || PAGE_LOADED == pstatus);
bool is_modified = (PAGE_LOAD_MODIFIED == pstatus || PAGE_MODIFIED == pstatus);
if(now_size <= start){
if(now_size < start){
@ -507,6 +509,263 @@ int PageList::GetUnloadedPages(fdpage_list_t& unloaded_list, off_t start, off_t
return unloaded_list.size();
}
// [NOTE]
// This method is called in advance when mixing POST and COPY in multi-part upload.
// The minimum size of each part must be 5 MB, and the data area below this must be
// downloaded from S3.
// This method checks the current PageList status and returns the area that needs
// to be downloaded so that each part is at least 5 MB.
//
bool PageList::GetLoadPageListForMultipartUpload(fdpage_list_t& dlpages)
{
// compress before this processing
if(!Compress()){
return false;
}
bool is_prev_modified_page = false;
off_t accumulated_bytes = 0;
off_t last_modified_bytes = 0;
for(fdpage_list_t::const_iterator iter = pages.begin(); iter != pages.end(); ++iter){
if(iter->modified){
// this is modified page
if(is_prev_modified_page){
// in case of continuous modified page
accumulated_bytes += iter->bytes;
}else{
// previous page is unmodified page
// check unmodified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
accumulated_bytes = iter->bytes; // reset accumulated size
}else{
// less than minimum size(5MB)
// the previous unmodified page needs to load, if it is not loaded.
// and that page will be included in consecutive modified page.
PageList::RawGetUnloadPageList(dlpages, (iter->offset - accumulated_bytes), accumulated_bytes);
accumulated_bytes += last_modified_bytes + iter->bytes; // this page size and last modified page size are accumulated
last_modified_bytes = 0;
}
is_prev_modified_page = true;
}
}else{
// this is unmodified page
if(!is_prev_modified_page){
// in case of continuous unmodified page
accumulated_bytes += iter->bytes;
}else{
// previous page is modified page
// check modified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
last_modified_bytes = accumulated_bytes; // backup last modified page size
accumulated_bytes = iter->bytes; // set new accumulated size(this page size)
is_prev_modified_page = false;
}else{
// less than minimum size(5MB)
// this unmodified page needs to load, if it is not loaded.
// and this page will be included in consecutive modified page.
if((static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes) <= iter->bytes){
// Split the missing size from this page size for just before modified page.
if(!iter->loaded){
// because this page is not loaded
fdpage dlpage(iter->offset, (iter->bytes - (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes))); // don't care for loaded/modified flag
dlpages.push_back(dlpage);
}
last_modified_bytes = static_cast<const off_t>(MIN_MULTIPART_SIZE); // backup last modified page size
accumulated_bytes = iter->bytes - (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes); // set rest bytes to accumulated size
is_prev_modified_page = false;
}else{
// assign all this page sizes to just before modified page.
// but still it is not enough for the minimum size.
if(!iter->loaded){
// because this page is not loaded
fdpage dlpage(iter->offset, iter->bytes); // don't care for loaded/modified flag
dlpages.push_back(dlpage);
}
accumulated_bytes += iter->bytes; // add all bytes to accumulated size
}
}
}
}
}
// compress dlpages
bool is_first = true;
for(fdpage_list_t::iterator dliter = dlpages.begin(); dliter != dlpages.end(); ){
if(is_first){
is_first = false;
++dliter;
continue;
}
fdpage_list_t::iterator biter = dliter;
--biter;
if((biter->offset + biter->bytes) == dliter->offset){
biter->bytes += dliter->bytes;
dliter = dlpages.erase(dliter);
}else{
++dliter;
}
}
return true;
}
// [NOTE]
// This static method assumes that it is called only from GetLoadPageListForMultipartUpload.
// If you want to exclusive control, please do with GetLoadPageListForMultipartUpload,
// not with this method.
//
bool PageList::RawGetUnloadPageList(fdpage_list_t& dlpages, off_t offset, off_t size)
{
for(fdpage_list_t::const_iterator iter = pages.begin(); iter != pages.end(); ++iter){
if((iter->offset + iter->bytes) <= offset){
continue;
}else if((offset + size) <= iter->offset){
break;
}else{
if(!iter->loaded && !iter->modified){
fdpage dlpage(iter->offset, iter->bytes); // don't care for loaded/modified flag
dlpages.push_back(dlpage);
}
}
}
return true;
}
bool PageList::GetMultipartSizeList(fdpage_list_t& mplist, off_t partsize) const
{
if(!mplist.empty()){
return false;
}
// temporary page list
PageList tmpPageObj(*this);
if(!tmpPageObj.Compress(true)){ // compress by modified flag
return false;
}
// [NOTE]
// Set the modified flag in page list to the minimum size.
// This process needs to match the GetLoadPageListForMultipartUpload method exactly.
//
// [FIXME]
// Make the common processing of GetLoadPageListForMultipartUpload and this method
// to one method.
//
bool is_first = true;
bool is_prev_modified_page = false;
off_t accumulated_bytes = 0;
off_t last_modified_bytes = 0;
fdpage_list_t::iterator iter;
for(iter = tmpPageObj.pages.begin(); iter != tmpPageObj.pages.end(); ++iter){
if(is_first){
is_prev_modified_page = iter->modified;
is_first = false;
}
if(iter->modified){
// this is modified page
if(is_prev_modified_page){
// in case of continuous modified page
accumulated_bytes += iter->bytes;
}else{
// previous page is unmodified page
// check unmodified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
accumulated_bytes = iter->bytes; // reset accumulated size
}else{
// less than minimum size(5MB)
// the previous unmodified page is set modified flag.
fdpage_list_t::iterator biter = iter;
--biter;
biter->loaded = true;
biter->modified = true;
accumulated_bytes += last_modified_bytes + iter->bytes; // this page size and last modified page size are accumulated
last_modified_bytes = 0;
}
is_prev_modified_page = true;
}
}else{
// this is unmodified page
if(!is_prev_modified_page){
// in case of continuous unmodified page
accumulated_bytes += iter->bytes;
}else{
// previous page is modified page
// check modified page bytes is over minimum size(5MB)
if(static_cast<const off_t>(MIN_MULTIPART_SIZE) <= accumulated_bytes){
// over minimum size
last_modified_bytes = accumulated_bytes; // backup last modified page size
accumulated_bytes = iter->bytes; // set new accumulated size(this page size)
is_prev_modified_page = false;
}else{
// less than minimum size(5MB)
// this unmodified page is set modified flag.
if((static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes) <= iter->bytes){
// Split the missing size from this page size for just before modified page.
fdpage newpage(iter->offset, (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes), true, true);
iter->bytes -= (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes);
iter->offset += (static_cast<const off_t>(MIN_MULTIPART_SIZE) - accumulated_bytes);
tmpPageObj.pages.insert(iter, newpage);
last_modified_bytes = static_cast<const off_t>(MIN_MULTIPART_SIZE); // backup last modified page size
accumulated_bytes = iter->bytes; // set rest bytes to accumulated size
is_prev_modified_page = false;
}else{
// assign all this page sizes to just before modified page.
// but still it is not enough for the minimum size.
accumulated_bytes += iter->bytes; // add all bytes to accumulated size
}
}
}
}
}
// recompress
if(!tmpPageObj.Compress(true)){ // compress by modified flag
return false;
}
// normalization for uploading parts
for(iter = tmpPageObj.pages.begin(); iter != tmpPageObj.pages.end(); ++iter){
off_t start = iter->offset;
off_t remains = iter->bytes;
while(0 < remains){
off_t onesize;
if(iter->modified){
// Uploading parts, this page must be 5MB - partsize
onesize = std::min(remains, partsize);
}else{
// Not uploading parts, this page must be 5MB - 5GB
onesize = std::min(remains, static_cast<off_t>(FIVE_GB));
}
fdpage page(start, onesize, iter->loaded, iter->modified);
mplist.push_back(page);
start += onesize;
remains -= onesize;
}
}
return true;
}
bool PageList::IsModified(void) const
{
for(fdpage_list_t::const_iterator iter = pages.begin(); iter != pages.end(); ++iter){
@ -616,7 +875,12 @@ bool PageList::Serialize(CacheFileStat& file, bool is_output)
is_modified = (1 == s3fs_strtoofft(part.c_str()) ? true : false);
}
// add new area
SetPageLoadedStatus(offset, size, is_loaded, is_modified);
PageList::page_status pstatus =
( is_loaded && is_modified ? PageList::PAGE_LOAD_MODIFIED :
!is_loaded && is_modified ? PageList::PAGE_MODIFIED :
is_loaded && !is_modified ? PageList::PAGE_LOADED : PageList::PAGE_NOT_LOAD_MODIFIED );
SetPageLoadedStatus(offset, size, pstatus);
}
delete[] ptmp;
if(is_err){
@ -649,6 +913,15 @@ void PageList::Dump()
//------------------------------------------------
// FdEntity class methods
//------------------------------------------------
bool FdEntity::mixmultipart = true;
bool FdEntity::SetNoMixMultipart(void)
{
bool old = mixmultipart;
mixmultipart = false;
return old;
}
int FdEntity::FillFile(int fd, unsigned char byte, off_t size, off_t start)
{
unsigned char bytes[1024 * 32]; // 32kb
@ -1265,7 +1538,7 @@ int FdEntity::Load(off_t start, off_t size, bool lock_already_held)
break;
}
// Set loaded flag
pagelist.SetPageLoadedStatus(iter->offset, iter->bytes, true, false);
pagelist.SetPageLoadedStatus(iter->offset, iter->bytes, PageList::PAGE_LOADED);
}
PageList::FreeList(unloaded_list);
}
@ -1532,8 +1805,10 @@ int FdEntity::RowFlush(const char* tpath, bool force_sync)
// check disk space
if(ReserveDiskSpace(restsize)){
// enough disk space
// Load all uninitialized area
result = Load(/*start=*/ 0, /*size=*/ 0, /*lock_already_held=*/ true);
// Load all uninitialized area(no mix multipart uploading)
if(!FdEntity::mixmultipart){
result = Load(/*start=*/ 0, /*size=*/ 0, /*lock_already_held=*/ true);
}
FdManager::FreeReservedDiskSpace(restsize);
if(0 != result){
S3FS_PRN_ERR("failed to upload all area(errno=%d)", result);
@ -1588,8 +1863,37 @@ int FdEntity::RowFlush(const char* tpath, bool force_sync)
}
if(pagelist.Size() >= S3fsCurl::GetMultipartSize() && !nomultipart){
result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, fd);
if(FdEntity::mixmultipart){
// multipart uploading can use copy api
// This is to ensure that each part is 5MB or more.
// If the part is less than 5MB, download it.
fdpage_list_t dlpages;
if(!pagelist.GetLoadPageListForMultipartUpload(dlpages)){
S3FS_PRN_ERR("something error occurred during getting download pagelist.");
return -1;
}
for(fdpage_list_t::const_iterator iter = dlpages.begin(); iter != dlpages.end(); ++iter){
if(0 != (result = Load(iter->offset, iter->bytes, true))){
S3FS_PRN_ERR("failed to get parts(start=%lld, size=%lld) before uploading.", static_cast<long long int>(iter->offset), static_cast<long long int>(iter->bytes));
return result;
}
}
// multipart uploading with copy api
result = S3fsCurl::ParallelMixMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, fd, pagelist);
}else{
// multipart uploading not using copy api
result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, fd);
}
}else{
// If there are unloaded pages, they are loaded at here.
if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, /*lock_already_held=*/ true))){
S3FS_PRN_ERR("failed to load parts before uploading object(%d)", result);
return result;
}
S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, fd);
}
@ -1667,7 +1971,7 @@ ssize_t FdEntity::Read(char* bytes, off_t start, size_t size, bool force_load)
AutoLock auto_lock(&fdent_data_lock);
if(force_load){
pagelist.SetPageLoadedStatus(start, size, false, false);
pagelist.SetPageLoadedStatus(start, size, PageList::PAGE_NOT_LOAD_MODIFIED);
}
ssize_t rsize;
@ -1737,7 +2041,7 @@ ssize_t FdEntity::Write(const char* bytes, off_t start, size_t size)
return -EIO;
}
// add new area
pagelist.SetPageLoadedStatus(pagelist.Size(), start - pagelist.Size(), false, true);
pagelist.SetPageLoadedStatus(pagelist.Size(), start - pagelist.Size(), PageList::PAGE_MODIFIED);
}
int result = 0;
@ -1750,9 +2054,12 @@ ssize_t FdEntity::Write(const char* bytes, off_t start, size_t size)
// enough disk space
// Load uninitialized area which starts from 0 to (start + size) before writing.
if(0 < start){
result = Load(0, start, /*lock_already_held=*/ true);
if(!FdEntity::mixmultipart){
if(0 < start){
result = Load(0, start, /*lock_already_held=*/ true);
}
}
FdManager::FreeReservedDiskSpace(restsize);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area before writing(errno=%d)", result);
@ -1782,15 +2089,17 @@ ssize_t FdEntity::Write(const char* bytes, off_t start, size_t size)
return -errno;
}
if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, true, true);
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
}
// Load uninitialized area which starts from (start + size) to EOF after writing.
if(pagelist.Size() > start + static_cast<off_t>(size)){
result = Load(start + size, pagelist.Size(), /*lock_already_held=*/ true);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area after writing(errno=%d)", result);
return static_cast<ssize_t>(result);
if(!FdEntity::mixmultipart){
if(pagelist.Size() > start + static_cast<off_t>(size)){
result = Load(start + size, pagelist.Size(), /*lock_already_held=*/ true);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area after writing(errno=%d)", result);
return static_cast<ssize_t>(result);
}
}
}