Skip to content

Commit

Permalink
Fix encoder buffer leak (#132)
Browse files Browse the repository at this point in the history
* Improve demo app

* Fix encoder buffer leak
  • Loading branch information
natario1 authored Mar 30, 2021
1 parent a283dca commit f489ae9
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import androidx.annotation.NonNull;
import androidx.appcompat.app.AppCompatActivity;
import androidx.core.content.FileProvider;

import kotlin.collections.ArraysKt;


public class TranscoderActivity extends AppCompatActivity implements
TranscoderListener {
Expand Down Expand Up @@ -70,9 +75,6 @@ public class TranscoderActivity extends AppCompatActivity implements
private boolean mIsTranscoding;
private boolean mIsAudioOnly;
private Future<Void> mTranscodeFuture;
private Uri mTranscodeInputUri1;
private Uri mTranscodeInputUri2;
private Uri mTranscodeInputUri3;
private Uri mAudioReplacementUri;
private File mTranscodeOutputFile;
private long mTranscodeStartTime;
Expand Down Expand Up @@ -241,15 +243,13 @@ protected void onActivityResult(int requestCode, int resultCode, final Intent da
&& data != null) {
if (data.getClipData() != null) {
ClipData clipData = data.getClipData();
mTranscodeInputUri1 = clipData.getItemAt(0).getUri();
mTranscodeInputUri2 = clipData.getItemCount() >= 2 ? clipData.getItemAt(1).getUri() : null;
mTranscodeInputUri3 = clipData.getItemCount() >= 3 ? clipData.getItemAt(2).getUri() : null;
transcode();
List<Uri> uris = new ArrayList<>();
for (int i = 0; i < clipData.getItemCount(); i++) {
uris.add(clipData.getItemAt(i).getUri());
}
transcode(uris.toArray(new Uri[0]));
} else if (data.getData() != null) {
mTranscodeInputUri1 = data.getData();
mTranscodeInputUri2 = null;
mTranscodeInputUri3 = null;
transcode();
transcode(data.getData());
}
}
if (requestCode == REQUEST_CODE_PICK_AUDIO
Expand All @@ -262,7 +262,7 @@ protected void onActivityResult(int requestCode, int resultCode, final Intent da
}
}

private void transcode() {
private void transcode(@NonNull Uri... uris) {
// Create a temporary file for output.
try {
File outputDir = new File(getExternalFilesDir(null), "outputs");
Expand Down Expand Up @@ -296,20 +296,16 @@ private void transcode() {
setIsTranscoding(true);
LOG.e("Building transcoding options...");
TranscoderOptions.Builder builder = Transcoder.into(mTranscodeOutputFile.getAbsolutePath());
List<DataSource> sources = ArraysKt.map(uris, uri -> new UriDataSource(this, uri));
sources.set(0, new TrimDataSource(sources.get(0), mTrimStartUs, mTrimEndUs));
if (mAudioReplacementUri == null) {
if (mTranscodeInputUri1 != null) {
DataSource source = new UriDataSource(this, mTranscodeInputUri1);
builder.addDataSource(new TrimDataSource(source, mTrimStartUs, mTrimEndUs));
for (DataSource source : sources) {
builder.addDataSource(source);
}
if (mTranscodeInputUri2 != null) builder.addDataSource(this, mTranscodeInputUri2);
if (mTranscodeInputUri3 != null) builder.addDataSource(this, mTranscodeInputUri3);
} else {
if (mTranscodeInputUri1 != null) {
DataSource source = new UriDataSource(this, mTranscodeInputUri1);
builder.addDataSource(TrackType.VIDEO, new TrimDataSource(source, mTrimStartUs, mTrimEndUs));
for (DataSource source : sources) {
builder.addDataSource(TrackType.VIDEO, source);
}
if (mTranscodeInputUri2 != null) builder.addDataSource(TrackType.VIDEO, this, mTranscodeInputUri2);
if (mTranscodeInputUri3 != null) builder.addDataSource(TrackType.VIDEO, this, mTranscodeInputUri3);
builder.addDataSource(TrackType.AUDIO, this, mAudioReplacementUri);
}
LOG.e("Starting transcoding!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import com.otaliastudios.transcoder.internal.audio.remix.AudioRemixer
import com.otaliastudios.transcoder.internal.codec.*
import com.otaliastudios.transcoder.internal.pipeline.*
import com.otaliastudios.transcoder.internal.utils.Logger
import com.otaliastudios.transcoder.internal.utils.trackMapOf
import com.otaliastudios.transcoder.resample.AudioResampler
import com.otaliastudios.transcoder.stretch.AudioStretcher
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.ceil
import kotlin.math.floor

Expand All @@ -22,7 +24,10 @@ internal class AudioEngine(
private val targetFormat: MediaFormat
): QueuedStep<DecoderData, DecoderChannel, EncoderData, EncoderChannel>(), DecoderChannel {

private val log = Logger("AudioEngine")
companion object {
private val ID = AtomicInteger(0)
}
private val log = Logger("AudioEngine(${ID.getAndIncrement()})")

override val channel = this
private val buffers = ShortBuffers()
Expand All @@ -37,12 +42,14 @@ internal class AudioEngine(
override fun handleSourceFormat(sourceFormat: MediaFormat): Surface? = null

override fun handleRawFormat(rawFormat: MediaFormat) {
log.i("handleRawFormat($rawFormat)")
this.rawFormat = rawFormat
remixer = AudioRemixer[rawFormat.channels, targetFormat.channels]
chunks = ChunkQueue(rawFormat.sampleRate, rawFormat.channels)
}

override fun enqueueEos(data: DecoderData) {
log.i("enqueueEos()")
data.release(false)
chunks.enqueueEos()
}
Expand All @@ -55,8 +62,14 @@ internal class AudioEngine(
}

override fun drain(): State<EncoderData> {
if (chunks.isEmpty()) return State.Wait
val (outBytes, outId) = next.buffer() ?: return State.Wait
if (chunks.isEmpty()) {
log.i("drain(): no chunks, waiting...")
return State.Wait
}
val (outBytes, outId) = next.buffer() ?: return run {
log.i("drain(): no next buffer, waiting...")
State.Wait
}
val outBuffer = outBytes.asShortBuffer()
return chunks.drain(
eos = State.Eos(EncoderData(outBytes, outId, 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.otaliastudios.transcoder.internal.codec
import android.media.MediaCodec.*
import android.media.MediaFormat
import android.view.Surface
import com.otaliastudios.transcoder.common.trackType
import com.otaliastudios.transcoder.internal.data.ReaderChannel
import com.otaliastudios.transcoder.internal.data.ReaderData
import com.otaliastudios.transcoder.internal.media.MediaCodecBuffers
Expand All @@ -11,7 +12,11 @@ import com.otaliastudios.transcoder.internal.pipeline.Channel
import com.otaliastudios.transcoder.internal.pipeline.QueuedStep
import com.otaliastudios.transcoder.internal.pipeline.State
import com.otaliastudios.transcoder.internal.utils.Logger
import com.otaliastudios.transcoder.internal.utils.trackMapOf
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import kotlin.properties.Delegates
import kotlin.properties.Delegates.observable


internal open class DecoderData(
Expand All @@ -30,32 +35,52 @@ internal class Decoder(
continuous: Boolean, // relevant if the source sends no-render chunks. should we compensate or not?
) : QueuedStep<ReaderData, ReaderChannel, DecoderData, DecoderChannel>(), ReaderChannel {

private val log = Logger("Decoder")
companion object {
private val ID = trackMapOf(AtomicInteger(0), AtomicInteger(0))
}

private val log = Logger("Decoder(${format.trackType},${ID[format.trackType].getAndIncrement()})")
override val channel = this

private val codec = createDecoderByType(format.getString(MediaFormat.KEY_MIME)!!)
private val buffers by lazy { MediaCodecBuffers(codec) }
private var info = BufferInfo()
private val dropper = DecoderDropper(continuous)

private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() }
private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() }
private fun printDequeued() {
// log.v("dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
}

override fun initialize(next: DecoderChannel) {
super.initialize(next)
log.i("initialize()")
val surface = next.handleSourceFormat(format)
codec.configure(format, surface, null, 0)
codec.start()
}

override fun buffer(): Pair<ByteBuffer, Int>? {
val id = codec.dequeueInputBuffer(0)
log.v("buffer(): id=$id")
return if (id >= 0) buffers.getInputBuffer(id) to id else null
return if (id >= 0) {
dequeuedInputs++
buffers.getInputBuffer(id) to id
} else {
log.i("buffer() failed. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
null
}
}

override fun enqueueEos(data: ReaderData) {
log.i("enqueueEos()!")
dequeuedInputs--
val flag = BUFFER_FLAG_END_OF_STREAM
codec.queueInputBuffer(data.id, 0, 0, 0, flag)
}

override fun enqueue(data: ReaderData) {
dequeuedInputs--
val (chunk, id) = data
val flag = if (chunk.keyframe) BUFFER_FLAG_SYNC_FRAME else 0
codec.queueInputBuffer(id, chunk.buffer.position(), chunk.buffer.remaining(), chunk.timeUs, flag)
Expand All @@ -65,36 +90,43 @@ internal class Decoder(
override fun drain(): State<DecoderData> {
val result = codec.dequeueOutputBuffer(info, 0)
return when (result) {
INFO_TRY_AGAIN_LATER -> State.Wait
INFO_TRY_AGAIN_LATER -> {
log.i("drain(): got INFO_TRY_AGAIN_LATER, waiting.")
State.Wait
}
INFO_OUTPUT_FORMAT_CHANGED -> {
log.i("drain(): got INFO_OUTPUT_FORMAT_CHANGED, handling format and retrying. format=${codec.outputFormat}")
next.handleRawFormat(codec.outputFormat)
State.Retry
}
INFO_OUTPUT_BUFFERS_CHANGED -> {
log.i("drain(): got INFO_OUTPUT_BUFFERS_CHANGED, retrying.")
buffers.onOutputBuffersChanged()
State.Retry
}
else -> {
val isEos = info.flags and BUFFER_FLAG_END_OF_STREAM != 0
val timeUs = if (isEos) 0 else dropper.output(info.presentationTimeUs)
if (timeUs != null /* && (isEos || info.size > 0) */) {
dequeuedOutputs++
val buffer = buffers.getOutputBuffer(result)
val data = DecoderData(buffer, timeUs) {
codec.releaseOutputBuffer(result, it)
dequeuedOutputs--
}
// log.w("TDBG isEos=$isEos timeUs=$timeUs")
if (isEos) State.Eos(data) else State.Ok(data)
} else {
codec.releaseOutputBuffer(result, false)
State.Wait
}.also {
log.v("drain(): returning $it")
}
}
}.also {
log.v("Returning $it")
}
}

override fun release() {
log.i("release(): releasing codec. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
codec.stop()
codec.release()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import android.media.MediaCodec
import android.media.MediaCodec.*
import android.view.Surface
import com.otaliastudios.transcoder.common.TrackType
import com.otaliastudios.transcoder.common.trackType
import com.otaliastudios.transcoder.internal.Codecs
import com.otaliastudios.transcoder.internal.data.WriterChannel
import com.otaliastudios.transcoder.internal.data.WriterData
Expand All @@ -12,7 +13,9 @@ import com.otaliastudios.transcoder.internal.pipeline.Channel
import com.otaliastudios.transcoder.internal.pipeline.QueuedStep
import com.otaliastudios.transcoder.internal.pipeline.State
import com.otaliastudios.transcoder.internal.utils.Logger
import com.otaliastudios.transcoder.internal.utils.trackMapOf
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import kotlin.properties.Delegates
import kotlin.properties.Delegates.observable

Expand Down Expand Up @@ -44,13 +47,15 @@ internal class Encoder(
)

companion object {
// Debugging
private val log = Logger("Encoder")
private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() }
private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() }
private fun printDequeued() {
log.v("dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
}
private val ID = trackMapOf(AtomicInteger(0), AtomicInteger(0))
}

private val type = if (surface != null) TrackType.VIDEO else TrackType.AUDIO
private val log = Logger("Encoder(${type},${ID[type].getAndIncrement()})")
private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() }
private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() }
private fun printDequeued() {
log.v("dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
}

override val channel = this
Expand All @@ -69,22 +74,26 @@ internal class Encoder(

override fun buffer(): Pair<ByteBuffer, Int>? {
val id = codec.dequeueInputBuffer(0)
log.v("buffer(): id=$id")
if (id >= 0) dequeuedInputs++
return if (id >= 0) buffers.getInputBuffer(id) to id else null
return if (id >= 0) {
dequeuedInputs++
buffers.getInputBuffer(id) to id
} else {
log.i("buffer() failed. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
null
}
}

private var eosReceivedButNotEnqueued = false

override fun enqueueEos(data: EncoderData) {
if (!ownsCodecStop) {
eosReceivedButNotEnqueued = true
} else if (surface != null) {
codec.signalEndOfInputStream()
} else {
val flag = BUFFER_FLAG_END_OF_STREAM
if (surface == null) {
if (!ownsCodecStop) eosReceivedButNotEnqueued = true
val flag = if (!ownsCodecStop) 0 else BUFFER_FLAG_END_OF_STREAM
codec.queueInputBuffer(data.id, 0, 0, 0, flag)
dequeuedInputs--
} else {
if (!ownsCodecStop) eosReceivedButNotEnqueued = true
else codec.signalEndOfInputStream()
}
}

Expand All @@ -104,6 +113,7 @@ internal class Encoder(
if (eosReceivedButNotEnqueued) {
// Horrible hack. When we don't own the MediaCodec, we can't enqueue EOS so we
// can't dequeue them. INFO_TRY_AGAIN_LATER is returned. We assume this means EOS.
log.i("Sending fake Eos. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs")
val buffer = ByteBuffer.allocateDirect(0)
State.Eos(WriterData(buffer, 0L, 0) {})
} else {
Expand Down Expand Up @@ -145,6 +155,7 @@ internal class Encoder(
}

override fun release() {
log.i("release(): ownsStop=$ownsCodecStop dequeuedInputs=${dequeuedInputs} dequeuedOutputs=$dequeuedOutputs")
if (ownsCodecStop) {
codec.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class Pipeline private constructor(name: String, private val chain: Lis
log.v("execute(): step ${step.name} (#$index/${chain.size}) is waiting. headState=$headState headIndex=$headIndex")
return State.Wait
}
log.v("execute(): executed ${step.name} (#$index/${chain.size}). result=$state")
// log.v("execute(): executed ${step.name} (#$index/${chain.size}). result=$state")
if (state is State.Eos) {
log.i("execute(): EOS from ${step.name} (#$index/${chain.size}).")
headState = state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;

import static android.media.MediaMetadataRetriever.METADATA_KEY_DURATION;
import static android.media.MediaMetadataRetriever.METADATA_KEY_LOCATION;
Expand All @@ -26,7 +27,8 @@
*/
public abstract class DefaultDataSource implements DataSource {

private final Logger LOG = new Logger("DefaultDataSource(" + this.hashCode() + ")");
private final static AtomicInteger ID = new AtomicInteger(0);
private final Logger LOG = new Logger("DefaultDataSource(" + ID.getAndIncrement() + ")");

private final MutableTrackMap<MediaFormat> mFormat = mutableTrackMapOf(null);
private final MutableTrackMap<Integer> mIndex = mutableTrackMapOf(null);
Expand Down Expand Up @@ -269,7 +271,7 @@ public int getOrientation() {

@Override
public long getDurationUs() {
LOG.v("getDurationUs()");
// LOG.v("getDurationUs()");
try {
return Long.parseLong(mMetadata.extractMetadata(METADATA_KEY_DURATION)) * 1000;
} catch (NumberFormatException e) {
Expand Down

0 comments on commit f489ae9

Please sign in to comment.