From 8a2f6e7773e2822ef8522c7f9b86cc7ab42aaf07 Mon Sep 17 00:00:00 2001 From: Michal Adamczak Date: Fri, 15 Jul 2022 18:38:45 +0200 Subject: [PATCH 1/2] StreamBuffer for input Alternative input system for LPMS. The idea is to allow the use of FFmpeg demultiplexers with any input mechanism, such as network packet delivery. In order to do that we have to provide a custom AVIOContext with custom read() and seek() operations. This also means that synchronization is needed, because we will have to perform input and transcoding in different threads (for example demux - which is a part of transcode - may want more data that is not available yet, in which case transcode should block until the input thread delivers more data) --- ffmpeg/decoder.c | 28 +++++- ffmpeg/decoder.h | 3 +- ffmpeg/ffmpeg.go | 31 ++++++ ffmpeg/stream_buffer.c | 219 +++++++++++++++++++++++++++++++++++++++++ ffmpeg/stream_buffer.h | 66 +++++++++++++ ffmpeg/transcoder.c | 42 +++++++- ffmpeg/transcoder.h | 4 + 7 files changed, 386 insertions(+), 7 deletions(-) create mode 100644 ffmpeg/stream_buffer.c create mode 100644 ffmpeg/stream_buffer.h diff --git a/ffmpeg/decoder.c b/ffmpeg/decoder.c index f1b90123e2..a92b62dbad 100755 --- a/ffmpeg/decoder.c +++ b/ffmpeg/decoder.c @@ -73,7 +73,7 @@ int flush_in(struct input_ctx *ictx, AVFrame *frame, int *stream_index) return AVERROR_EOF; } -// FIXME: name me and the other function better +// FIXME: name me and the other function better (and move to utilities...) 
enum AVPixelFormat hw2pixfmt(AVCodecContext *ctx) { const AVCodec *decoder = ctx->codec; @@ -254,7 +254,26 @@ static void close_video_decoder(struct input_ctx *ictx) if (ictx->last_frame_v) av_frame_free(&ictx->last_frame_v); } -int open_input(input_params *params, struct input_ctx *ctx) +int open_demuxer(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer) +{ + int ret; + ctx->ic = avformat_alloc_context(); + if (buffer) { + // using custom input + if (buffer_setup_as_input(buffer, ctx->ic) < 0) { + // memory allocation failure + return -1; + } + // instruct FFmpeg to use our input, note that file name is empty + ret = avformat_open_input(&ctx->ic, "", NULL, NULL); + } else { + // normal file-based input, note that file name is passed + ret = avformat_open_input(&ctx->ic, params->fname, NULL, NULL); + } + return ret; +} + +int open_input(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer) { char *inp = params->fname; int ret = 0; @@ -268,7 +287,9 @@ int open_input(input_params *params, struct input_ctx *ctx) ctx->device = params->device; // open demuxer - ret = avformat_open_input(&ctx->ic, inp, NULL, NULL); + ret = open_demuxer(params, ctx, buffer); + // TODO: maybe move stuff below to open_demuxer? 
Could also remove + // indices on the input side, just like it was done for the output if (ret < 0) LPMS_ERR(open_input_err, "demuxer: Unable to open input"); ret = avformat_find_stream_info(ctx->ic, NULL); if (ret < 0) LPMS_ERR(open_input_err, "Unable to find input info"); @@ -341,4 +362,3 @@ void free_input(struct input_ctx *ictx, enum FreeInputPolicy policy) // audio decoder is always closed close_audio_decoder(ictx); } - diff --git a/ffmpeg/decoder.h b/ffmpeg/decoder.h index 77a3e60360..77dbf6fcb9 100755 --- a/ffmpeg/decoder.h +++ b/ffmpeg/decoder.h @@ -5,6 +5,7 @@ #include #include #include "transcoder.h" +#include "stream_buffer.h" struct input_ctx { AVFormatContext *ic; // demuxer required @@ -67,7 +68,7 @@ int demux_in(struct input_ctx *ictx, AVPacket *pkt); int decode_in(struct input_ctx *ictx, AVPacket *pkt, AVFrame *frame, int *stream_index); int flush_in(struct input_ctx *ictx, AVFrame *frame, int *stream_index); int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt, int *stream_index); -int open_input(input_params *params, struct input_ctx *ctx); +int open_input(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer); void free_input(struct input_ctx *inctx, enum FreeInputPolicy policy); // this should perhaps be moved to some utility file, as it is not decoder // specific diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 6a947135a5..4c875664a4 100755 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -607,6 +607,36 @@ func isAudioAllDrop(ps []TranscodeOptions) bool { return true } +// load input file into input buffer +func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) { + if strings.HasPrefix(strings.ToLower(input.Fname), "pipe:") { + C.lpms_transcode_push_reset(t.handle, 0) + fmt.Println("Skipping buffer input for pipe") + return + } + C.lpms_transcode_push_reset(t.handle, 1) + data, err := os.ReadFile(input.Fname) + if nil != err { + // error loading file + // translate Go error into proper FFmpeg error to 
get expected behavior + // of tests + var error C.int + if errors.Is(err, os.ErrNotExist) { + error = 1 + } else { + error = 0 + } + C.lpms_transcode_push_error(t.handle, error); + fmt.Println("Error while loading the queue", err) + } else { + // file loaded fine + C.lpms_transcode_push_bytes(t.handle, (*C.uchar)(unsafe.Pointer(&data[0])), C.int(len(data))) + C.lpms_transcode_push_eof(t.handle) + fmt.Println(len(data), "bytes of input loaded into buffer") + } +} + + // create C output params array and return it along with corresponding finalizer // function that makes sure there are no C memory leaks func createCOutputParams(input *TranscodeOptionsIn, ps []TranscodeOptions) ([]C.output_params, func(), error) { @@ -929,6 +959,7 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions) paramsPointer = (*C.output_params)(¶ms[0]) resultsPointer = (*C.output_results)(&results[0]) } + loadInputBuffer(t, input) ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded)) if ret != 0 { if LogTranscodeErrors { diff --git a/ffmpeg/stream_buffer.c b/ffmpeg/stream_buffer.c new file mode 100644 index 0000000000..97aba2cf14 --- /dev/null +++ b/ffmpeg/stream_buffer.c @@ -0,0 +1,219 @@ +#include "stream_buffer.h" + +int buffer_create(StreamBuffer *buffer) +{ + buffer->data = (uint8_t *)malloc(STREAM_BUFFER_BYTES); + if (!buffer->data) return -1; + pthread_mutex_init(&buffer->mutex, NULL); + pthread_cond_init(&buffer->condition_put, NULL); + pthread_cond_init(&buffer->condition_get, NULL); + buffer_reset(buffer); + return 0; +} + +void buffer_destroy(StreamBuffer *buffer) +{ + if (buffer->data) free(buffer->data); + pthread_mutex_destroy(&buffer->mutex); + pthread_cond_destroy(&buffer->condition_put); + pthread_cond_destroy(&buffer->condition_get); + buffer->data = NULL; + buffer->flags = 0; +} + +static int64_t remaining(StreamBuffer *buffer) +{ + return STREAM_BUFFER_BYTES - buffer->unread_bytes - PROTECTED_BYTES; +} + 
+static int ffmpeg_error(StreamErrors errors) +{ + switch (errors) { + case NO_ENTRY: return AVERROR(ENOENT); + default: return AVERROR(EIO); + } +} + +static int buffer_read_function(void *user_data, uint8_t *buf, int buf_size) +{ + StreamBuffer *buffer = (StreamBuffer *)user_data; + int64_t to_read, end_offset, trailing, first_copy, second_copy; + int ret = 0; + pthread_mutex_lock(&buffer->mutex); + if (buffer->flags & STREAM_ERROR) { + // there was an error, emulate FFmpeg behavior + ret = ffmpeg_error(buffer->error); + goto read_error; + } + // wait for end of stream or some unread data + while (!(END_OF_STREAM & buffer->flags) && !buffer->unread_bytes) { + pthread_cond_wait(&buffer->condition_put, &buffer->mutex); + } + if ((END_OF_STREAM & buffer->flags) && !buffer->unread_bytes) { + // no unread data and none is coming, that is an EOF + ret = AVERROR_EOF; + goto read_error; + } + // have some data to read, copy them out + to_read = (buf_size <= buffer->unread_bytes) ? buf_size : buffer->unread_bytes; + end_offset = (buffer->index + buffer->read_bytes) % STREAM_BUFFER_BYTES; + trailing = STREAM_BUFFER_BYTES - end_offset; + first_copy = (trailing >= to_read) ? 
to_read : trailing; + memcpy(buf, buffer->data + end_offset, first_copy); + second_copy = to_read - first_copy; + memcpy(buf + first_copy, buffer->data, second_copy); + // update buffer + buffer->read_bytes += to_read; + buffer->unread_bytes -= to_read; + pthread_mutex_unlock(&buffer->mutex); + pthread_cond_signal(&buffer->condition_get); + return to_read; + +read_error: + pthread_mutex_unlock(&buffer->mutex); + return ret; +} + +static int64_t seek_to(StreamBuffer *buffer, int64_t pos) +{ + int64_t available = buffer->read_bytes + buffer->unread_bytes; + int64_t delta = pos - buffer->index; + if (delta < 0) { + // attempt to seek before the start of the buffer + return -1; + } + if (available < delta) { + // attempt to seek after the end of the buffer + return -1; + } + + // execute seek + buffer->read_bytes = delta; + buffer->unread_bytes = available - delta; + return pos; +} + +static int64_t buffer_seek_function(void *user_data, int64_t pos, int whence) +{ + StreamBuffer *buffer = (StreamBuffer *)user_data; + // remove force flag + whence &= ~AVSEEK_FORCE; + pthread_mutex_lock(&buffer->mutex); + int ret; + if (buffer->flags & STREAM_ERROR) { + // there was an error, emulate FFmpeg behavior + ret = ffmpeg_error(buffer->error); + goto seek_finish; + } + // FFmpeg ORs in some extra flags, so have to use AND + if (AVSEEK_SIZE & whence) { + if (buffer->flags & END_OF_STREAM) { + // already have all the data so can say + ret = buffer->index + buffer->read_bytes + buffer->unread_bytes; + goto seek_finish; + } else { + // not supported, because we cannot be sure how many bytes will arrive over + // queue + ret = -1; + } + goto seek_finish; + } + if (SEEK_END == whence) { + if (END_OF_STREAM & buffer->flags) { + // possible, because we reached end of stream already + ret = seek_to(buffer, buffer->index + buffer->read_bytes + buffer->unread_bytes + pos); + } else { + // can't do that because we haven't seen the end yet + ret = -1; + } + } else if (SEEK_SET == whence) { + 
ret = seek_to(buffer, pos); + } else if (SEEK_CUR == whence) { + ret = seek_to(buffer, buffer->index + pos); + } + +seek_finish: + pthread_mutex_unlock(&buffer->mutex); + pthread_cond_signal(&buffer->condition_get); + return ret; +} + +int buffer_setup_as_input(StreamBuffer *buffer, AVFormatContext *ctx) +{ + // IMPORTANT: I am not sure if ffmpeg documentation states that explicitly, + // but the memory of ctx->pb as well as its io_buffer seem to be released when + // ctx will get closed. I tried otherwise and got "double free" errors +#define BUFFER_SIZE 4096 + void *io_buffer = av_malloc(BUFFER_SIZE); + if (!io_buffer) return -1; + ctx->pb = avio_alloc_context( + io_buffer, BUFFER_SIZE, // buffer and size + 0, // do not write, just read + buffer, // pass buffer as user data + buffer_read_function, + NULL, // no write function supplied + buffer_seek_function); + if (!ctx->pb) return -1; + ctx->flags |= AVFMT_FLAG_CUSTOM_IO; + return 0; +} + +void buffer_reset(StreamBuffer *buffer) +{ + buffer->index = buffer->read_bytes = buffer->unread_bytes = 0; + buffer->flags = 0; + buffer->error = 0; +} + +void buffer_put_bytes(StreamBuffer *buffer, uint8_t *bytes, int64_t size) +{ + int64_t space, end, end_offset, trailing, first_copy, second_copy, deficit; + pthread_mutex_lock(&buffer->mutex); + while (!remaining(buffer)) { + // wait until there is some free(able) space in the buffer + pthread_cond_wait(&buffer->condition_get, &buffer->mutex); + } + // now see how much we can write + space = remaining(buffer); + if (space < size) size = space; + // here we know that we can write + end = buffer->index + buffer->read_bytes + buffer->unread_bytes; + end_offset = end % STREAM_BUFFER_BYTES; + // be careful to wrap around write + trailing = STREAM_BUFFER_BYTES - end_offset; + first_copy = (size <= trailing) ? 
size : trailing; + memcpy(buffer->data + end_offset, bytes, first_copy); + second_copy = size - first_copy; + memcpy(buffer->data, bytes + first_copy, second_copy); + // unread bytes changes obviously + buffer->unread_bytes += size; + // see if we should move index and change read_bytes + deficit = STREAM_BUFFER_BYTES - buffer->read_bytes - buffer->unread_bytes; + if (deficit < 0) { + // yeah + buffer->index -= deficit; + buffer->read_bytes += deficit; + } + pthread_mutex_unlock(&buffer->mutex); + // signal reader that it can proceed + pthread_cond_signal(&buffer->condition_put); +} + +void buffer_end_of_stream(StreamBuffer *buffer) +{ + pthread_mutex_lock(&buffer->mutex); + buffer->flags = END_OF_STREAM; + pthread_mutex_unlock(&buffer->mutex); + pthread_cond_signal(&buffer->condition_put); +} + +void buffer_error(StreamBuffer *buffer, StreamErrors error) +{ + pthread_mutex_lock(&buffer->mutex); + // set flags to both error and end of stream to get out of any waiting loop + buffer->flags = STREAM_ERROR | END_OF_STREAM; + buffer->error = error; + pthread_mutex_unlock(&buffer->mutex); + pthread_cond_signal(&buffer->condition_put); +} + diff --git a/ffmpeg/stream_buffer.h b/ffmpeg/stream_buffer.h new file mode 100644 index 0000000000..7835107457 --- /dev/null +++ b/ffmpeg/stream_buffer.h @@ -0,0 +1,66 @@ +#ifndef _LPMS_STREAM_BUFFER_H_ +#define _LPMS_STREAM_BUFFER_H_ + +#include +#include + +#define STREAM_BUFFER_BYTES (8 * 1024 * 1024) +#define PROTECTED_BYTES 1024 + +typedef enum { + END_OF_STREAM = 0x1, // end of current stream + STREAM_ERROR = 0x2 // some kind of error occured while loading data into + // queue (this for example to make "invalid file" kind + // of tests happy) +} StreamFlags; + +typedef enum { + OTHER_ERROR = 0, // error without dedicated handling + NO_ENTRY = 1, // ENOENT +} StreamErrors; + +typedef struct { + // These are called "pthread", but FFmpeg also has Windows implementation, + // so we should be safe on all reasonable platforms + 
pthread_cond_t condition_put; // signalled when data gets added or flags change + pthread_cond_t condition_get; // signalled when data gets read + pthread_mutex_t mutex; + // This is a circular buffer. It follows typical "index and the size" approach + // with several caveats + // 1) There is no "size", two values are used instead: "read_bytes" and + // "unread_bytes". Read bytes are the ones that were already accessed and + // _usually_ could be removed, except that we need to support (limited) seek() + // functionality for FFmpeg demuxing. So unlike typical circular buffer, bytes + // are not removed immediately on read() by moving input index and subtracting + // from size. Instead the number of bytes just read is added to "read_bytes" + // and subtracted from "unread_bytes", so by default no bytes are freed + // 2) Adding new data into the buffer initially only affects "unread_bytes". + // However, when there is not enough free data in the buffer, some of the + // bytes already read have to be written over, and so both index and + // "read_bytes" are affected. Care is taken to leave at least PROTECTED_BYTES + // untouched, so seek back is always possible within PROTECTED_BYTES range + int64_t index; + int64_t read_bytes; + int64_t unread_bytes; + uint8_t *data; + StreamFlags flags; + StreamErrors error; +} StreamBuffer; + +// NOT THREAD SAFE +int buffer_create(StreamBuffer *buffer); +void buffer_destroy(StreamBuffer *buffer); +// setup glue logic to allow ctx to use buffer as input +int buffer_setup_as_input(StreamBuffer *buffer, AVFormatContext *ctx); +void buffer_reset(StreamBuffer *buffer); + +// THREAD SAFE +// This adds bytes to the unread data in the buffer. Usually the function will +// not block, unless there is not enough place in the buffer. Then it will block +// waiting for a chance to write data. Bytes are copied. 
+void buffer_put_bytes(StreamBuffer *buffer, uint8_t *bytes, int64_t size); +void buffer_end_of_stream(StreamBuffer *buffer); +void buffer_error(StreamBuffer *buffer, StreamErrors error); + +#endif + diff --git a/ffmpeg/transcoder.c b/ffmpeg/transcoder.c index 245581ec1e..423e1dbf61 100755 --- a/ffmpeg/transcoder.c +++ b/ffmpeg/transcoder.c @@ -3,6 +3,7 @@ #include "filter.h" #include "encoder.h" #include "logging.h" +#include "stream_buffer.h" #include #include @@ -76,6 +77,11 @@ struct transcode_thread { int nb_outputs; AVFilterGraph *dnn_filtergraph; + + // Input buffer - when I/O is done outside of transcoder, for example in + // Low Latency scenarios + StreamBuffer input_buffer; + int use_buffer_for_input; }; // TODO: this feels like it belongs elsewhere, not in the top-level transcoder @@ -641,7 +647,7 @@ int lpms_transcode(input_params *inp, output_params *params, if (!needs_decoder(params[i].audio.name)) h->ictx.da = ++decode_a == nb_outputs; } - ret = open_input(inp, &h->ictx); + ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? 
&h->input_buffer : 0); if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open input"); // populate output contexts @@ -727,7 +733,7 @@ void lpms_transcode_stop(struct transcode_thread *handle) } if (handle->dnn_filtergraph) avfilter_graph_free(&handle->dnn_filtergraph); - + buffer_destroy(&handle->input_buffer); free(handle); } @@ -743,10 +749,16 @@ struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts) for (int i = 0; i < MAX_OUTPUT_SIZE; i++) { h->ictx.last_dts[i] = -1; } + + if (-1 == buffer_create(&h->input_buffer)) { + free(h); + return NULL; + } // handle dnn filter graph creation if (dnn_opts) { AVFilterGraph *filtergraph = create_dnn_filtergraph(dnn_opts); if (!filtergraph) { + buffer_destroy(&h->input_buffer); free(h); h = NULL; } else { @@ -763,3 +775,29 @@ void lpms_transcode_discontinuity(struct transcode_thread *handle) { handle->ictx.discontinuity[i] = 1; } } + +void lpms_transcode_push_reset(struct transcode_thread *handle, int on) +{ + if (!handle) return; + buffer_reset(&handle->input_buffer); + handle->use_buffer_for_input = on; +} + +void lpms_transcode_push_bytes(struct transcode_thread *handle, uint8_t *bytes, int size) +{ + if (!handle) return; + buffer_put_bytes(&handle->input_buffer, bytes, size); +} + +void lpms_transcode_push_eof(struct transcode_thread *handle) +{ + if (!handle) return; + buffer_end_of_stream(&handle->input_buffer); +} + +void lpms_transcode_push_error(struct transcode_thread *handle, int code) +{ + if (!handle) return; + buffer_error(&handle->input_buffer, code); +} + diff --git a/ffmpeg/transcoder.h b/ffmpeg/transcoder.h index e53a7635c0..83ebce44d8 100755 --- a/ffmpeg/transcoder.h +++ b/ffmpeg/transcoder.h @@ -101,5 +101,9 @@ int lpms_transcode(input_params *inp, output_params *params, output_results *res struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts); void lpms_transcode_stop(struct transcode_thread* handle); void lpms_transcode_discontinuity(struct transcode_thread *handle); +void 
lpms_transcode_push_reset(struct transcode_thread *handle, int on); +void lpms_transcode_push_bytes(struct transcode_thread* handle, uint8_t *bytes, int size); +void lpms_transcode_push_eof(struct transcode_thread *handle); +void lpms_transcode_push_error(struct transcode_thread *handle, int code); #endif // _LPMS_TRANSCODER_H_ From c74c5c69a3cfd8e2e23a0024ee7e38aa0c3320f8 Mon Sep 17 00:00:00 2001 From: Michal Adamczak Date: Sun, 24 Jul 2022 13:30:17 +0200 Subject: [PATCH 2/2] OutputQueue for output Output side counterpart to the StreamBuffer. It is not exactly the same solution, because input and output characteristics are quite different (for one, on the input side stream is just a series of bytes, whereas on the output side we are able to tell packets apart, assign timestamps, packet types, etc). There is simple Golang code for writing the data into the files, so that some tests will pass with this code. Note that the big problem with the queue output is that some muxers (for example mp4) need to be able to seek() in output, and we don't allow that. 
--- ffmpeg/encoder.c | 47 +++++++++++-- ffmpeg/encoder.h | 3 +- ffmpeg/ffmpeg.go | 51 ++++++++++++++ ffmpeg/filter.h | 4 ++ ffmpeg/output_queue.c | 158 ++++++++++++++++++++++++++++++++++++++++++ ffmpeg/output_queue.h | 57 +++++++++++++++ ffmpeg/transcoder.c | 29 +++++++- ffmpeg/transcoder.h | 5 ++ 8 files changed, 345 insertions(+), 9 deletions(-) create mode 100644 ffmpeg/output_queue.c create mode 100644 ffmpeg/output_queue.h diff --git a/ffmpeg/encoder.c b/ffmpeg/encoder.c index f951766f03..5ba1dffbbb 100755 --- a/ffmpeg/encoder.c +++ b/ffmpeg/encoder.c @@ -252,12 +252,16 @@ static int open_video_encoder(struct input_ctx *ictx, struct output_ctx *octx, static void free_output_no_trailer(struct output_ctx *octx, enum FreeOutputPolicy policy) { if (octx->oc) { - if (!(octx->oc->oformat->flags & AVFMT_NOFILE) && octx->oc->pb) { + // we check against AVFMT_FLAG_CUSTOM_IO to avoid trying to close file + // in case we are using custom i/o - this would cause crash + if (!(octx->oc->oformat->flags & AVFMT_NOFILE) && + !(octx->oc->flags & AVFMT_FLAG_CUSTOM_IO) && octx->oc->pb) { avio_closep(&octx->oc->pb); } avformat_free_context(octx->oc); octx->oc = NULL; } + queue_push_staging(&octx->write_context, END_OF_OUTPUT, -1); if (octx->vc && ((octx->hw_type == AV_HWDEVICE_TYPE_NONE) || (FORCE_CLOSE_HW_ENCODER == policy))) { avcodec_free_context(&octx->vc); @@ -295,7 +299,7 @@ void free_output(struct output_ctx *octx, enum FreeOutputPolicy policy) free_output_no_trailer(octx, policy); } -int open_output(struct output_ctx *octx, struct input_ctx *ictx) +int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue) { int ret = 0; @@ -351,8 +355,27 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx) // Muxer headers can be written now once streams were added if (!(fmt->flags & AVFMT_NOFILE)) { - ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE); - if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file"); + if (queue) { 
+ // output through queue + ret = queue_setup_as_output(queue, &octx->write_context, octx->oc); + if (ret < 0) LPMS_ERR(open_output_err, "Error setting up output queue"); + // make sure muxer options are compatible with queue output + // TODO: not sure if that is the best option for detecting a container + // type but it is surprisingly hard to find guidance on that + if (fmt->mime_type && !strcmp("video/mp4", fmt->mime_type)) { + // Default configuration of MP4 muxer needs seekable output, which + // the queue is not able to provide. Passing the following flags removes + // seekable requirement. This is also configuration recommended for + // streaming purposes, so it seems better suited anyway (the whole point + // with queues is to provide Low Latency/streaming support) + ret = av_dict_set(&octx->muxer->opts, "movflags", "frag_keyframe+empty_moov", 0); + if (ret < 0) LPMS_ERR(open_output_err, "Error setting movflags for fragmented output"); + } + } else { + // normal file output + ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE); + if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file"); + } } // IMPORTANT: notice how up to and including this point open_output_err is @@ -364,6 +387,10 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx) // call free_output_no_trailer() exclusively! 
ret = avformat_write_header(octx->oc, &octx->muxer->opts); if (ret < 0) LPMS_ERR(open_output_err, "Error writing header"); + // flush headers +// ret = av_interleaved_write_frame(octx->oc, NULL); + if (ret < 0) LPMS_ERR(open_output_err, "Error flushing headers"); + queue_push_staging(&octx->write_context, BEGIN_OF_OUTPUT, 0); // From now on it is normal free_output(), hence after_header error label if(octx->sfilters != NULL && needs_decoder(octx->video->name) && octx->sf.active == 0) { @@ -431,6 +458,8 @@ static int encode(AVCodecContext* encoder, AVFrame *frame, struct output_ctx* oc int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost) { + int ret; + int64_t pts = pkt->pts; pkt->stream_index = ost->index; if (av_cmp_q(tb, ost->time_base)) { av_packet_rescale_ts(pkt, tb, ost->time_base); @@ -481,7 +510,15 @@ int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost) octx->last_video_dts = pkt->dts; } - return av_interleaved_write_frame(octx->oc, pkt); + // make sure correct timestamp will get carried through to output_queue + ret = av_interleaved_write_frame(octx->oc, pkt); + if (0 > ret) return ret; + // this means "flush output", we want to do it so that output_queue will get + // properly associated packets and timestamps + ret = av_interleaved_write_frame(octx->oc, NULL); + if (0 > ret) return ret; + queue_push_staging(&octx->write_context, PACKET_OUTPUT, pts); + return 0; } static int getmetadatainf(AVFrame *inf, struct output_ctx *octx) diff --git a/ffmpeg/encoder.h b/ffmpeg/encoder.h index 55b202749f..3466547b06 100644 --- a/ffmpeg/encoder.h +++ b/ffmpeg/encoder.h @@ -4,13 +4,14 @@ #include "decoder.h" #include "transcoder.h" #include "filter.h" +#include "output_queue.h" enum FreeOutputPolicy { FORCE_CLOSE_HW_ENCODER, PRESERVE_HW_ENCODER }; -int open_output(struct output_ctx *octx, struct input_ctx *ictx); +int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue); void 
free_output(struct output_ctx *octx, enum FreeOutputPolicy); int process_out(struct input_ctx *ictx, struct output_ctx *octx, AVCodecContext *encoder, AVStream *ost, struct filter_ctx *filter, AVFrame *inf); diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 4c875664a4..afd247aa76 100755 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -636,6 +636,47 @@ func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) { } } +func storeOutputQueue(t *Transcoder, outputs []TranscodeOptions) (error) { + var fds = make([]*os.File, len(outputs)) + for i := range fds { + file, err := os.Create(outputs[i].Oname) + if nil != err { + return err + } + fds[i] = file + } + for { + // get next output packet + packet := C.lpms_transcode_peek_packet(t.handle) + if 8 == packet.flags { + break + } + // this is a data packet, write it to file + data := C.GoBytes(unsafe.Pointer(packet.data), packet.size) + bytes, err := fds[packet.index].Write(data) + if nil != err { + return err + } + if bytes != int(packet.size) { + panic("storeOutputQueue couldn't write all bytes error") + } + // pop data packet + C.lpms_transcode_pop_packet(t.handle) + } + // if we are here, we just have terminating packet, remove it + // (terminating packet carries no data, it is added there to signify + // the end of all input) + C.lpms_transcode_pop_packet(t.handle) + // Close all the open files + for i := range fds { + if nil != fds[i] { + fds[i].Close() + } + } + // Success + return nil +} + // create C output params array and return it along with corresponding finalizer // function that makes sure there are no C memory leaks @@ -961,6 +1002,16 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions) } loadInputBuffer(t, input) ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded)) + // be careful to use storeOutputQueue to fake lpms_transcode so that the test + // reacts properly + if (ret == 0) { + err = storeOutputQueue(t, ps) + if nil != 
err { + // fake output error + ret = C.AVERROR_STREAM_NOT_FOUND + } + } + if ret != 0 { if LogTranscodeErrors { glog.Error("Transcoder Return : ", ErrorMap[ret]) diff --git a/ffmpeg/filter.h b/ffmpeg/filter.h index 5e6995c493..15a37da656 100755 --- a/ffmpeg/filter.h +++ b/ffmpeg/filter.h @@ -3,6 +3,7 @@ #include #include "decoder.h" +#include "output_queue.h" struct filter_ctx { int active; @@ -36,6 +37,7 @@ struct filter_ctx { int flushing; }; +// TODO move this away, this ain't filter struct output_ctx { char *fname; // required output file name char *vfilters; // required output video filters @@ -74,6 +76,8 @@ struct output_ctx { output_results *res; // data to return for this output char *xcoderParams; + + WriteContext write_context; }; int init_video_filters(struct input_ctx *ictx, struct output_ctx *octx); diff --git a/ffmpeg/output_queue.c b/ffmpeg/output_queue.c new file mode 100644 index 0000000000..1700bd6e01 --- /dev/null +++ b/ffmpeg/output_queue.c @@ -0,0 +1,158 @@ +#include "output_queue.h" + +void queue_create(OutputQueue *queue) +{ + pthread_mutex_init(&queue->mutex, NULL); + pthread_cond_init(&queue->condition, NULL); + queue->front = queue->back = NULL; +} + +void queue_destroy(OutputQueue *queue) +{ + pthread_mutex_destroy(&queue->mutex); + pthread_cond_destroy(&queue->condition); + queue_reset(queue); +} + +static int queue_write_function(void *user_data, uint8_t *buf, int buf_size) +{ + WriteContext *wctx = (WriteContext *)user_data; + // Prepare packet + OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket)); + if (!packet) return -1; + packet->data = (uint8_t *)malloc(buf_size); + if (!packet->data) { + free(packet); + return -1; + } + memcpy(packet->data, buf, buf_size); + packet->size = buf_size; + packet->index = wctx->index; + packet->next = NULL; + // Important - we are not adding to the queue now. 
This is because we don't + // know which flags to assign yet - for example, we can't assign END_OF_OUTPUT + // because we don't know if we are at last packet or not. Instead, packets are + // added to the staging area and queue_push_staging will be used to add them + // to the queue after muxing operation finishes. + if (wctx->staging_back) { + // not the first packet + wctx->staging_back->next = packet; + wctx->staging_back = packet; + } else { + // first packet + wctx->staging_front = wctx->staging_back = packet; + } + return buf_size; +} + +int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx) +{ + wctx->queue = queue; + wctx->staging_front = wctx->staging_back = NULL; + // IMPORTANT: I am not sure if ffmpeg documentation states that explicitly, + // but the memory of ctx->pb as well as its io_buffer seem to be released when + // ctx will get closed. I tried otherwise and got "double free" errors +#define BUFFER_SIZE 4096 + void *io_buffer = av_malloc(BUFFER_SIZE); + if (!io_buffer) return -1; + ctx->pb = avio_alloc_context( + io_buffer, BUFFER_SIZE, // buffer and size + 1, // write allowed + wctx, // pass write context as user data + NULL, // no read function supplied + queue_write_function, + NULL); // no seek function supplied + if (!ctx->pb) return -1; + ctx->flags |= AVFMT_FLAG_CUSTOM_IO | AVFMT_FLAG_FLUSH_PACKETS; + return 0; +} + +void queue_reset(OutputQueue *queue) +{ + while (queue->front) { + OutputPacket *tmp = queue->front; + queue->front = queue->front->next; + if (tmp->data) free(tmp->data); + free(tmp); + } + queue->back = NULL; +} + +const OutputPacket *queue_peek_front(OutputQueue *queue) +{ + OutputPacket *tmp; + pthread_mutex_lock(&queue->mutex); + while (!queue->front) { + // wait until there is packet in the buffer + pthread_cond_wait(&queue->condition, &queue->mutex); + } + tmp = queue->front; + pthread_mutex_unlock(&queue->mutex); + return tmp; +} + +void queue_pop_front(OutputQueue *queue) +{ + 
OutputPacket *tmp; + pthread_mutex_lock(&queue->mutex); + while (!queue->front) { + // wait until there is packet in the buffer + pthread_cond_wait(&queue->condition, &queue->mutex); + } + tmp = queue->front; + queue->front = queue->front->next; + if (!queue->front) queue->back = NULL; + pthread_mutex_unlock(&queue->mutex); + if (tmp->data) free(tmp->data); + free(tmp); +} + +void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp) +{ + // iterate over staging area setting flags and timestamps + OutputPacket *packet = wctx->staging_front; + // Make sure that END_OF_OUTPUT only gets assigned to the last packet + // this is because the caller knows all packets are emitted, but it + // doesn't know how many of them + PacketFlags safe_flags = flags & ~END_OF_OUTPUT; + if (!packet) return; // nothing to do + while (packet) { + packet->flags = packet->next ? safe_flags : flags; + packet->timestamp = timestamp; + packet = packet->next; + } + // move staging area into queue + pthread_mutex_lock(&wctx->queue->mutex); + if (wctx->queue->back) { + // not empty queue + wctx->queue->back->next = wctx->staging_front; + wctx->queue->back = wctx->staging_back; + } else { + // empty queue + wctx->queue->front = wctx->staging_front; + wctx->queue->back = wctx->staging_back; + } + wctx->staging_front = wctx->staging_back = NULL; + pthread_mutex_unlock(&wctx->queue->mutex); + pthread_cond_signal(&wctx->queue->condition); +} + +int queue_push_end(OutputQueue *queue) +{ + OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket)); + if (!packet) return -1; + packet->size = 0; + packet->data = NULL; + packet->timestamp = -1; + packet->flags = END_OF_ALL_OUTPUTS; + pthread_mutex_lock(&queue->mutex); + if (queue->back) { + queue->back->next = packet; + queue->back = packet; + } else { + queue->front = queue->back = packet; + } + pthread_mutex_unlock(&queue->mutex); + pthread_cond_signal(&queue->condition); + return 0; +} diff --git a/ffmpeg/output_queue.h 
b/ffmpeg/output_queue.h new file mode 100644 index 0000000000..33f49064e6 --- /dev/null +++ b/ffmpeg/output_queue.h @@ -0,0 +1,57 @@ +#ifndef _LPMS_OUTPUT_QUEUE_H_ +#define _LPMS_OUTPUT_QUEUE_H_ + +#include +#include + +typedef enum { + BEGIN_OF_OUTPUT = 0x1, // before first packet is muxed - headers, etc + // (these packets will have timestamps of -1) + PACKET_OUTPUT = 0x2, // data packet - has valid timestamp + END_OF_OUTPUT = 0x4, // end of current stream (trailers, also ts == -1) + END_OF_ALL_OUTPUTS = 0x8 // very last packet, no data beyond +} PacketFlags; + +typedef struct _OutputPacket { + struct _OutputPacket *next; + uint8_t *data; + int size; + int index; + PacketFlags flags; + int64_t timestamp; +} OutputPacket; + +typedef struct { + // These are called "pthread", but FFmpeg also has Windows implementation, + // so we should be safe on all reasonable platforms + pthread_cond_t condition; + pthread_mutex_t mutex; + OutputPacket *front; + OutputPacket *back; +} OutputQueue; + +typedef struct { + // Queue to use + OutputQueue *queue; + int index; + // Staging area for packets + OutputPacket *staging_front; + OutputPacket *staging_back; +} WriteContext; + +// NOT THREAD SAFE +void queue_create(OutputQueue *queue); +void queue_destroy(OutputQueue *queue); +// setup glue logic to allow ctx to use queue as output +int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx); +void queue_reset(OutputQueue *queue); + +// THREAD SAFE +const OutputPacket *queue_peek_front(OutputQueue *queue); +void queue_pop_front(OutputQueue *queue); +void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp); +int queue_push_end(OutputQueue *queue); + +#endif + + diff --git a/ffmpeg/transcoder.c b/ffmpeg/transcoder.c index 423e1dbf61..faad8fb3cd 100755 --- a/ffmpeg/transcoder.c +++ b/ffmpeg/transcoder.c @@ -4,6 +4,7 @@ #include "encoder.h" #include "logging.h" #include "stream_buffer.h" +#include "output_queue.h" #include #include 
@@ -81,7 +82,9 @@ struct transcode_thread {
   // Input buffer - when I/O is done outside of transcoder, for example in
   // Low Latency scenarios
   StreamBuffer input_buffer;
-  int use_buffer_for_input;
+  int use_buffer_for_input; // TODO: name it "use custom output" or some such
+  // Output
+  OutputQueue output_queue;
 };
 
 // TODO: this feels like it belongs elsewhere, not in the top-level transcoder
@@ -647,7 +650,7 @@ int lpms_transcode(input_params *inp, output_params *params,
     if (!needs_decoder(params[i].audio.name)) h->ictx.da = ++decode_a == nb_outputs;
   }
 
-  ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? &h->input_buffer : 0);
+  ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? &h->input_buffer : NULL);
   if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open input");
 
   // populate output contexts
@@ -680,8 +683,9 @@ int lpms_transcode(input_params *inp, output_params *params,
     octx->dv = h->ictx.vi < 0 || is_drop(octx->video->name);
     octx->da = h->ictx.ai < 0 || is_drop(octx->audio->name);
     octx->res = &results[i];
+    octx->write_context.index = i;
 
-    ret = open_output(octx, &h->ictx);
+    ret = open_output(octx, &h->ictx, h->use_buffer_for_input ? &h->output_queue : NULL);
     if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open output");
   }
 
@@ -695,6 +699,10 @@ int lpms_transcode(input_params *inp, output_params *params,
 
   // Part IV: shutdown
 transcode_cleanup:
+  if (h->use_buffer_for_input) {
+    // terminate output queue
+    queue_push_end(&h->output_queue);
+  }
 
   // IMPORTANT: note that this is the only place when PRESERVE_HW_ENCODER and
   // PRESERVE_HW_DECODER are used. This is done to retain HW encoder and decoder
@@ -754,11 +762,14 @@ struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts)
     free(h);
     return NULL;
   }
+  queue_create(&h->output_queue);
+
   // handle dnn filter graph creation
   if (dnn_opts) {
     AVFilterGraph *filtergraph = create_dnn_filtergraph(dnn_opts);
     if (!filtergraph) {
       buffer_destroy(&h->input_buffer);
+      queue_destroy(&h->output_queue);
       free(h);
       h = NULL;
     } else {
@@ -780,6 +791,7 @@ void lpms_transcode_push_reset(struct transcode_thread *handle, int on)
 {
   if (!handle) return;
   buffer_reset(&handle->input_buffer);
+  queue_reset(&handle->output_queue);
   handle->use_buffer_for_input = on;
 }
 
@@ -801,3 +813,16 @@ void lpms_transcode_push_error(struct transcode_thread *handle, int code)
   buffer_error(&handle->input_buffer, code);
 }
 
+// Forwards to queue_peek_front() on the output queue; NULL when handle is NULL
+const OutputPacket *lpms_transcode_peek_packet(struct transcode_thread *handle)
+{
+  if (!handle) return NULL;
+  return queue_peek_front(&handle->output_queue);
+}
+
+// Forwards to queue_pop_front() on the output queue; no-op when handle is NULL
+void lpms_transcode_pop_packet(struct transcode_thread *handle)
+{
+  if (!handle) return;
+  queue_pop_front(&handle->output_queue);
+}
diff --git a/ffmpeg/transcoder.h b/ffmpeg/transcoder.h
index 83ebce44d8..bc23269973 100755
--- a/ffmpeg/transcoder.h
+++ b/ffmpeg/transcoder.h
@@ -7,6 +7,7 @@
 #include
 #include
 #include "logging.h"
+#include "output_queue.h"
 
 // LPMS specific errors
 extern const int lpms_ERR_INPUT_PIXFMT;
@@ -101,9 +102,13 @@ int lpms_transcode(input_params *inp, output_params *params, output_results *res
 struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts);
 void lpms_transcode_stop(struct transcode_thread* handle);
 void lpms_transcode_discontinuity(struct transcode_thread *handle);
+// LL interface for input
 void lpms_transcode_push_reset(struct transcode_thread *handle, int on);
 void lpms_transcode_push_bytes(struct transcode_thread* handle, uint8_t *bytes, int size);
 void lpms_transcode_push_eof(struct transcode_thread *handle);
 void lpms_transcode_push_error(struct transcode_thread *handle, int code);
+// LL interface for output
+const OutputPacket *lpms_transcode_peek_packet(struct transcode_thread *handle);
+void lpms_transcode_pop_packet(struct transcode_thread *handle);
 
 #endif // _LPMS_TRANSCODER_H_