Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions ffmpeg/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ int flush_in(struct input_ctx *ictx, AVFrame *frame, int *stream_index)
return AVERROR_EOF;
}

// FIXME: name me and the other function better
// FIXME: name me and the other function better (and move to utilities...)
enum AVPixelFormat hw2pixfmt(AVCodecContext *ctx)
{
const AVCodec *decoder = ctx->codec;
Expand Down Expand Up @@ -254,7 +254,26 @@ static void close_video_decoder(struct input_ctx *ictx)
if (ictx->last_frame_v) av_frame_free(&ictx->last_frame_v);
}

int open_input(input_params *params, struct input_ctx *ctx)
int open_demuxer(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer)
{
int ret;
ctx->ic = avformat_alloc_context();
if (buffer) {
// using custom input
if (buffer_setup_as_input(buffer, ctx->ic) < 0) {
// memory allocation failure
return -1;
}
// instruct FFmpeg to use our input, note that file name is empty
ret = avformat_open_input(&ctx->ic, "", NULL, NULL);
} else {
// normal file-based input, note that file name is passed
ret = avformat_open_input(&ctx->ic, params->fname, NULL, NULL);
}
return ret;
}

int open_input(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer)
{
char *inp = params->fname;
int ret = 0;
Expand All @@ -268,7 +287,9 @@ int open_input(input_params *params, struct input_ctx *ctx)
ctx->device = params->device;

// open demuxer
ret = avformat_open_input(&ctx->ic, inp, NULL, NULL);
ret = open_demuxer(params, ctx, buffer);
// TODO: maybe move stuff below to open_demuxer? Could also remove
// indices on the input side, just like it was done for the output
if (ret < 0) LPMS_ERR(open_input_err, "demuxer: Unable to open input");
ret = avformat_find_stream_info(ctx->ic, NULL);
if (ret < 0) LPMS_ERR(open_input_err, "Unable to find input info");
Expand Down Expand Up @@ -341,4 +362,3 @@ void free_input(struct input_ctx *ictx, enum FreeInputPolicy policy)
// audio decoder is always closed
close_audio_decoder(ictx);
}

3 changes: 2 additions & 1 deletion ffmpeg/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include "transcoder.h"
#include "stream_buffer.h"

