/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
*/
#pragma once

#include "ErrorHolder.h"
#include "Logging.h"
#include "Options.h"
#include "utils/Buffer.h"
#include "utils/Range.h"
#include "utils/ResourcePool.h"
#include "utils/ThreadPool.h"
#include "utils/WorkQueue.h"
#define ZSTD_STATIC_LINKING_ONLY
#define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated
                                        * and uses deprecated functions
                                        */
#include "zstd.h"
#undef ZSTD_STATIC_LINKING_ONLY

#include <cstddef>
#include <cstdint>
#include <memory>

namespace pzstd {
/**
* Runs pzstd with `options` and returns the number of bytes written.
* An error occurred if `errorHandler.hasError()`.
*
* @param options      The pzstd options to use for (de)compression
* @returns            0 upon success and non-zero on failure.
*/
int pzstdMain(const Options& options);

class SharedState {
public:
 SharedState(const Options& options) : log(options.verbosity) {
   if (!options.decompress) {
     auto parameters = options.determineParameters();
     cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
         [this, parameters]() -> ZSTD_CStream* {
           this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream");
           auto zcs = ZSTD_createCStream();
           if (zcs) {
             auto err = ZSTD_initCStream_advanced(
                 zcs, nullptr, 0, parameters, 0);
             if (ZSTD_isError(err)) {
               ZSTD_freeCStream(zcs);
               return nullptr;
             }
           }
           return zcs;
         },
         [](ZSTD_CStream *zcs) {
           ZSTD_freeCStream(zcs);
         }});
   } else {
     dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
         [this]() -> ZSTD_DStream* {
           this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream");
           auto zds = ZSTD_createDStream();
           if (zds) {
             auto err = ZSTD_initDStream(zds);
             if (ZSTD_isError(err)) {
               ZSTD_freeDStream(zds);
               return nullptr;
             }
           }
           return zds;
         },
         [](ZSTD_DStream *zds) {
           ZSTD_freeDStream(zds);
         }});
   }
 }

 ~SharedState() {
   // The resource pools have references to this, so destroy them first.
   cStreamPool.reset();
   dStreamPool.reset();
 }

 Logger log;
 ErrorHolder errorHolder;
 std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
 std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
};

/**
* Streams input from `fd`, breaks input up into chunks, and compresses each
* chunk independently.  Output of each chunk gets streamed to a queue, and
* the output queues get put into `chunks` in order.
*
* @param state        The shared state
* @param chunks       Each compression jobs output queue gets `pushed()` here
*                      as soon as it is available
* @param executor     The thread pool to run compression jobs in
* @param fd           The input file descriptor
* @param size         The size of the input file if known, 0 otherwise
* @param numThreads   The number of threads in the thread pool
* @param parameters   The zstd parameters to use for compression
* @returns            The number of bytes read from the file
*/
std::uint64_t asyncCompressChunks(
   SharedState& state,
   WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
   ThreadPool& executor,
   FILE* fd,
   std::uintmax_t size,
   std::size_t numThreads,
   ZSTD_parameters parameters);

/**
* Streams input from `fd`.  If pzstd headers are available it breaks the input
* up into independent frames.  It sends each frame to an independent
* decompression job.  Output of each frame gets streamed to a queue, and
* the output queues get put into `frames` in order.
*
* @param state        The shared state
* @param frames       Each decompression jobs output queue gets `pushed()` here
*                      as soon as it is available
* @param executor     The thread pool to run compression jobs in
* @param fd           The input file descriptor
* @returns            The number of bytes read from the file
*/
std::uint64_t asyncDecompressFrames(
   SharedState& state,
   WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
   ThreadPool& executor,
   FILE* fd);

/**
* Streams input in from each queue in `outs` in order, and writes the data to
* `outputFd`.
*
* @param state        The shared state
* @param outs         A queue of output queues, one for each
*                      (de)compression job.
* @param outputFd     The file descriptor to write to
* @param decompress   Are we decompressing?
* @returns            The number of bytes written
*/
std::uint64_t writeFile(
   SharedState& state,
   WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
   FILE* outputFd,
   bool decompress);
}