Skip to main content

Async Push Pull

FieldValue
DifficultyBeginner
Estimated Read Time10-15 minutes
Labelsasync, push-pull, throughput, runtime

Concept

This tutorial explains how to use asynchronous APIs to build high-performance production quality applications.

In a synchronous loop, one thread blocks while waiting for each result. That is simple, but it underutilizes compute when input production and output consumption can overlap. Async execution improves throughput by decoupling these stages:

  • push(...) feeds inputs as they become ready.
  • pull(...) consumes outputs independently.

This chapter focuses on the core async pattern used in real applications: producer-side async push and consumer-side async pull, with explicit queue/backpressure behavior.

For the programming concepts behind this flow, see:

Learning Process

  1. Prepare runtime inputs: parse CLI args, load ResNet50 MPK, and construct local input samples.
  2. Build the async run path and split responsibilities between producer push(...) and consumer pull(...).
  3. Observe queue-driven behavior and verify throughput-oriented execution.
  4. Validate results with top-1 output, async stats, and stable tutorial signature.

Run

NEAT_EXTRAS_ROOT=<sima-neat-*-Linux-extras>
cd $NEAT_EXTRAS_ROOT/lib/sima-neat/tutorials
./tutorial_v2_002_async_push_pull

Code

tutorials/002_async_push_pull/async_push_pull.cpp
// Async push/pull: producer thread pushes frames, main thread pulls outputs.
//
// Usage:
// tutorial_v2_002_async_push_pull --mpk /path/to/resnet_50.tar.gz [--image /path/to.jpg] [--n 4]

#include "neat.h"

#include <opencv2/core.hpp>
#include <opencv2/imgcodecs.hpp>
#include <opencv2/imgproc.hpp>

#include <atomic>
#include <cstring>
#include <exception>
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

namespace fs = std::filesystem;

namespace {

bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
for (int i = 1; i + 1 < argc; ++i) {
if (key == argv[i]) {
out = argv[i + 1];
return true;
}
}
return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
std::string value;
if (!get_arg(argc, argv, key, value))
return def;
return std::stoi(value);
}

cv::Mat load_rgb(const fs::path& image_path, int size) {
cv::Mat bgr = cv::imread(image_path.string(), cv::IMREAD_COLOR);
if (bgr.empty())
throw std::runtime_error("failed to read image: " + image_path.string());
if (bgr.cols != size || bgr.rows != size) {
cv::resize(bgr, bgr, cv::Size(size, size), 0, 0, cv::INTER_AREA);
}
cv::Mat rgb;
cv::cvtColor(bgr, rgb, cv::COLOR_BGR2RGB);
if (!rgb.isContinuous())
rgb = rgb.clone();
return rgb;
}

simaai::neat::Model::Options build_options(int size) {
simaai::neat::Model::Options opt;
opt.format = "RGB";
opt.input_max_width = size;
opt.input_max_height = size;
opt.input_max_depth = 3;
opt.preproc.channel_mean = {0.485f, 0.456f, 0.406f};
opt.preproc.channel_stddev = {0.229f, 0.224f, 0.225f};
return opt;
}

int top1_from_output(const simaai::neat::Sample& out) {
if (!out.tensor.has_value())
throw std::runtime_error("no tensor output");
const simaai::neat::Mapping m = out.tensor->map_read();
const size_t n = m.size_bytes / sizeof(float);
const float* p = reinterpret_cast<const float*>(m.data);
int best = 0;
for (size_t i = 1; i < n && i < 1000; ++i) {
if (p[i] > p[best])
best = static_cast<int>(i);
}
return best;
}

} // namespace

int main(int argc, char** argv) {
try {
std::string mpk, image;
if (!get_arg(argc, argv, "--mpk", mpk)) {
std::cerr
<< "Usage: tutorial_v2_002_async_push_pull --mpk <path> [--image <path>] [--n <n>]\n";
return 1;
}
get_arg(argc, argv, "--image", image);
const int n = parse_int_arg(argc, argv, "--n", 4);
const int size = 224;

cv::Mat frame = image.empty() ? cv::Mat(size, size, CV_8UC3, cv::Scalar(99, 99, 99))
: load_rgb(image, size);
std::vector<cv::Mat> frames(n, frame);

// CORE LOGIC
// Build a Session around the model and run it async: one producer thread pushes,
// the main thread pulls outputs.
simaai::neat::Model model(mpk, build_options(size));

simaai::neat::Session session;
session.add(model.session());

simaai::neat::RunOptions opt;
opt.queue_depth = 8;
opt.overflow_policy = simaai::neat::OverflowPolicy::Block;
opt.output_memory = simaai::neat::OutputMemory::Owned;

auto run = session.build(frames.front(), simaai::neat::RunMode::Async, opt);

std::atomic<int> pushed{0};
std::thread producer([&]() {
for (const cv::Mat& f : frames) {
run.push(f);
pushed.fetch_add(1, std::memory_order_relaxed);
}
run.close_input();
});

int pulled = 0;
while (pulled < n) {
auto out = run.pull(/*timeout_ms=*/2000);
if (!out.has_value())
break;
std::cout << "top1=" << top1_from_output(*out) << "\n";
++pulled;
}
producer.join();

std::cout << "pushed=" << pushed.load() << " pulled=" << pulled << "\n";
std::cout << "[OK] 002_async_push_pull\n";
return 0;
} catch (const std::exception& e) {
std::cerr << "[FAIL] " << e.what() << "\n";
return 1;
}
}

Source