struct input_ctx {
AVFormatContext *ic; // demuxer required
Expand Down Expand Up @@ -67,7 +68,7 @@ int demux_in(struct input_ctx *ictx, AVPacket *pkt);
int decode_in(struct input_ctx *ictx, AVPacket *pkt, AVFrame *frame, int *stream_index);
int flush_in(struct input_ctx *ictx, AVFrame *frame, int *stream_index);
int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt, int *stream_index);
int open_input(input_params *params, struct input_ctx *ctx);
int open_input(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer);
void free_input(struct input_ctx *inctx, enum FreeInputPolicy policy);
// this should perhaps be moved to some utility file, as it is not decoder
// specific
Expand Down
47 changes: 42 additions & 5 deletions ffmpeg/encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,16 @@ static int open_video_encoder(struct input_ctx *ictx, struct output_ctx *octx,
static void free_output_no_trailer(struct output_ctx *octx, enum FreeOutputPolicy policy)
{
if (octx->oc) {
if (!(octx->oc->oformat->flags & AVFMT_NOFILE) && octx->oc->pb) {
// we check against AVFMT_FLAG_CUSTOM_IO to avoid trying to close file
// in case we are using custom i/o - this would cause crash
if (!(octx->oc->oformat->flags & AVFMT_NOFILE) &&
!(octx->oc->flags & AVFMT_FLAG_CUSTOM_IO) && octx->oc->pb) {
avio_closep(&octx->oc->pb);
}
avformat_free_context(octx->oc);
octx->oc = NULL;
}
queue_push_staging(&octx->write_context, END_OF_OUTPUT, -1);
if (octx->vc &&
((octx->hw_type == AV_HWDEVICE_TYPE_NONE) || (FORCE_CLOSE_HW_ENCODER == policy))) {
avcodec_free_context(&octx->vc);
Expand Down Expand Up @@ -295,7 +299,7 @@ void free_output(struct output_ctx *octx, enum FreeOutputPolicy policy)
free_output_no_trailer(octx, policy);
}

int open_output(struct output_ctx *octx, struct input_ctx *ictx)
int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue)
{
int ret = 0;

Expand Down Expand Up @@ -351,8 +355,27 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)

// Muxer headers can be written now once streams were added
if (!(fmt->flags & AVFMT_NOFILE)) {
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
if (queue) {
// output through queue
ret = queue_setup_as_output(queue, &octx->write_context, octx->oc);
if (ret < 0) LPMS_ERR(open_output_err, "Error setting up output queue");
// make sure muxer options are compatible with queue output
// TODO: not sure if that is the best option for detecting a container
// type but it is surprisingly hard to find guidance on that
if (fmt->mime_type && !strcmp("video/mp4", fmt->mime_type)) {
// Default configuration of MP4 muxer needs seekable output, which
// the queue is not able to provide. Passing the following flags removes
// seekable requirement. This is also configuration recommended for
// streaming purposes, so it seems better suited anyway (the whole point
// with queues is to provide Low Latency/streaming support)
ret = av_dict_set(&octx->muxer->opts, "movflags", "frag_keyframe+empty_moov", 0);
if (ret < 0) LPMS_ERR(open_output_err, "Error setting movflags for fragmented output");
}
} else {
// normal file output
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
}
}

// IMPORTANT: notice how up to and including this point open_output_err is
Expand All @@ -364,6 +387,10 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
// call free_output_no_trailer() exclusively!
ret = avformat_write_header(octx->oc, &octx->muxer->opts);
if (ret < 0) LPMS_ERR(open_output_err, "Error writing header");
// flush headers
// ret = av_interleaved_write_frame(octx->oc, NULL);
if (ret < 0) LPMS_ERR(open_output_err, "Error flushing headers");
queue_push_staging(&octx->write_context, BEGIN_OF_OUTPUT, 0);

// From now on it is normal free_output(), hence after_header error label
if(octx->sfilters != NULL && needs_decoder(octx->video->name) && octx->sf.active == 0) {
Expand Down Expand Up @@ -431,6 +458,8 @@ static int encode(AVCodecContext* encoder, AVFrame *frame, struct output_ctx* oc

int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
{
int ret;
int64_t pts = pkt->pts;
pkt->stream_index = ost->index;
if (av_cmp_q(tb, ost->time_base)) {
av_packet_rescale_ts(pkt, tb, ost->time_base);
Expand Down Expand Up @@ -481,7 +510,15 @@ int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
octx->last_video_dts = pkt->dts;
}

return av_interleaved_write_frame(octx->oc, pkt);
// make sure correct timestamp will get carried through to output_queue
ret = av_interleaved_write_frame(octx->oc, pkt);
if (0 > ret) return ret;
// this means "flush output", we want to do it so that output_queue will get
// properly associated packets and timestamps
ret = av_interleaved_write_frame(octx->oc, NULL);
if (0 > ret) return ret;
queue_push_staging(&octx->write_context, PACKET_OUTPUT, pts);
return 0;
}

static int getmetadatainf(AVFrame *inf, struct output_ctx *octx)
Expand Down
3 changes: 2 additions & 1 deletion ffmpeg/encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
#include "decoder.h"
#include "transcoder.h"
#include "filter.h"
#include "output_queue.h"

enum FreeOutputPolicy {
FORCE_CLOSE_HW_ENCODER,
PRESERVE_HW_ENCODER
};

int open_output(struct output_ctx *octx, struct input_ctx *ictx);
int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue);
void free_output(struct output_ctx *octx, enum FreeOutputPolicy);
int process_out(struct input_ctx *ictx, struct output_ctx *octx, AVCodecContext *encoder, AVStream *ost,
struct filter_ctx *filter, AVFrame *inf);
Expand Down
82 changes: 82 additions & 0 deletions ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,77 @@ func isAudioAllDrop(ps []TranscodeOptions) bool {
return true
}

// load input file into input buffer
func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) {
if strings.HasPrefix(strings.ToLower(input.Fname), "pipe:") {
C.lpms_transcode_push_reset(t.handle, 0)
fmt.Println("Skipping buffer input for pipe")
return
}
C.lpms_transcode_push_reset(t.handle, 1)
data, err := os.ReadFile(input.Fname)
if nil != err {
// error loading file
// translate Go error into proper FFmpeg error to get expected behavior
// of tests
var error C.int
if errors.Is(err, os.ErrNotExist) {
error = 1
} else {
error = 0
}
C.lpms_transcode_push_error(t.handle, error);
fmt.Println("Error while loading the queue", err)
} else {
// file loaded fine
C.lpms_transcode_push_bytes(t.handle, (*C.uchar)(unsafe.Pointer(&data[0])), C.int(len(data)))
C.lpms_transcode_push_eof(t.handle)
fmt.Println(len(data), "bytes of input loaded into buffer")
}
}

func storeOutputQueue(t *Transcoder, outputs []TranscodeOptions) (error) {
var fds = make([]*os.File, len(outputs))
for i := range fds {
file, err := os.Create(outputs[i].Oname)
if nil != err {
return err
}
fds[i] = file
}
for {
// get next output packet
packet := C.lpms_transcode_peek_packet(t.handle)
if 8 == packet.flags {
break
}
// this is a data packet, write it to file
data := C.GoBytes(unsafe.Pointer(packet.data), packet.size)
bytes, err := fds[packet.index].Write(data)
if nil != err {
return err
}
if bytes != int(packet.size) {
panic("storeOutputQueue couldn't write all bytes error")
}
// pop data packet
C.lpms_transcode_pop_packet(t.handle)
}
// if we are here, we just have terminating packet, remove it
// (terminating packet carries no data, it is added there to signify
// the end of all input)
C.lpms_transcode_pop_packet(t.handle)
// Close all the open files
for i := range fds {
if nil != fds[i] {
fds[i].Close()
}
}
// Success
return nil
}


// create C output params array and return it along with corresponding finalizer
// function that makes sure there are no C memory leaks
func createCOutputParams(input *TranscodeOptionsIn, ps []TranscodeOptions) ([]C.output_params, func(), error) {
Expand Down Expand Up @@ -929,7 +1000,18 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
paramsPointer = (*C.output_params)(&params[0])
resultsPointer = (*C.output_results)(&results[0])
}
loadInputBuffer(t, input)
ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
// be careful to use storeOutputQueue to fake lpms_transcode so that the test
// reacts properly
if (ret == 0) {
err = storeOutputQueue(t, ps)
if nil != err {
// fake output error
ret = C.AVERROR_STREAM_NOT_FOUND
}
}

if ret != 0 {
if LogTranscodeErrors {
glog.Error("Transcoder Return : ", ErrorMap[ret])
Expand Down
4 changes: 4 additions & 0 deletions ffmpeg/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <libavfilter/avfilter.h>
#include "decoder.h"
#include "output_queue.h"

struct filter_ctx {
int active;
Expand Down Expand Up @@ -36,6 +37,7 @@ struct filter_ctx {
int flushing;
};

// TODO move this away, this ain't filter
struct output_ctx {
char *fname; // required output file name
char *vfilters; // required output video filters
Expand Down Expand Up @@ -74,6 +76,8 @@ struct output_ctx {

output_results *res; // data to return for this output
char *xcoderParams;

WriteContext write_context;
};

int init_video_filters(struct input_ctx *ictx, struct output_ctx *octx);
Expand Down
Loading