diff --git a/src/curl.h b/src/curl.h index 013910c..abba679 100644 --- a/src/curl.h +++ b/src/curl.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "common.h" @@ -202,7 +203,7 @@ class S3fsCurl std::string query_string; // request query string Semaphore *sem; std::mutex *completed_tids_lock; - std::vector *completed_tids; + std::vector *completed_tids; s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function CURLcode curlCode; // handle curl return diff --git a/src/curl_multi.cpp b/src/curl_multi.cpp index e2a156c..e2bee26 100644 --- a/src/curl_multi.cpp +++ b/src/curl_multi.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include #include "s3fs.h" @@ -112,14 +114,12 @@ bool S3fsMultiCurl::SetS3fsCurlObject(std::unique_ptr s3fscurl) int S3fsMultiCurl::MultiPerform() { - std::vector threads; + std::map>> threads; bool success = true; bool isMultiHead = false; Semaphore sem(GetMaxParallelism()); - int rc; for(s3fscurllist_t::iterator iter = clist_req.begin(); iter != clist_req.end(); ++iter) { - pthread_t thread; S3fsCurl* s3fscurl = iter->get(); if(!s3fscurl){ continue; @@ -129,19 +129,14 @@ int S3fsMultiCurl::MultiPerform() { const std::lock_guard lock(completed_tids_lock); - for(std::vector::iterator it = completed_tids.begin(); it != completed_tids.end(); ++it){ - void* retval; - - rc = pthread_join(*it, &retval); - if (rc) { - success = false; - S3FS_PRN_ERR("failed pthread_join - rc(%d) %s", rc, strerror(rc)); - } else { - long int_retval = reinterpret_cast(retval); - if (int_retval && !(int_retval == -ENOENT && isMultiHead)) { - S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval); - } + for(const auto &thread_id : completed_tids){ + auto it = threads.find(thread_id); + it->second.first.join(); + long int int_retval = it->second.second.get(); + if (int_retval && !(int_retval == -ENOENT && isMultiHead)) { + S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval); } + threads.erase(it); } completed_tids.clear(); } @@ -151,13 +146,11 @@ int S3fsMultiCurl::MultiPerform() isMultiHead |= s3fscurl->GetOp() == "HEAD"; - rc = pthread_create(&thread, nullptr, S3fsMultiCurl::RequestPerformWrapper, static_cast(s3fscurl)); - if (rc != 0) { - success = false; - S3FS_PRN_ERR("failed pthread_create - rc(%d)", rc); - break; - } - threads.push_back(thread); + std::promise promise; + std::future future = promise.get_future(); + std::thread thread(S3fsMultiCurl::RequestPerformWrapper, s3fscurl, std::move(promise)); + auto thread_id = thread.get_id(); + threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future))); } for(int i = 0; i < sem.get_value(); ++i){ @@ -165,19 +158,14 @@ int S3fsMultiCurl::MultiPerform() } const std::lock_guard lock(completed_tids_lock); - for (std::vector::iterator titer = completed_tids.begin(); titer != completed_tids.end(); ++titer) { - void* retval; - - rc = pthread_join(*titer, &retval); - if (rc) { - success = false; - S3FS_PRN_ERR("failed pthread_join - rc(%d)", rc); - } else { - long int_retval = reinterpret_cast(retval); - if (int_retval && !(int_retval == -ENOENT && isMultiHead)) { - S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval); - } + for(const auto &thread_id : completed_tids){ + auto it = threads.find(thread_id); + it->second.first.join(); + long int int_retval = it->second.second.get(); + if (int_retval && !(int_retval == -ENOENT && isMultiHead)) { + S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval); } + threads.erase(it); } completed_tids.clear(); @@ -343,30 +331,31 @@ int S3fsMultiCurl::Request() // // thread function for performing an S3fsCurl request // -void* S3fsMultiCurl::RequestPerformWrapper(void* arg) +void S3fsMultiCurl::RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise promise) { - S3fsCurl* s3fscurl= static_cast(arg); - void* result = nullptr; + int result = 0; if(!s3fscurl){ - return reinterpret_cast(static_cast(-EIO)); + // this doesn't signal completion but also never happens + promise.set_value(-EIO); + return; } if(s3fscurl->fpLazySetup){ if(!s3fscurl->fpLazySetup(s3fscurl)){ S3FS_PRN_ERR("Failed to lazy setup, then respond EIO."); - result = reinterpret_cast(static_cast(-EIO)); + result = -EIO; } } if(!result){ - result = reinterpret_cast(static_cast(s3fscurl->RequestPerform())); + result = s3fscurl->RequestPerform(); s3fscurl->DestroyCurlHandle(true, false); } const std::lock_guard lock(*s3fscurl->completed_tids_lock); - s3fscurl->completed_tids->push_back(pthread_self()); + s3fscurl->completed_tids->push_back(std::this_thread::get_id()); s3fscurl->sem->post(); - return result; + promise.set_value(result); } /* diff --git a/src/curl_multi.h b/src/curl_multi.h index 5ba94e8..d62f6a7 100644 --- a/src/curl_multi.h +++ b/src/curl_multi.h @@ -21,8 +21,10 @@ #ifndef S3FS_CURL_MULTI_H_ #define S3FS_CURL_MULTI_H_ +#include #include #include +#include #include //---------------------------------------------- @@ -54,14 +56,14 @@ class S3fsMultiCurl void* pNotFoundCallbackParam; std::mutex completed_tids_lock; - std::vector completed_tids; + std::vector completed_tids; private: bool ClearEx(bool is_all); int MultiPerform(); int MultiRead(); - static void* RequestPerformWrapper(void* arg); + static void RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise promise); public: explicit S3fsMultiCurl(int maxParallelism, bool not_abort = false); diff --git a/src/gnutls_auth.cpp b/src/gnutls_auth.cpp index 36d8395..b77b3cd 100644 --- a/src/gnutls_auth.cpp +++ b/src/gnutls_auth.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include diff --git a/src/nss_auth.cpp b/src/nss_auth.cpp index a8fc5ca..4f8d8ca 100644 --- a/src/nss_auth.cpp +++ b/src/nss_auth.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include diff --git a/src/openssl_auth.cpp b/src/openssl_auth.cpp index 885a020..680d632 100644 --- a/src/openssl_auth.cpp +++ b/src/openssl_auth.cpp @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include @@ -34,6 +33,7 @@ #include #include #include +#include #include "s3fs_auth.h" #include "s3fs_logger.h" @@ -99,9 +99,7 @@ static void s3fs_crypt_mutex_lock(int mode, int pos, const char* file, int line) static unsigned long s3fs_crypt_get_threadid() __attribute__ ((unused)); static unsigned long s3fs_crypt_get_threadid() { - // For FreeBSD etc, some system's pthread_t is structure pointer. - // Then we use cast like C style(not C++) instead of ifdef. - return (unsigned long)(pthread_self()); + return static_cast(std::hash()(std::this_thread::get_id())); } static struct CRYPTO_dynlock_value* s3fs_dyn_crypt_mutex(const char* file, int line) __attribute__ ((unused)); diff --git a/src/sighandlers.cpp b/src/sighandlers.cpp index d193269..432925f 100644 --- a/src/sighandlers.cpp +++ b/src/sighandlers.cpp @@ -20,7 +20,7 @@ #include #include -#include +#include #include "s3fs_logger.h" #include "sighandlers.h" @@ -86,14 +86,13 @@ bool S3fsSignals::SetUsr1Handler(const char* path) return true; } -void* S3fsSignals::CheckCacheWorker(void* arg) +void S3fsSignals::CheckCacheWorker(Semaphore* pSem) { - Semaphore* pSem = static_cast(arg); if(!pSem){ - pthread_exit(nullptr); + return; } if(!S3fsSignals::enableUsr1){ - pthread_exit(nullptr); + return; } // wait and loop @@ -117,7 +116,6 @@ void* S3fsSignals::CheckCacheWorker(void* arg) pSem->wait(); } } - return nullptr; } void S3fsSignals::HandlerUSR2(int sig) @@ -195,15 +193,9 @@ bool S3fsSignals::InitUsr1Handler() } // create thread - int result; std::unique_ptr pSemUsr1_tmp(new Semaphore(0)); - std::unique_ptr pThreadUsr1_tmp(new pthread_t); - if(0 != (result = pthread_create(pThreadUsr1.get(), nullptr, S3fsSignals::CheckCacheWorker, static_cast(pSemUsr1_tmp.get())))){ - S3FS_PRN_ERR("Could not create thread for SIGUSR1 by %d", result); - return false; - } + pThreadUsr1.reset(new std::thread(S3fsSignals::CheckCacheWorker, pSemUsr1_tmp.get())); pSemUsr1 = std::move(pSemUsr1_tmp); - pThreadUsr1 = std::move(pThreadUsr1_tmp); // set handler struct sigaction sa{}; @@ -230,12 +222,7 @@ bool S3fsSignals::DestroyUsr1Handler() pSemUsr1->post(); // wait for thread exiting - void* retval = nullptr; - int result; - if(0 != (result = pthread_join(*pThreadUsr1, &retval))){ - S3FS_PRN_ERR("Could not stop thread for SIGUSR1 by %d", result); - return false; - } + pThreadUsr1->join(); pSemUsr1.reset(); pThreadUsr1.reset(); diff --git a/src/sighandlers.h b/src/sighandlers.h index 4f8c74a..0c5b23e 100644 --- a/src/sighandlers.h +++ b/src/sighandlers.h @@ -22,6 +22,7 @@ #define S3FS_SIGHANDLERS_H_ #include +#include class Semaphore; @@ -34,14 +35,14 @@ class S3fsSignals static std::unique_ptr pSingleton; static bool enableUsr1; - std::unique_ptr pThreadUsr1; + std::unique_ptr pThreadUsr1; std::unique_ptr pSemUsr1; protected: static S3fsSignals* get() { return pSingleton.get(); } static void HandlerUSR1(int sig); - static void* CheckCacheWorker(void* arg); + static void CheckCacheWorker(Semaphore* pSem); static void HandlerUSR2(int sig); static bool InitUsr2Handler(); diff --git a/src/threadpoolman.cpp b/src/threadpoolman.cpp index 4fe5132..4059084 100644 --- a/src/threadpoolman.cpp +++ b/src/threadpoolman.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "s3fs_logger.h" #include "threadpoolman.h" @@ -64,13 +66,12 @@ bool ThreadPoolMan::Instruct(const thpoolman_param& param) // // Thread worker // -void* ThreadPoolMan::Worker(void* arg) +void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise promise) { - ThreadPoolMan* psingleton = static_cast(arg); - if(!psingleton){ S3FS_PRN_ERR("The parameter for worker thread is invalid."); - return reinterpret_cast(-EIO); + promise.set_value(-EIO); + return; } S3FS_PRN_INFO3("Start worker thread in ThreadPoolMan."); @@ -105,7 +106,7 @@ void* ThreadPoolMan::Worker(void* arg) } } - return nullptr; + promise.set_value(0); } //------------------------------------------------ @@ -158,14 +159,10 @@ bool ThreadPoolMan::StopThreads() } // wait for threads exiting - for(thread_list_t::const_iterator iter = thread_list.begin(); iter != thread_list.end(); ++iter){ - void* retval = nullptr; - int result = pthread_join(*iter, &retval); - if(result){ - S3FS_PRN_ERR("failed pthread_join - result(%d)", result); - }else{ - S3FS_PRN_DBG("succeed pthread_join - return code(%ld)", reinterpret_cast(retval)); - } + for(auto& pair : thread_list){ + pair.first.join(); + long retval = pair.second.get(); + S3FS_PRN_DBG("join succeeded - return code(%ld)", reinterpret_cast(retval)); } thread_list.clear(); @@ -195,14 +192,10 @@ bool ThreadPoolMan::StartThreads(int count) SetExitFlag(false); for(int cnt = 0; cnt < count; ++cnt){ // run thread - pthread_t thread; - int result; - if(0 != (result = pthread_create(&thread, nullptr, ThreadPoolMan::Worker, static_cast(this)))){ - S3FS_PRN_ERR("failed pthread_create with return code(%d)", result); - StopThreads(); // if possible, stop all threads - return false; - } - thread_list.push_back(thread); + std::promise promise; + std::future future = promise.get_future(); + std::thread thread(ThreadPoolMan::Worker, this, std::move(promise)); + thread_list.emplace_back(std::move(thread), std::move(future)); } return true; } diff --git a/src/threadpoolman.h b/src/threadpoolman.h index e8d7d75..3c5d906 100644 --- a/src/threadpoolman.h +++ b/src/threadpoolman.h @@ -22,6 +22,7 @@ #define S3FS_THREADPOOLMAN_H_ #include +#include #include #include #include @@ -34,7 +35,7 @@ // // Prototype function // -typedef void* (*thpoolman_worker)(void*); // same as start_routine for pthread_create function +typedef void* (*thpoolman_worker)(void*); // // Parameter structure @@ -53,8 +54,6 @@ struct thpoolman_param typedef std::list thpoolman_params_t; -typedef std::vector thread_list_t; - //------------------------------------------------ // Class ThreadPoolMan //------------------------------------------------ @@ -67,12 +66,12 @@ class ThreadPoolMan Semaphore thpoolman_sem; std::mutex thread_list_lock; - thread_list_t thread_list; + std::vector>> thread_list; thpoolman_params_t instruction_list; private: - static void* Worker(void* arg); + static void Worker(ThreadPoolMan* psingleton, std::promise promise); explicit ThreadPoolMan(int count = 1); ~ThreadPoolMan();