Rewrite FFmpeg decoder to use pull model

This allows us to keep asynchronous decoders like MMAL and V4L2M2M fed
while we're waiting for output frames. Behavior for synchronous decoders
should be identical.

Continuing to feed new data while waiting for output frames is crucial for
acceptable performance on 1080p video on the Raspberry Pi using V4L2M2M,
since it allows the decode and copy operations to be pipelined.
This commit is contained in:
Cameron Gutman 2022-01-17 15:06:12 -06:00
parent 8a27fa7bb5
commit d6cfbdb273
4 changed files with 165 additions and 119 deletions

View file

@ -357,6 +357,13 @@ bool Session::populateDecoderProperties(SDL_Window* window)
} }
m_VideoCallbacks.capabilities = decoder->getDecoderCapabilities(); m_VideoCallbacks.capabilities = decoder->getDecoderCapabilities();
if (m_VideoCallbacks.capabilities & CAPABILITY_PULL_RENDERER) {
// It is an error to pass a push callback when in pull mode
m_VideoCallbacks.submitDecodeUnit = nullptr;
}
else {
m_VideoCallbacks.submitDecodeUnit = drSubmitDecodeUnit;
}
m_StreamConfig.colorSpace = decoder->getDecoderColorspace(); m_StreamConfig.colorSpace = decoder->getDecoderColorspace();
@ -439,7 +446,6 @@ bool Session::initialize()
LiInitializeVideoCallbacks(&m_VideoCallbacks); LiInitializeVideoCallbacks(&m_VideoCallbacks);
m_VideoCallbacks.setup = drSetup; m_VideoCallbacks.setup = drSetup;
m_VideoCallbacks.submitDecodeUnit = drSubmitDecodeUnit;
LiInitializeStreamConfiguration(&m_StreamConfig); LiInitializeStreamConfiguration(&m_StreamConfig);
m_StreamConfig.width = m_Preferences->width; m_StreamConfig.width = m_Preferences->width;

View file

