diff options
Diffstat (limited to 'thirdparty/zstd/compress/zstdmt_compress.c')
-rw-r--r-- | thirdparty/zstd/compress/zstdmt_compress.c | 173 |
1 files changed, 94 insertions, 79 deletions
diff --git a/thirdparty/zstd/compress/zstdmt_compress.c b/thirdparty/zstd/compress/zstdmt_compress.c index 6786075569..86ccce3184 100644 --- a/thirdparty/zstd/compress/zstdmt_compress.c +++ b/thirdparty/zstd/compress/zstdmt_compress.c @@ -15,17 +15,13 @@ #endif -/* ====== Constants ====== */ -#define ZSTDMT_OVERLAPLOG_DEFAULT 0 - - /* ====== Dependencies ====== */ -#include "../common/allocations.h" /* ZSTD_customMalloc, ZSTD_customCalloc, ZSTD_customFree */ +#include "../common/allocations.h" /* ZSTD_customMalloc, ZSTD_customCalloc, ZSTD_customFree */ #include "../common/zstd_deps.h" /* ZSTD_memcpy, ZSTD_memset, INT_MAX, UINT_MAX */ #include "../common/mem.h" /* MEM_STATIC */ #include "../common/pool.h" /* threadpool */ #include "../common/threading.h" /* mutex */ -#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ +#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ #include "zstd_ldm.h" #include "zstdmt_compress.h" @@ -44,12 +40,13 @@ # include <unistd.h> # include <sys/times.h> -# define DEBUG_PRINTHEX(l,p,n) { \ - unsigned debug_u; \ - for (debug_u=0; debug_u<(n); debug_u++) \ - RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \ - RAWLOG(l, " \n"); \ -} +# define DEBUG_PRINTHEX(l,p,n) \ + do { \ + unsigned debug_u; \ + for (debug_u=0; debug_u<(n); debug_u++) \ + RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \ + RAWLOG(l, " \n"); \ + } while (0) static unsigned long long GetCurrentClockTimeMicroseconds(void) { @@ -61,25 +58,28 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void) } } #define MUTEX_WAIT_TIME_DLEVEL 6 -#define ZSTD_PTHREAD_MUTEX_LOCK(mutex) { \ - if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \ - unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \ - ZSTD_pthread_mutex_lock(mutex); \ - { unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \ - unsigned long long const elapsedTime = (afterTime-beforeTime); \ - if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ - DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \ - elapsedTime, #mutex); \ - } } \ - } else { \ - ZSTD_pthread_mutex_lock(mutex); \ - } \ -} +#define ZSTD_PTHREAD_MUTEX_LOCK(mutex) \ + do { \ + if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \ + unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \ + ZSTD_pthread_mutex_lock(mutex); \ + { unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \ + unsigned long long const elapsedTime = (afterTime-beforeTime); \ + if (elapsedTime > 1000) { \ + /* or whatever threshold you like; I'm using 1 millisecond here */ \ + DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, \ + "Thread took %llu microseconds to acquire mutex %s \n", \ + elapsedTime, #mutex); \ + } } \ + } else { \ + ZSTD_pthread_mutex_lock(mutex); \ + } \ + } while (0) #else # define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m) -# define DEBUG_PRINTHEX(l,p,n) {} +# define DEBUG_PRINTHEX(l,p,n) do { } while (0) #endif @@ -100,18 +100,39 @@ typedef struct ZSTDMT_bufferPool_s { unsigned totalBuffers; unsigned nbBuffers; ZSTD_customMem cMem; - buffer_t bTable[1]; /* variable size */ + buffer_t* buffers; } ZSTDMT_bufferPool; +static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) +{ + DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool); + if (!bufPool) return; /* compatibility with free on NULL */ + if (bufPool->buffers) { + unsigned u; + for (u=0; u<bufPool->totalBuffers; u++) { + DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->buffers[u].start); + ZSTD_customFree(bufPool->buffers[u].start, bufPool->cMem); + } + ZSTD_customFree(bufPool->buffers, bufPool->cMem); + } + ZSTD_pthread_mutex_destroy(&bufPool->poolMutex); + ZSTD_customFree(bufPool, bufPool->cMem); +} + static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem) { - ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc( - sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem); + ZSTDMT_bufferPool* const bufPool = + (ZSTDMT_bufferPool*)ZSTD_customCalloc(sizeof(ZSTDMT_bufferPool), cMem); if (bufPool==NULL) return NULL; if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) { ZSTD_customFree(bufPool, cMem); return NULL; } + bufPool->buffers = (buffer_t*)ZSTD_customCalloc(maxNbBuffers * sizeof(buffer_t), cMem); + if (bufPool->buffers==NULL) { + ZSTDMT_freeBufferPool(bufPool); + return NULL; + } bufPool->bufferSize = 64 KB; bufPool->totalBuffers = maxNbBuffers; bufPool->nbBuffers = 0; @@ -119,32 +140,19 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_cu return bufPool; } -static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) -{ - unsigned u; - DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool); - if (!bufPool) return; /* compatibility with free on NULL */ - for (u=0; u<bufPool->totalBuffers; u++) { - DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->bTable[u].start); - ZSTD_customFree(bufPool->bTable[u].start, bufPool->cMem); - } - ZSTD_pthread_mutex_destroy(&bufPool->poolMutex); - ZSTD_customFree(bufPool, bufPool->cMem); -} - /* only works at initialization, not during compression */ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) { - size_t const poolSize = sizeof(*bufPool) - + (bufPool->totalBuffers - 1) * sizeof(buffer_t); + size_t const poolSize = sizeof(*bufPool); + size_t const arraySize = bufPool->totalBuffers * sizeof(buffer_t); unsigned u; size_t totalBufferSize = 0; ZSTD_pthread_mutex_lock(&bufPool->poolMutex); for (u=0; u<bufPool->totalBuffers; u++) - totalBufferSize += bufPool->bTable[u].capacity; + totalBufferSize += bufPool->buffers[u].capacity; ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); - return poolSize + totalBufferSize; + return poolSize + arraySize + totalBufferSize; } /* ZSTDMT_setBufferSize() : @@ -187,9 +195,9 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize); ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers) { /* try to use an existing buffer */ - buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)]; + buffer_t const buf = bufPool->buffers[--(bufPool->nbBuffers)]; size_t const availBufferSize = buf.capacity; - bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer; + bufPool->buffers[bufPool->nbBuffers] = g_nullBuffer; if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) { /* large enough, but not too much */ DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u", @@ -250,14 +258,14 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) if (buf.start == NULL) return; /* compatible with release on NULL */ ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers < bufPool->totalBuffers) { - bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ + bufPool->buffers[bufPool->nbBuffers++] = buf; /* stored for later use */ DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return; } ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); - /* Reached bufferPool capacity (should not happen) */ + /* Reached bufferPool capacity (note: should not happen) */ DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing "); ZSTD_customFree(buf.start, bufPool->cMem); } @@ -350,16 +358,20 @@ typedef struct { int totalCCtx; int availCCtx; ZSTD_customMem cMem; - ZSTD_CCtx* cctx[1]; /* variable size */ + ZSTD_CCtx** cctxs; } ZSTDMT_CCtxPool; -/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */ +/* note : all CCtx borrowed from the pool must be reverted back to the pool _before_ freeing the pool */ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) { - int cid; - for (cid=0; cid<pool->totalCCtx; cid++) - ZSTD_freeCCtx(pool->cctx[cid]); /* note : compatible with free on NULL */ + if (!pool) return; ZSTD_pthread_mutex_destroy(&pool->poolMutex); + if (pool->cctxs) { + int cid; + for (cid=0; cid<pool->totalCCtx; cid++) + ZSTD_freeCCtx(pool->cctxs[cid]); /* free compatible with NULL */ + ZSTD_customFree(pool->cctxs, pool->cMem); + } ZSTD_customFree(pool, pool->cMem); } @@ -368,19 +380,24 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers, ZSTD_customMem cMem) { - ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_customCalloc( - sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem); + ZSTDMT_CCtxPool* const cctxPool = + (ZSTDMT_CCtxPool*) ZSTD_customCalloc(sizeof(ZSTDMT_CCtxPool), cMem); assert(nbWorkers > 0); if (!cctxPool) return NULL; if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) { ZSTD_customFree(cctxPool, cMem); return NULL; } - cctxPool->cMem = cMem; cctxPool->totalCCtx = nbWorkers; + cctxPool->cctxs = (ZSTD_CCtx**)ZSTD_customCalloc(nbWorkers * sizeof(ZSTD_CCtx*), cMem); + if (!cctxPool->cctxs) { + ZSTDMT_freeCCtxPool(cctxPool); + return NULL; + } + cctxPool->cMem = cMem; + cctxPool->cctxs[0] = ZSTD_createCCtx_advanced(cMem); + if (!cctxPool->cctxs[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ - cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); - if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers); return cctxPool; } @@ -402,16 +419,16 @@ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) { ZSTD_pthread_mutex_lock(&cctxPool->poolMutex); { unsigned const nbWorkers = cctxPool->totalCCtx; - size_t const poolSize = sizeof(*cctxPool) - + (nbWorkers-1) * sizeof(ZSTD_CCtx*); - unsigned u; + size_t const poolSize = sizeof(*cctxPool); + size_t const arraySize = cctxPool->totalCCtx * sizeof(ZSTD_CCtx*); size_t totalCCtxSize = 0; + unsigned u; for (u=0; u<nbWorkers; u++) { - totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]); + totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctxs[u]); } ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex); assert(nbWorkers > 0); - return poolSize + totalCCtxSize; + return poolSize + arraySize + totalCCtxSize; } } @@ -421,7 +438,7 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool) ZSTD_pthread_mutex_lock(&cctxPool->poolMutex); if (cctxPool->availCCtx) { cctxPool->availCCtx--; - { ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx]; + { ZSTD_CCtx* const cctx = cctxPool->cctxs[cctxPool->availCCtx]; ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex); return cctx; } } @@ -435,7 +452,7 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) if (cctx==NULL) return; /* compatibility with release on NULL */ ZSTD_pthread_mutex_lock(&pool->poolMutex); if (pool->availCCtx < pool->totalCCtx) - pool->cctx[pool->availCCtx++] = cctx; + pool->cctxs[pool->availCCtx++] = cctx; else { /* pool overflow : should not happen, since totalCCtx==nbWorkers */ DEBUGLOG(4, "CCtx pool overflow : free cctx"); @@ -601,11 +618,8 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, ZSTD_pthread_mutex_unlock(&serialState->mutex); if (seqStore.size > 0) { - size_t const err = ZSTD_referenceExternalSequences( - jobCCtx, seqStore.seq, seqStore.size); + ZSTD_referenceExternalSequences(jobCCtx, seqStore.seq, seqStore.size); assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); - assert(!ZSTD_isError(err)); - (void)err; } } @@ -657,12 +671,13 @@ typedef struct { unsigned frameChecksumNeeded; /* used only by mtctx */ } ZSTDMT_jobDescription; -#define JOB_ERROR(e) { \ - ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); \ - job->cSize = e; \ - ZSTD_pthread_mutex_unlock(&job->job_mutex); \ - goto _endJob; \ -} +#define JOB_ERROR(e) \ + do { \ + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); \ + job->cSize = e; \ + ZSTD_pthread_mutex_unlock(&job->job_mutex); \ + goto _endJob; \ + } while (0) /* ZSTDMT_compressionJob() is a POOL_function type */ static void ZSTDMT_compressionJob(void* jobDescription) @@ -1091,7 +1106,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { unsigned jobNb; unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", - mtctx->doneJobID, lastJobNb, mtctx->jobReady) + mtctx->doneJobID, lastJobNb, mtctx->jobReady); for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { unsigned const wJobID = jobNb & mtctx->jobIDMask; ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; |