diff --git a/camerad/CMakeLists.txt b/camerad/CMakeLists.txt index 3cd43b6..cbb8bdd 100644 --- a/camerad/CMakeLists.txt +++ b/camerad/CMakeLists.txt @@ -128,7 +128,7 @@ list (APPEND INTERFACE_SOURCES add_library(${INTERFACE_TARGET} ${INTERFACE_SOURCES}) target_link_libraries(${INTERFACE_TARGET} common - shared_memory_writer + frame_output_factory ) target_include_directories(${INTERFACE_TARGET} PUBLIC ${INTERFACE_INCLUDES}) @@ -173,7 +173,6 @@ target_link_libraries(camerad network utilities logentry - shared_memory_writer ${INTERFACE_TARGET} ${INTERFACE_LIBS} ${CMAKE_THREAD_LIBS_INIT} diff --git a/camerad/archon_controller.cpp b/camerad/archon_controller.cpp index cb22b6d..bbf808d 100644 --- a/camerad/archon_controller.cpp +++ b/camerad/archon_controller.cpp @@ -1584,8 +1584,6 @@ namespace Camera { } else error=this->fetchlog(); - this->is_camera_mode = false; // require that a mode be selected after loading new firmware - return error; } /***** Camera::ArchonController::load_acf ***********************************/ diff --git a/camerad/archon_controller.h b/camerad/archon_controller.h index 1d61b69..a4d0a04 100644 --- a/camerad/archon_controller.h +++ b/camerad/archon_controller.h @@ -294,7 +294,6 @@ namespace Camera { std::atomic_flag archon_busy = ATOMIC_FLAG_INIT; //!< indicates a thread is accessing Archon bool is_firmwareloaded; std::string firmware; - bool is_camera_mode{false}; //!< has a camera mode been selected int msgref; std::string backplaneversion; std::vector modtype; //!< type of each module from SYSTEM command diff --git a/camerad/archon_exposure_modes.cpp b/camerad/archon_exposure_modes.cpp index 892dc82..a23a3a8 100644 --- a/camerad/archon_exposure_modes.cpp +++ b/camerad/archon_exposure_modes.cpp @@ -7,6 +7,8 @@ #include "archon_exposure_modes.h" #include "archon_interface.h" +#include + namespace Camera { /***** Camera::ExposureModeSingle *******************************************/ @@ -111,43 +113,49 @@ namespace Camera { /***** Camera::ExposureModeSingle::image_processing_thread ******************/ /** - * @brief implementation of Archon-specific expose for Single - * + * @brief Consumer thread: pop each frame off the queue and fan out to + * every configured FrameOutput on the interface */ void ExposureModeSingle::image_processing_thread() { const std::string function("Camera::ExposureModeSingle::image_processing_thread"); logwrite(function, "enter"); -// open FITS file ? + auto* camera_info = &this->interface->camera_info; + auto* controller = this->interface->controller; + auto* mode = &controller->modemap[controller->selectedmode]; + const size_t bufferbytes = static_cast(camera_info->image_data_bytes) * camera_info->cubedepth; + const uint32_t bpp = (mode->samplemode == 1) ? 4 : 2; + const uint32_t width = static_cast(mode->geometry.pixelcount); + const uint32_t height = static_cast(mode->geometry.linecount); - // pop an image out of the queue, - // wait until producer stops producing data, or aborted - // while (!this->interface->is_aborted()) { + std::shared_ptr buf; { - std::unique_lock lock(this->queue_mutex); - // keep trying to get the queue lock until success or aborted - this->queue_cv.wait(lock, [this] { - return !this->imagebuf_queue.empty() || this->is_producer_finished || this->interface->is_aborted(); - }); - if (this->interface->is_aborted()) break; - if (this->imagebuf_queue.empty()) { - if (this->is_producer_finished) { - logwrite(function, "queue empty and producer finished"); - break; - } - else { - logwrite(function, "queue empty, producer not finished"); + std::unique_lock lock(this->queue_mutex); + this->queue_cv.wait(lock, [this] { + return !this->imagebuf_queue.empty() || this->is_producer_finished || this->interface->is_aborted(); + }); + if (this->interface->is_aborted()) break; + if (this->imagebuf_queue.empty()) { + if (this->is_producer_finished) { + logwrite(function, "queue empty and producer finished"); + break; + } continue; } + buf = this->imagebuf_queue.front(); + this->imagebuf_queue.pop(); } - this->imagebuf_queue.pop(); - } -// process_image - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + Camera::FrameMetadata meta; + meta.frame_number = buf->bufframen_slice.empty() ? 0 : static_cast(buf->bufframen_slice[0]); + meta.timestamp = buf->buftimestamp_slice.empty() ? 0 : buf->buftimestamp_slice[0]; + meta.width = width; + meta.height = height; + meta.bytes_per_pixel = bpp; + this->interface->dispatch_frame(buf->rawpixels.get(), bufferbytes, meta); } -// close FITS file ? logwrite(function, "exit"); } /***** Camera::ExposureModeSingle::image_processing_thread ******************/ diff --git a/camerad/archon_interface.cpp b/camerad/archon_interface.cpp index 04cb665..156b0e1 100644 --- a/camerad/archon_interface.cpp +++ b/camerad/archon_interface.cpp @@ -807,9 +807,8 @@ namespace Camera { // if we made it all the way to the end then this is the selected mode this->controller->selectedmode = modeselect; - this->controller->is_camera_mode = true; - std::string target = ArchonExposureMode::SINGLE; + std::string target = this->default_exposure_mode_name(); for (const auto &m : this->get_exposure_modes()) { if (m == modeselect) { target = modeselect; break; } } diff --git a/camerad/archon_interface.h b/camerad/archon_interface.h index be985f5..8db6bcd 100644 --- a/camerad/archon_interface.h +++ b/camerad/archon_interface.h @@ -68,6 +68,9 @@ namespace Camera { long set_vcpu_inreg(const std::string &args, std::string &retstring); long autofetch_mode(const std::string &args, std::string &retstring); + // Fallback for set_camera_mode when the camera-mode name is unknown + virtual std::string default_exposure_mode_name() const { return "SINGLE"; } + char* get_framebuf() { return controller->framebuf; } bool is_autofetch_mode{false}; diff --git a/camerad/camera_interface.h b/camerad/camera_interface.h index 542986e..629d1aa 100644 --- a/camerad/camera_interface.h +++ b/camerad/camera_interface.h @@ -12,6 +12,10 @@ #include "camera_information.h" #include "camerad_commands.h" #include "exposure_modes.h" +#include "frame_output.h" + +#include +#include namespace Camera { @@ -54,6 +58,16 @@ namespace Camera { Config configfile; Camera::Information camera_info; + + // Frame output destinations populated by Camera::make_frame_outputs() + std::vector> frame_outputs; + + // Fan a frame out to every configured FrameOutput + void dispatch_frame(const char* data, size_t size, const FrameMetadata &meta) { + for (auto &output : this->frame_outputs) { + output->write(data, size, meta); + } + } // Common::FitsKeys systemkeys; move to Camera::Information? // These functions are shared by all interfaces with common implementations, diff --git a/camerad/exposure_modes.h b/camerad/exposure_modes.h index 26ba477..ecf7735 100644 --- a/camerad/exposure_modes.h +++ b/camerad/exposure_modes.h @@ -48,7 +48,7 @@ namespace Camera { } virtual ~ExposureMode() = default; - virtual long expose() = 0; + virtual long expose() { return NO_ERROR; } virtual void image_acquisition_thread() { }; virtual void image_processing_thread() { }; diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index 47a17c6..61ff1e0 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -29,6 +29,28 @@ add_library(shared_memory_writer STATIC target_include_directories(shared_memory_writer PRIVATE ${PROJECT_BASE_DIR}/common ${PROJECT_BASE_DIR}/utils) target_link_libraries(shared_memory_writer nlohmann_json::nlohmann_json $,rt,>) +add_library(fits_writer STATIC + ${PROJECT_UTILS_DIR}/fits_writer.cpp +) +target_include_directories(fits_writer PRIVATE ${PROJECT_BASE_DIR}/common ${PROJECT_BASE_DIR}/utils) +find_library(FITSWRITER_CCFITS_LIB CCfits NAMES libCCfits PATHS /usr/local/lib /opt/homebrew/lib) +find_library(FITSWRITER_CFITS_LIB cfitsio NAMES libcfitsio PATHS /usr/local/lib /opt/homebrew/lib) +target_link_libraries(fits_writer + nlohmann_json::nlohmann_json + ${FITSWRITER_CCFITS_LIB} + ${FITSWRITER_CFITS_LIB} +) + +add_library(frame_output_factory STATIC + ${PROJECT_UTILS_DIR}/frame_output_factory.cpp +) +target_include_directories(frame_output_factory PRIVATE ${PROJECT_BASE_DIR}/common ${PROJECT_BASE_DIR}/utils) +target_link_libraries(frame_output_factory + nlohmann_json::nlohmann_json + shared_memory_writer + fits_writer +) + add_library(md5 STATIC ${PROJECT_UTILS_DIR}/md5.cpp ) diff --git a/utils/fits_writer.cpp b/utils/fits_writer.cpp new file mode 100644 index 0000000..6880cc4 --- /dev/null +++ b/utils/fits_writer.cpp @@ -0,0 +1,233 @@ +/** + * @file fits_writer.cpp + * @brief FrameOutput implementation that writes FITS files asynchronously + * + * Producer enqueues from the readout thread; a dedicated worker thread + * drains the queue and writes one FITS file per frame via CCfits. + */ + +#include "fits_writer.h" +#include "common.h" +#include "utilities.h" + +#include +#include +#include +#include +#include +#include + +namespace Camera { + + FitsWriter::FitsWriter(FitsWriterConfig cfg) + : cfg_(std::move(cfg)) { + } + + FitsWriter::~FitsWriter() { + this->close(); + } + + long FitsWriter::open() { + const std::string function("Camera::FitsWriter::open"); + + bool expected = false; + if (!started_.compare_exchange_strong(expected, true)) { + logwrite(function, "ERROR already opened"); + return ERROR; + } + + if (cfg_.queue_size == 0) { + logwrite(function, "ERROR queue_size must be > 0"); + started_.store(false); + return ERROR; + } + + std::error_code ec; + if (!std::filesystem::is_directory(cfg_.output_dir, ec)) { + logwrite(function, "ERROR output_dir does not exist: " + cfg_.output_dir); + started_.store(false); + return ERROR; + } + + stop_.store(false); + worker_ = std::thread(&FitsWriter::worker_loop, this); + + logwrite(function, "started: dir=" + cfg_.output_dir + + " basename=" + cfg_.basename + + " queue_size=" + std::to_string(cfg_.queue_size) + + " write_interval_ms=" + std::to_string(cfg_.write_interval_ms)); + return NO_ERROR; + } + + long FitsWriter::write(const char* data, size_t size, const FrameMetadata& meta) { + if (!started_.load()) return ERROR; + + // Cadence gate — skipped frames never enter the queue + if (cfg_.write_interval_ms > 0) { + const auto now = std::chrono::steady_clock::now(); + const auto interval = std::chrono::milliseconds(cfg_.write_interval_ms); + if (now - last_accepted_ < interval) { + n_skipped_cadence_.fetch_add(1, std::memory_order_relaxed); + return NO_ERROR; + } + last_accepted_ = now; + } + + QueuedFrame frame; + frame.meta = meta; + frame.data.assign(data, data + size); // the one memcpy — outside the lock + + { + std::lock_guard lock(mtx_); + if (queue_.size() >= cfg_.queue_size) { + queue_.pop_front(); + n_dropped_queue_.fetch_add(1, std::memory_order_relaxed); + } + queue_.push_back(std::move(frame)); + } + n_received_.fetch_add(1, std::memory_order_relaxed); + cv_.notify_one(); + return NO_ERROR; + } + + void FitsWriter::close() { + if (!started_.load()) return; + + stop_time_ = std::chrono::steady_clock::now(); + stop_.store(true); + cv_.notify_all(); + + if (worker_.joinable()) worker_.join(); + started_.store(false); + + const std::string function("Camera::FitsWriter::close"); + logwrite(function, "stopped: received=" + std::to_string(n_received_.load()) + + " written=" + std::to_string(n_written_.load()) + + " dropped_queue=" + std::to_string(n_dropped_queue_.load()) + + " skipped_cadence=" + std::to_string(n_skipped_cadence_.load()) + + " failed=" + std::to_string(n_failed_.load()) + + " dropped_shutdown=" + std::to_string(n_dropped_shutdown_.load())); + } + + FitsWriter::Stats FitsWriter::stats() const { + Stats s; + s.frames_received = n_received_.load(); + s.frames_written = n_written_.load(); + s.frames_dropped_queue = n_dropped_queue_.load(); + s.frames_skipped_cadence = n_skipped_cadence_.load(); + s.frames_failed = n_failed_.load(); + s.frames_dropped_shutdown= n_dropped_shutdown_.load(); + return s; + } + + void FitsWriter::worker_loop() { + const auto drain_timeout = std::chrono::milliseconds(cfg_.drain_timeout_ms); + + while (true) { + QueuedFrame frame; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this]{ return !queue_.empty() || stop_.load(); }); + + if (queue_.empty()) return; // stop_ is set and we're drained + + if (stop_.load()) { + const auto elapsed = std::chrono::steady_clock::now() - stop_time_; + if (elapsed > drain_timeout) { + n_dropped_shutdown_.fetch_add(queue_.size(), std::memory_order_relaxed); + queue_.clear(); + return; + } + } + + frame = std::move(queue_.front()); + queue_.pop_front(); + } + + if (write_fits_file(frame) == NO_ERROR) { + n_written_.fetch_add(1, std::memory_order_relaxed); + } else { + n_failed_.fetch_add(1, std::memory_order_relaxed); + } + } + } + + long FitsWriter::write_fits_file(const QueuedFrame &frame) { + const std::string function("Camera::FitsWriter::write_fits_file"); + const auto &meta = frame.meta; + + if (meta.width == 0 || meta.height == 0 || meta.bytes_per_pixel == 0) { + logwrite(function, "ERROR invalid frame metadata"); + return ERROR; + } + if (meta.bytes_per_pixel != 2 && meta.bytes_per_pixel != 4) { + logwrite(function, "ERROR unsupported bytes_per_pixel=" + + std::to_string(meta.bytes_per_pixel)); + return ERROR; + } + + const size_t npixels = static_cast(meta.width) * meta.height; + const size_t expected_bytes = npixels * meta.bytes_per_pixel; + if (frame.data.size() < expected_bytes) { + logwrite(function, "ERROR frame data " + std::to_string(frame.data.size()) + + " < expected " + std::to_string(expected_bytes)); + return ERROR; + } + + const std::string filename = make_filename(meta.frame_number); + // CCfits requires a non-existing path; "!" prefix would overwrite, + // but make_filename() already resolved any conflict + const int bitpix = (meta.bytes_per_pixel == 2) ? USHORT_IMG : ULONG_IMG; + long axes[2] = { static_cast(meta.width), + static_cast(meta.height) }; + + try { + auto pFits = std::make_unique(filename, bitpix, 2, axes); + auto &phdu = pFits->pHDU(); + + phdu.addKey("FRAMENO", static_cast(meta.frame_number), + "Frame number"); + phdu.addKey("TIMESTMP", static_cast(meta.timestamp), + "Archon timestamp (0.01 us units)"); + phdu.addKey("DATE", get_timestamp(), "FITS file write time"); + + const long first_pixel = 1; + if (meta.bytes_per_pixel == 2) { + const auto *src = reinterpret_cast(frame.data.data()); + std::valarray data(src, npixels); + phdu.write(first_pixel, npixels, data); + } else { + const auto *src = reinterpret_cast(frame.data.data()); + std::valarray data(src, npixels); + phdu.write(first_pixel, npixels, data); + } + } + catch (const CCfits::FitsException &e) { + logwrite(function, "ERROR FITS exception writing " + filename + ": " + e.message()); + return ERROR; + } + catch (const std::exception &e) { + logwrite(function, "ERROR exception writing " + filename + ": " + e.what()); + return ERROR; + } + + return NO_ERROR; + } + + std::string FitsWriter::make_filename(uint64_t frame_number) const { + char num[32]; + std::snprintf(num, sizeof(num), "%08llu", + static_cast(frame_number)); + + const std::string prefix = cfg_.output_dir + "/" + cfg_.basename + "_" + num; + std::string filename = prefix + ".fits"; + + int suffix = 1; + while (std::filesystem::exists(filename)) { + filename = prefix + "_" + std::to_string(suffix) + ".fits"; + suffix++; + } + return filename; + } + +} diff --git a/utils/fits_writer.h b/utils/fits_writer.h new file mode 100644 index 0000000..1ef8fb8 --- /dev/null +++ b/utils/fits_writer.h @@ -0,0 +1,90 @@ +/** + * @file fits_writer.h + * @brief FrameOutput implementation that writes FITS files asynchronously + * + * Producer calls write() from the readout thread; data is memcpy'd into + * a bounded queue (drop-oldest on overflow) and a dedicated worker thread + * drains the queue to disk. write() never touches CCfits or blocks on + * disk I/O. + */ +#pragma once + +#include "frame_output.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Camera { + + struct FitsWriterConfig { + std::string output_dir{"/tmp"}; + std::string basename{"tracking"}; + uint32_t write_interval_ms{0}; // 0 = write every accepted frame + size_t queue_size{32}; + uint32_t drain_timeout_ms{5000}; + }; + + class FitsWriter : public FrameOutput { + public: + explicit FitsWriter(FitsWriterConfig cfg); + ~FitsWriter() override; + + FitsWriter(const FitsWriter&) = delete; + FitsWriter& operator=(const FitsWriter&) = delete; + + long open() override; + long write(const char* data, size_t size, const FrameMetadata& meta) override; + void close() override; + + struct Stats { + uint64_t frames_received{0}; + uint64_t frames_written{0}; + uint64_t frames_dropped_queue{0}; + uint64_t frames_skipped_cadence{0}; + uint64_t frames_failed{0}; + uint64_t frames_dropped_shutdown{0}; + }; + Stats stats() const; + + private: + struct QueuedFrame { + FrameMetadata meta; + std::vector data; + }; + + void worker_loop(); + long write_fits_file(const QueuedFrame &frame); + std::string make_filename(uint64_t frame_number) const; + + FitsWriterConfig cfg_; + + std::deque queue_; + mutable std::mutex mtx_; + std::condition_variable cv_; + + std::atomic stop_{false}; + std::atomic started_{false}; + std::thread worker_; + // Set in close() before stop_, so worker can read race-free + std::chrono::steady_clock::time_point stop_time_; + + // Cadence-gate state — only touched on producer thread + std::chrono::steady_clock::time_point last_accepted_{ + std::chrono::steady_clock::time_point::min()}; + + std::atomic n_received_{0}; + std::atomic n_written_{0}; + std::atomic n_dropped_queue_{0}; + std::atomic n_skipped_cadence_{0}; + std::atomic n_failed_{0}; + std::atomic n_dropped_shutdown_{0}; + }; + +} diff --git a/utils/frame_output_factory.cpp b/utils/frame_output_factory.cpp new file mode 100644 index 0000000..784967a --- /dev/null +++ b/utils/frame_output_factory.cpp @@ -0,0 +1,88 @@ +/** + * @file frame_output_factory.cpp + * @brief factory that constructs configured FrameOutput implementations + */ + +#include "frame_output_factory.h" +#include "fits_writer.h" +#include "shared_memory_writer.h" +#include "common.h" + +#include +#include + +namespace { + bool parse_bool(const std::string &v) { + return v == "yes" || v == "YES" || v == "true" || v == "TRUE" || v == "1"; + } +} + +namespace Camera { + + void apply_config_overrides(FrameOutputsConfig &out, const Config &cfg) { + const std::string function("Camera::apply_config_overrides"); + for (int row = 0; row < cfg.n_rows; ++row) { + const auto &key = cfg.param[row]; + const auto &val = cfg.arg[row]; + try { + if (key == "SHM_ENABLED") out.shm_enabled = parse_bool(val); + else if (key == "SHM_SEGMENT_NAME") out.shm_segment_name = val; + else if (key == "SHM_NUM_FRAMES") out.shm_num_frames = static_cast(std::stoul(val)); + else if (key == "FITS_ENABLED") out.fits_enabled = parse_bool(val); + else if (key == "FITS_OUTPUT_DIR") out.fits.output_dir = val; + else if (key == "FITS_BASENAME") out.fits.basename = val; + else if (key == "FITS_WRITE_INTERVAL_MS") out.fits.write_interval_ms = static_cast(std::stoul(val)); + else if (key == "FITS_QUEUE_SIZE") out.fits.queue_size = static_cast(std::stoul(val)); + else if (key == "FITS_DRAIN_TIMEOUT_MS") out.fits.drain_timeout_ms = static_cast(std::stoul(val)); + } + catch (const std::exception &e) { + logwrite(function, "WARNING bad value for " + key + "=" + val + ": " + e.what()); + } + } + } + + std::vector> + make_frame_outputs(const FrameOutputsConfig &cfg) { + const std::string function("Camera::make_frame_outputs"); + std::vector> outputs; + + if (cfg.shm_enabled) { + if (cfg.shm_max_frame_bytes == 0) { + logwrite(function, "WARNING shm_enabled but shm_max_frame_bytes==0; SHM skipped"); + } + else { + auto shm = std::make_unique( + cfg.shm_segment_name, cfg.shm_max_frame_bytes, cfg.shm_num_frames); + if (shm->open() == NO_ERROR) { + logwrite(function, "SHM output enabled: segment=" + cfg.shm_segment_name + + " max_bytes=" + std::to_string(cfg.shm_max_frame_bytes) + + " frames=" + std::to_string(cfg.shm_num_frames)); + outputs.push_back(std::move(shm)); + } + else { + logwrite(function, "WARNING SHM output failed to open; skipped"); + } + } + } + + if (cfg.fits_enabled) { + auto fits = std::make_unique(cfg.fits); + if (fits->open() == NO_ERROR) { + logwrite(function, "FITS output enabled: dir=" + cfg.fits.output_dir + + " basename=" + cfg.fits.basename + + " interval_ms=" + std::to_string(cfg.fits.write_interval_ms) + + " queue=" + std::to_string(cfg.fits.queue_size)); + outputs.push_back(std::move(fits)); + } + else { + logwrite(function, "WARNING FITS output failed to open; skipped"); + } + } + + if (outputs.empty()) { + logwrite(function, "no frame outputs configured"); + } + return outputs; + } + +} diff --git a/utils/frame_output_factory.h b/utils/frame_output_factory.h new file mode 100644 index 0000000..b1443e4 --- /dev/null +++ b/utils/frame_output_factory.h @@ -0,0 +1,40 @@ +/** + * @file frame_output_factory.h + * @brief factory that constructs configured FrameOutput implementations + * + * Lets instruments obtain their frame_outputs without depending on + * concrete writer subclasses. Adding a new writer touches only this + * factory and its .cpp. + */ +#pragma once + +#include "config.h" +#include "frame_output.h" +#include "fits_writer.h" + +#include +#include +#include +#include +#include + +namespace Camera { + + struct FrameOutputsConfig { + bool shm_enabled{false}; + std::string shm_segment_name{"camera"}; + size_t shm_max_frame_bytes{0}; // required > 0 when shm_enabled + uint32_t shm_num_frames{4}; + + bool fits_enabled{false}; + FitsWriterConfig fits; + }; + + // Defaults set on `out` by the caller survive for keys not present in cfg + void apply_config_overrides(FrameOutputsConfig &out, const Config &cfg); + + // open() is called on each writer; failures are logged and the writer is skipped + std::vector> + make_frame_outputs(const FrameOutputsConfig &cfg); + +}