@ -46,8 +46,6 @@
#define FAILED_DECODES_RESET_THRESHOLD 20 #define FAILED_DECODES_RESET_THRESHOLD 20
#define MAX_RECV_FRAME_RETRIES 100
bool FFmpegVideoDecoder::isHardwareAccelerated() bool FFmpegVideoDecoder::isHardwareAccelerated()
{ {
return m_HwDecodeCfg != nullptr || return m_HwDecodeCfg != nullptr ||
@ -72,6 +70,9 @@ int FFmpegVideoDecoder::getDecoderCapabilities()
capabilities |= CAPABILITY_SLICES_PER_FRAME(slices); capabilities |= CAPABILITY_SLICES_PER_FRAME(slices);
} }
// We use our own decoder thread with the "pull" model
capabilities |= CAPABILITY_PULL_RENDERER;
return capabilities; return capabilities;
} }
@ -137,12 +138,14 @@ FFmpegVideoDecoder::FFmpegVideoDecoder(bool testOnly)
m_VideoFormat(0), m_VideoFormat(0),
m_NeedsSpsFixup(false), m_NeedsSpsFixup(false),
m_TestOnly(testOnly), m_TestOnly(testOnly),
m_CanRetryReceiveFrame(RRF_UNKNOWN) m_DecoderThread(nullptr)
{ {
SDL_zero(m_ActiveWndVideoStats); SDL_zero(m_ActiveWndVideoStats);
SDL_zero(m_LastWndVideoStats); SDL_zero(m_LastWndVideoStats);
SDL_zero(m_GlobalVideoStats); SDL_zero(m_GlobalVideoStats);
SDL_AtomicSet(&m_DecoderThreadShouldQuit, 0);
// Use linear filtering when renderer scaling is required // Use linear filtering when renderer scaling is required
SDL_SetHint(SDL_HINT_RENDER_SCALE_QUALITY, "1"); SDL_SetHint(SDL_HINT_RENDER_SCALE_QUALITY, "1");
} }
@ -167,6 +170,19 @@ IFFmpegRenderer* FFmpegVideoDecoder::getBackendRenderer()
void FFmpegVideoDecoder::reset() void FFmpegVideoDecoder::reset()
{ {
// Terminate the decoder thread before doing anything else.
// It might be touching things we're about to free.
if (m_DecoderThread != nullptr) {
SDL_AtomicSet(&m_DecoderThreadShouldQuit, 1);
LiWakeWaitForVideoFrame();
SDL_WaitThread(m_DecoderThread, NULL);
SDL_AtomicSet(&m_DecoderThreadShouldQuit, 0);
m_DecoderThread = nullptr;
}
m_FramesIn = m_FramesOut = 0;
m_FrameInfoQueue.clear();
delete m_Pacer; delete m_Pacer;
m_Pacer = nullptr; m_Pacer = nullptr;
@ -410,6 +426,13 @@ bool FFmpegVideoDecoder::completeInitialization(const AVCodec* decoder, PDECODER
Session::get()->getOverlayManager().setOverlayRenderer(m_FrontendRenderer); Session::get()->getOverlayManager().setOverlayRenderer(m_FrontendRenderer);
} }
m_DecoderThread = SDL_CreateThread(FFmpegVideoDecoder::decoderThreadProcThunk, "FFDecoder", (void*)this);
if (m_DecoderThread == nullptr) {
SDL_LogError(SDL_LOG_CATEGORY_APPLICATION,
"Failed to create decoder thread: %s", SDL_GetError());
return false;
}
return true; return true;
} }
@ -924,14 +947,128 @@ void FFmpegVideoDecoder::writeBuffer(PLENTRY entry, int& offset)
} }
} }
int FFmpegVideoDecoder::decoderThreadProcThunk(void *context)
{
((FFmpegVideoDecoder*)context)->decoderThreadProc();
return 0;
}
void FFmpegVideoDecoder::decoderThreadProc()
{
while (!SDL_AtomicGet(&m_DecoderThreadShouldQuit)) {
if (m_FramesIn == m_FramesOut) {
VIDEO_FRAME_HANDLE handle;
PDECODE_UNIT du;
// Waiting for input. All output frames have been received.
// Block until we receive a new frame from the host.
if (!LiWaitForNextVideoFrame(&handle, &du)) {
// This might be a signal from the main thread to exit
continue;
}
LiCompleteVideoFrame(handle, submitDecodeUnit(du));
}
if (m_FramesIn != m_FramesOut) {
SDL_assert(m_FramesIn > m_FramesOut);
// We have output frames to receive. Let's poll until we get one,
// and submit new input data if/when we get it.
AVFrame* frame = av_frame_alloc();
if (!frame) {
// Failed to allocate a frame but we did submit,
// so we can return DR_OK
SDL_LogWarn(SDL_LOG_CATEGORY_APPLICATION,
"Failed to allocate frame");
continue;
}
int err;
do {
err = avcodec_receive_frame(m_VideoDecoderCtx, frame);
if (err == 0) {
SDL_assert(m_FrameInfoQueue.size() == m_FramesIn - m_FramesOut);
m_FramesOut++;
// Reset failed decodes count if we reached this far
m_ConsecutiveFailedDecodes = 0;
// Restore default log level after a successful decode
av_log_set_level(AV_LOG_INFO);
// Capture a frame timestamp to measuring pacing delay
frame->pkt_dts = SDL_GetTicks();
if (!m_FrameInfoQueue.isEmpty()) {
FrameInfoTuple infoTuple = m_FrameInfoQueue.dequeue();
// Count time in avcodec_send_packet() and avcodec_receive_frame()
// as time spent decoding. Also count time spent in the decode unit
// queue because that's directly caused by decoder latency.
m_ActiveWndVideoStats.totalDecodeTime += LiGetMillis() - infoTuple.enqueueTimeMs;
// Store the presentation time
frame->pts = infoTuple.presentationTimeMs;
}
m_ActiveWndVideoStats.decodedFrames++;
// Queue the frame for rendering (or render now if pacer is disabled)
m_Pacer->submitFrame(frame);
}
else if (err == AVERROR(EAGAIN)) {
VIDEO_FRAME_HANDLE handle;
PDECODE_UNIT du;
// No output data, so let's try to submit more input data,
// while we're waiting for this to frame to come back.
if (LiPollNextVideoFrame(&handle, &du)) {
// FIXME: Handle EAGAIN on avcodec_send_packet() properly?
LiCompleteVideoFrame(handle, submitDecodeUnit(du));
}
else {
// No output data or input data. Let's wait a little bit.
SDL_Delay(2);
}
}
else {
char errorstring[512];
av_strerror(err, errorstring, sizeof(errorstring));
SDL_LogWarn(SDL_LOG_CATEGORY_APPLICATION,
"avcodec_receive_frame() failed: %s", errorstring);
if (++m_ConsecutiveFailedDecodes == FAILED_DECODES_RESET_THRESHOLD) {
SDL_LogError(SDL_LOG_CATEGORY_APPLICATION,
"Resetting decoder due to consistent failure");
SDL_Event event;
event.type = SDL_RENDER_DEVICE_RESET;
SDL_PushEvent(&event);
}
}
} while (err == AVERROR(EAGAIN) && !SDL_AtomicGet(&m_DecoderThreadShouldQuit));
if (err != 0) {
// Free the frame if we failed to submit it
av_frame_free(&frame);
}
}
}
}
int FFmpegVideoDecoder::submitDecodeUnit(PDECODE_UNIT du) int FFmpegVideoDecoder::submitDecodeUnit(PDECODE_UNIT du)
{ {
PLENTRY entry = du->bufferList; PLENTRY entry = du->bufferList;
int err; int err;
bool submittedFrame = false;
SDL_assert(!m_TestOnly); SDL_assert(!m_TestOnly);
// Bail immediately if we need an IDR frame to continue
if (Session::get()->getAndClearPendingIdrFrameStatus()) {
return DR_NEED_IDR;
}
if (!m_LastFrameNumber) { if (!m_LastFrameNumber) {
m_ActiveWndVideoStats.measurementStartTimestamp = SDL_GetTicks(); m_ActiveWndVideoStats.measurementStartTimestamp = SDL_GetTicks();
m_LastFrameNumber = du->frameNumber; m_LastFrameNumber = du->frameNumber;
@ -1016,115 +1153,10 @@ int FFmpegVideoDecoder::submitDecodeUnit(PDECODE_UNIT du)
return DR_NEED_IDR; return DR_NEED_IDR;
} }
m_FrameInfoQueue.enqueue({.enqueueTimeMs = du->enqueueTimeMs,
.presentationTimeMs = du->presentationTimeMs});
m_FramesIn++; m_FramesIn++;
// We can receive 0 or more frames after submission of a packet, so we must
// try to read until we get EAGAIN to ensure the queue is drained. Some decoders
// run asynchronously and may return several frames at once after warming up.
//
// Some decoders support calling avcodec_receive_frame() without queuing a packet.
// This allows us to drain excess frames and reduce latency. We will try to learn
// if a decoder is capable of this by trying it and seeing if it works.
int receiveRetries = 0;
do {
AVFrame* frame = av_frame_alloc();
if (!frame) {
// Failed to allocate a frame but we did submit,
// so we can return DR_OK
SDL_LogWarn(SDL_LOG_CATEGORY_APPLICATION,
"Failed to allocate frame");
return DR_OK;
}
err = avcodec_receive_frame(m_VideoDecoderCtx, frame);
if (err == 0) {
m_FramesOut++;
// Reset failed decodes count if we reached this far
m_ConsecutiveFailedDecodes = 0;
// Restore default log level after a successful decode
av_log_set_level(AV_LOG_INFO);
// Store the presentation time
// FIXME: This is wrong when reading a batch of frames
frame->pts = du->presentationTimeMs;
// Capture a frame timestamp to measuring pacing delay
frame->pkt_dts = SDL_GetTicks();
// Count time in avcodec_send_packet() and avcodec_receive_frame()
// as time spent decoding. Also count time spent in the decode unit
// queue because that's directly caused by decoder latency.
m_ActiveWndVideoStats.totalDecodeTime += LiGetMillis() - du->enqueueTimeMs;
// Also count the frame-to-frame delay if the decoder is delaying frames
// until a subsequent frame is submitted.
m_ActiveWndVideoStats.totalDecodeTime += (m_FramesIn - m_FramesOut) * (1000 / m_StreamFps);
m_ActiveWndVideoStats.decodedFrames++;
// Queue the frame for rendering (or render now if pacer is disabled)
m_Pacer->submitFrame(frame);
submittedFrame = true;
// Once we receive a frame, transition out of the Unknown state by determining
// whether a receive frame retry was needed to get this frame. We assume that
// any asynchronous decoder is going to return EAGAIN on the first frame.
if (m_CanRetryReceiveFrame == RRF_UNKNOWN) {
SDL_LogInfo(SDL_LOG_CATEGORY_APPLICATION, "RRF mode: %s", receiveRetries > 0 ? "YES" : "NO");
m_CanRetryReceiveFrame = receiveRetries > 0 ? RRF_YES : RRF_NO;
}
}
else {
av_frame_free(&frame);
if (err == AVERROR(EAGAIN)) {
// Break out if we can't retry or we successfully received a frame. We only want
// to retry if we haven't gotten a frame back for this input packet.
if (m_CanRetryReceiveFrame == RRF_NO || receiveRetries == MAX_RECV_FRAME_RETRIES || submittedFrame) {
// We will transition from Unknown -> No if we exceed the maximum retries.
if (m_CanRetryReceiveFrame == RRF_UNKNOWN) {
SDL_assert(!submittedFrame);
SDL_assert(receiveRetries == MAX_RECV_FRAME_RETRIES);
SDL_LogWarn(SDL_LOG_CATEGORY_APPLICATION, "RRF mode: NO (timeout)");
m_CanRetryReceiveFrame = RRF_NO;
}
break;
}
else {
SDL_Delay(1);
}
}
}
} while (err == 0 || (err == AVERROR(EAGAIN) && receiveRetries++ < MAX_RECV_FRAME_RETRIES));
// Treat this as a failed decode if we don't manage to receive a single frame or
// if we finish the loop above with an error other than EAGAIN. Note that some
// limited number of "failed decodes" with EAGAIN are expected for asynchronous
// decoders, so we only reset the decoder if we get a ton of them in a row.
if (!submittedFrame || err != AVERROR(EAGAIN)) {
// Don't spam EAGAIN log messages for asynchronous decoders as long as
// they produce a frame for at least every other submitted packet.
if (m_ConsecutiveFailedDecodes > 0 || err != AVERROR(EAGAIN)) {
char errorstring[512];
av_strerror(err, errorstring, sizeof(errorstring));
SDL_LogWarn(SDL_LOG_CATEGORY_APPLICATION,
"avcodec_receive_frame() failed: %s", errorstring);
}
if (++m_ConsecutiveFailedDecodes == FAILED_DECODES_RESET_THRESHOLD) {
SDL_LogError(SDL_LOG_CATEGORY_APPLICATION,
"Resetting decoder due to consistent failure");
SDL_Event event;
event.type = SDL_RENDER_DEVICE_RESET;
SDL_PushEvent(&event);
}
}
return DR_OK; return DR_OK;
} }

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <functional> #include <functional>
#include <QQueue>
#include "decoder.h" #include "decoder.h"
#include "ffmpeg-renderers/renderer.h" #include "ffmpeg-renderers/renderer.h"
@ -54,6 +55,10 @@ private:
enum AVPixelFormat ffGetFormat(AVCodecContext* context, enum AVPixelFormat ffGetFormat(AVCodecContext* context,
const enum AVPixelFormat* pixFmts); const enum AVPixelFormat* pixFmts);
void decoderThreadProc();
static int decoderThreadProcThunk(void* context);
AVPacket* m_Pkt; AVPacket* m_Pkt;
AVCodecContext* m_VideoDecoderCtx; AVCodecContext* m_VideoDecoderCtx;
QByteArray m_DecodeBuffer; QByteArray m_DecodeBuffer;
@ -74,11 +79,14 @@ private:
int m_VideoFormat; int m_VideoFormat;
bool m_NeedsSpsFixup; bool m_NeedsSpsFixup;
bool m_TestOnly; bool m_TestOnly;
enum { SDL_Thread* m_DecoderThread;
RRF_UNKNOWN, SDL_atomic_t m_DecoderThreadShouldQuit;
RRF_YES,
RRF_NO typedef struct {
} m_CanRetryReceiveFrame; uint64_t enqueueTimeMs;
uint32_t presentationTimeMs;
} FrameInfoTuple;
QQueue<FrameInfoTuple> m_FrameInfoQueue;
static const uint8_t k_H264TestFrame[]; static const uint8_t k_H264TestFrame[];
static const uint8_t k_HEVCMainTestFrame[]; static const uint8_t k_HEVCMainTestFrame[];

@ -1 +1 @@
Subproject commit 6001ece0b8bfcea6a8122a3e56f48f515e1aaaf5 Subproject commit 921b59c467ac78ef2a770ad1bb3e61fbef51bd09