diff options
Diffstat (limited to 'thirdparty/zstd/common/pool.c')
| -rw-r--r-- | thirdparty/zstd/common/pool.c | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/thirdparty/zstd/common/pool.c b/thirdparty/zstd/common/pool.c index 2e37cdd73c..d5ca5a7808 100644 --- a/thirdparty/zstd/common/pool.c +++ b/thirdparty/zstd/common/pool.c @@ -1,5 +1,5 @@ /* - * Copyright (c) Yann Collet, Facebook, Inc. + * Copyright (c) Meta Platforms, Inc. and affiliates. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the @@ -10,9 +10,9 @@ /* ====== Dependencies ======= */ +#include "../common/allocations.h" /* ZSTD_customCalloc, ZSTD_customFree */ #include "zstd_deps.h" /* size_t */ #include "debug.h" /* assert */ -#include "zstd_internal.h" /* ZSTD_customMalloc, ZSTD_customFree */ #include "pool.h" /* ====== Compiler specifics ====== */ @@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) { /* If the intended queue size was 0, signal after finishing job */ ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; - if (ctx->queueSize == 1) { - ZSTD_pthread_cond_signal(&ctx->queuePushCond); - } + ZSTD_pthread_cond_signal(&ctx->queuePushCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ @@ -128,7 +126,7 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, * empty and full queues. */ ctx->queueSize = queueSize + 1; - ctx->queue = (POOL_job*)ZSTD_customMalloc(ctx->queueSize * sizeof(POOL_job), customMem); + ctx->queue = (POOL_job*)ZSTD_customCalloc(ctx->queueSize * sizeof(POOL_job), customMem); ctx->queueHead = 0; ctx->queueTail = 0; ctx->numThreadsBusy = 0; @@ -142,7 +140,7 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, } ctx->shutdown = 0; /* Allocate space for the thread handles */ - ctx->threads = (ZSTD_pthread_t*)ZSTD_customMalloc(numThreads * sizeof(ZSTD_pthread_t), customMem); + ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem); ctx->threadCapacity = 0; ctx->customMem = customMem; /* Check for errors */ @@ -175,7 +173,7 @@ static void POOL_join(POOL_ctx* ctx) { /* Join all of the threads */ { size_t i; for (i = 0; i < ctx->threadCapacity; ++i) { - ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */ + ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */ } } } @@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) { ZSTD_customFree(ctx, ctx->customMem); } +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx) { + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + } + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); +} + void ZSTD_freeThreadPool (ZSTD_threadPool* pool) { POOL_free (pool); } @@ -211,7 +220,7 @@ static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) return 0; } /* numThreads > threadCapacity */ - { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customMalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); + { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); if (!threadPool) return 1; /* replace existing thread pool */ ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); @@ -262,7 +271,9 @@ static int isQueueFull(POOL_ctx const* ctx) { static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) { - POOL_job const job = {function, opaque}; + POOL_job job; + job.function = function; + job.opaque = opaque; assert(ctx != NULL); if (ctx->shutdown) return; @@ -330,6 +341,11 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } +void POOL_joinJobs(POOL_ctx* ctx){ + assert(!ctx || ctx == &g_poolCtx); + (void)ctx; +} + int POOL_resize(POOL_ctx* ctx, size_t numThreads) { (void)ctx; (void)numThreads; return 0; |
