Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions common/src/main/java/com/pedro/common/base/BaseSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.pedro.common.BitrateManager
import com.pedro.common.ConnectChecker
import com.pedro.common.StreamBlockingQueue
import com.pedro.common.frame.MediaFrame
import com.pedro.common.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -14,6 +15,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue

abstract class BaseSender(
protected val connectChecker: ConnectChecker,
Expand All @@ -24,7 +26,7 @@ abstract class BaseSender(
protected var running = false
private var cacheSize = 400
@Volatile
protected var queue = StreamBlockingQueue(cacheSize)
protected var queue = LinkedBlockingQueue<MediaFrame>(cacheSize)
protected var audioFramesSent: Long = 0
protected var videoFramesSent: Long = 0
var droppedAudioFrames: Long = 0
Expand Down Expand Up @@ -94,24 +96,24 @@ abstract class BaseSender(
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100")
val size = queue.getSize().toFloat()
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * (percentUsed / 100f)
}

fun resizeCache(newSize: Int) {
if (newSize < queue.getSize() - queue.remainingCapacity()) {
if (newSize < queue.size - queue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue = StreamBlockingQueue(newSize)
val tempQueue = LinkedBlockingQueue<MediaFrame>(newSize)
queue.drainTo(tempQueue)
queue = tempQueue
}

fun getCacheSize(): Int = cacheSize

fun getItemsInCache(): Int = queue.getSize()
fun getItemsInCache(): Int = queue.size

fun clearCache() {
queue.clear()
Expand Down Expand Up @@ -148,7 +150,7 @@ abstract class BaseSender(
fun getBitrateExponentialFactor() = bitrateManager.exponentialFactor

fun setDelay(delay: Long) {
queue.setCacheTime(delay)
// queue.setCacheTime(delay)
}

fun resetBytesSend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.close
import io.ktor.utils.io.readFully
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import kotlinx.coroutines.Dispatchers
import java.net.InetAddress
Expand All @@ -37,7 +37,10 @@ abstract class TcpStreamSocketKtorBase(
}

override suspend fun close() {
runCatching { output?.flushAndClose() }
runCatching {
output?.flush()
output?.close()
}
runCatching {
address = null
input = null
Expand All @@ -52,7 +55,7 @@ abstract class TcpStreamSocketKtorBase(
}

override suspend fun write(bytes: ByteArray, offset: Int, size: Int) {
output?.writeFully(bytes, offset, offset + size)
output?.writeFully(bytes, offset, size)
}

override suspend fun write(b: Int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import io.ktor.network.sockets.Datagram
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.utils.io.core.remaining
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.readBytes
import kotlinx.coroutines.Dispatchers
import kotlinx.io.Buffer
import kotlinx.io.readByteArray
import java.net.ConnectException
import java.net.InetAddress

Expand Down Expand Up @@ -51,15 +50,15 @@ class UdpStreamSocketKtor(
}

override suspend fun write(bytes: ByteArray) {
val datagram = Datagram(Buffer().apply { write(bytes, 0, bytes.size) }, address)
val datagram = Datagram(ByteReadPacket(bytes), address)
socket?.send(datagram)
}

override suspend fun read(): ByteArray {
val socket = socket ?: throw ConnectException("Read with socket closed, broken pipe")
val packet = socket.receive().packet
val length = packet.remaining.toInt()
return packet.readByteArray().sliceArray(0 until length)
return packet.readBytes().sliceArray(0 until length)
}

override fun isConnected(): Boolean = socket?.isClosed != true
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dokka = "2.0.0"
appcompat = "1.6.1"
#noinspection GradleDependency, version 2.2.0 need min sdk 21
constraintlayout = "2.1.4"
ktor = "3.3.2"
ktor = "2.3.13"
camerax = "1.5.1"
multidex = "2.0.1"
annotation = "1.9.1"
Expand Down