Skip to content

Commit 630c5ed

Browse files
authored
Merge pull request #5162 from ant-media/add_hls_upload_servlet
Upload HLS files to S3 on the fly
2 parents 04902a8 + 16f7c6d commit 630c5ed

File tree

14 files changed

+554
-82
lines changed

14 files changed

+554
-82
lines changed

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,18 @@
600600
<version>${mockito-core.version}</version>
601601
<scope>test</scope>
602602
</dependency>
603+
<dependency>
604+
<groupId>org.powermock</groupId>
605+
<artifactId>powermock-module-junit4</artifactId>
606+
<version>2.0.9</version>
607+
<scope>test</scope>
608+
</dependency>
609+
<dependency>
610+
<groupId>org.powermock</groupId>
611+
<artifactId>powermock-api-mockito2</artifactId>
612+
<version>2.0.9</version>
613+
<scope>test</scope>
614+
</dependency>
603615
<dependency>
604616
<groupId>javax.websocket</groupId>
605617
<artifactId>javax.websocket-api</artifactId>

src/main/java/io/antmedia/AntMediaApplicationAdapter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,6 +1606,7 @@ public void setStorageclientSettings(AppSettings settings) {
16061606
storageClient.setEnabled(settings.isS3RecordingEnabled());
16071607
storageClient.setPermission(settings.getS3Permission());
16081608
storageClient.setStorageClass(settings.getS3StorageClass());
1609+
storageClient.setCacheControl(settings.getS3CacheControl());
16091610
storageClient.reset();
16101611
}
16111612

src/main/java/io/antmedia/AppSettings.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ public class AppSettings implements Serializable{
308308
public static final String SETTINGS_S3_BUCKET_NAME = "settings.s3BucketName";
309309
public static final String SETTINGS_S3_ENDPOINT = "settings.s3Endpoint";
310310
public static final String SETTINGS_S3_PERMISSION = "settings.s3Permission";
311+
public static final String SETTINGS_S3_CACHE_CONTROL = "settings.s3CacheControl";
311312
public static final String SETTINGS_ENABLE_TIME_TOKEN_PLAY = "settings.enableTimeTokenForPlay";
312313
public static final String SETTINGS_ENABLE_TIME_TOKEN_PUBLISH = "settings.enableTimeTokenForPublish";
313314

@@ -1421,6 +1422,12 @@ public class AppSettings implements Serializable{
14211422
@Value( "${"+SETTINGS_S3_ENDPOINT+":#{null}}" )
14221423
private String s3Endpoint;
14231424

1425+
/**
1426+
* S3 Cache Control Metadata
1427+
*/
1428+
@Value( "${"+SETTINGS_S3_CACHE_CONTROL+":no-store, no-cache, must-revalidate, max-age=0}" )
1429+
private String s3CacheControl = "no-store, no-cache, must-revalidate, max-age=0";
1430+
14241431
/*
14251432
* The permission to use in uploading the files to the S3.
14261433
* Following values are accepted. Default value is public-read
@@ -1607,7 +1614,7 @@ public boolean isWriteStatsToDatastore() {
16071614
*
16081615
* In initialization no matter if spring or field definition is effective, the important thing is that having some random value
16091616
*/
1610-
@Value( "${"+SETTINGS_CLUSTER_COMMUNICATION_KEY+ ":+ #{ T(org.apache.commons.lang3.RandomStringUtils).randomAlphanumeric(32)}" )
1617+
@Value( "${"+SETTINGS_CLUSTER_COMMUNICATION_KEY+ ":#{ T(org.apache.commons.lang3.RandomStringUtils).randomAlphanumeric(32)}" )
16111618
private String clusterCommunicationKey = RandomStringUtils.randomAlphanumeric(32);
16121619

16131620
public void setWriteStatsToDatastore(boolean writeStatsToDatastore) {
@@ -2830,6 +2837,14 @@ public void setS3Endpoint(String s3Endpoint) {
28302837
this.s3Endpoint = s3Endpoint;
28312838
}
28322839

2840+
public String getS3CacheControl() {
2841+
return s3CacheControl;
2842+
}
2843+
2844+
public void setS3CacheControl(String s3CacheControl) {
2845+
this.s3CacheControl = s3CacheControl;
2846+
}
2847+
28332848
public void setDashHttpEndpoint(String dashHttpEndpoint) {
28342849
this.dashHttpEndpoint = dashHttpEndpoint;
28352850
}

src/main/java/io/antmedia/muxer/HLSMuxer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ public void init(IScope scope, String name, int resolutionHeight, String subFold
158158
@Override
159159
public String getOutputURL()
160160
{
161-
if (httpEndpoint != null)
161+
if (httpEndpoint != null )
162162
{
163-
return httpEndpoint + File.separator + streamId + extension;
163+
return httpEndpoint + File.separator + initialResourceNameWithoutExtension + extension;
164164
}
165165
return super.getOutputURL();
166166
}

src/main/java/io/antmedia/muxer/MuxAdaptor.java

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,10 @@ public PacketTime(long packetTimeMs, long systemTimeMs) {
252252
TIME_BASE_FOR_MS.num(1);
253253
TIME_BASE_FOR_MS.den(1000);
254254
}
255-
255+
256256
private AVRational videoTimeBase = TIME_BASE_FOR_MS;
257257
private AVRational audioTimeBase = TIME_BASE_FOR_MS;
258-
258+
259259
//NOSONAR because we need to keep the reference of the field
260260
private AVChannelLayout channelLayout;
261261

@@ -301,7 +301,7 @@ protected MuxAdaptor(ClientBroadcastStream clientBroadcastStream) {
301301

302302
this.broadcastStream = clientBroadcastStream;
303303
}
304-
304+
305305
public boolean addMuxer(Muxer muxer) {
306306
return addMuxer(muxer, 0);
307307
}
@@ -512,7 +512,7 @@ else if (broadcast.getWebMEnabled() == RECORDING_ENABLED_FOR_STREAM)
512512
}
513513

514514
}
515-
515+
516516
public static void setUpEndPoints(MuxAdaptor muxAdaptor, Broadcast broadcast, Vertx vertx)
517517
{
518518
if (broadcast != null) {
@@ -536,17 +536,17 @@ public AVCodecParameters getAudioCodecParameters() {
536536
if (audioDataConf != null && audioCodecParameters == null)
537537
{
538538
AACConfigParser aacParser = new AACConfigParser(audioDataConf, 0);
539-
539+
540540
if (!aacParser.isErrorOccured())
541541
{
542542
audioCodecParameters = new AVCodecParameters();
543543
audioCodecParameters.sample_rate(aacParser.getSampleRate());
544-
544+
545545
channelLayout = new AVChannelLayout();
546546
av_channel_layout_default(channelLayout, aacParser.getChannelCount());
547-
547+
548548
audioCodecParameters.ch_layout(channelLayout);
549-
549+
550550
audioCodecParameters.codec_id(AV_CODEC_ID_AAC);
551551
audioCodecParameters.codec_type(AVMEDIA_TYPE_AUDIO);
552552

@@ -624,7 +624,7 @@ public boolean prepare() throws Exception {
624624
videoStreamIndex = streamIndex;
625625
streamIndex++;
626626
}
627-
627+
628628

629629
AVCodecParameters parameters = getAudioCodecParameters();
630630
if (parameters != null) {
@@ -691,7 +691,7 @@ public boolean prepareFromInputFormatContext(AVFormatContext inputFormatContext)
691691
AVStream stream = inputFormatContext.streams(i);
692692
AVCodecParameters codecpar = stream.codecpar();
693693
if (codecpar.codec_type() == AVMEDIA_TYPE_VIDEO && !isBlacklistCodec(codecpar.codec_id())) {
694-
694+
695695
videoTimeBase = inputFormatContext.streams(i).time_base();
696696
logger.info("Video format codec Id: {} width:{} height:{} for stream: {} source index:{} target index:{}", codecpar.codec_id(), codecpar.width(), codecpar.height(), streamId, i, streamIndex);
697697
width = codecpar.width();
@@ -837,12 +837,12 @@ public void prepareMuxerIO()
837837
*/
838838
public void changeStreamQualityParameters(String streamId, String quality, double speed, int inputQueueSize) {
839839
long now = System.currentTimeMillis();
840-
840+
841841
//increase updating time to 5 seconds because it may cause some issues in mongodb updates and no need to update every 5 seconds
842842
if ((now - lastQualityUpdateTime) > 5000 &&
843843
((quality != null && !quality.equals(oldQuality)) || oldspeed == 0 || Math.abs(speed - oldspeed) > 0.05))
844844
{
845-
845+
846846
logger.info("Stream queue size:{} for streamId:{} ", inputQueueSize, streamId);
847847
lastQualityUpdateTime = now;
848848
getStreamHandler().setQualityParameters(streamId, quality, speed, inputQueueSize);
@@ -1003,17 +1003,17 @@ public void execute()
10031003
return;
10041004

10051005
}
1006-
1006+
10071007
IStreamCodecInfo codecInfo = broadcastStream.getCodecInfo();
10081008
enableVideo = codecInfo.hasVideo();
10091009
enableAudio = codecInfo.hasAudio();
1010-
1010+
10111011
getVideoDataConf(codecInfo);
10121012
getAudioDataConf(codecInfo);
1013-
1013+
10141014
// Sometimes AAC Sequenece Header is received later
10151015
// so that we check if we get the audio codec parameters correctly
1016-
1016+
10171017
if (enableVideo && enableAudio && getAudioCodecParameters() != null)
10181018
{
10191019
logger.info("Video and audio is enabled in stream:{} queue size: {}", streamId, queueSize.get());
@@ -1236,8 +1236,8 @@ public void updateQualityParameters(long pts, AVRational timebase) {
12361236
}
12371237
}
12381238
changeStreamQualityParameters(this.streamId, null, speed, getInputQueueSize());
1239-
1240-
1239+
1240+
12411241
}
12421242

12431243
public void closeRtmpConnection() {
@@ -1319,9 +1319,6 @@ public synchronized void closeResources() {
13191319

13201320
writeTrailer();
13211321

1322-
isRecording.set(false);
1323-
1324-
13251322
if (videoExtraDataPointer != null) {
13261323
av_free(videoExtraDataPointer.position(0));
13271324
videoExtraDataPointer.close();
@@ -1336,9 +1333,11 @@ public synchronized void closeResources() {
13361333

13371334
changeStreamQualityParameters(this.streamId, null, 0, getInputQueueSize());
13381335
getStreamHandler().muxAdaptorRemoved(this);
1336+
1337+
isRecording.set(false);
13391338
}
1340-
1341-
1339+
1340+
13421341
/**
13431342
* This method means that if the MuxAdaptor writes
13441343
* incoming packets to muxers({@link MuxAdaptor#muxerList}) directly without any StreamAdaptor/Encoders
@@ -1423,7 +1422,7 @@ public void stop(boolean shutdownCompletely) {
14231422
}
14241423

14251424
public int getInputQueueSize() {
1426-
return queueSize .get();
1425+
return queueSize.get();
14271426
}
14281427

14291428
public boolean isStopRequestExist() {
@@ -1444,7 +1443,7 @@ public void writeBufferedPacket()
14441443
while(!bufferQueue.isEmpty())
14451444
{
14461445
IStreamPacket tempPacket = peekTheNextPacketFromBuffer();
1447-
1446+
14481447
long now = System.currentTimeMillis();
14491448
long pktTimeDifferenceMs = tempPacket.getTimestamp() - firstPacketReadyToSentTimeMs;
14501449
long passedTime = now - bufferingFinishTimeMs;
@@ -1493,7 +1492,7 @@ public IStreamPacket peekTheNextPacketFromBuffer() {
14931492
}
14941493
}
14951494
}
1496-
1495+
14971496
return tempPacket;
14981497
}
14991498

@@ -1521,6 +1520,17 @@ public void packetReceived(IBroadcastStream stream, IStreamPacket packet)
15211520
logger.info("first received frame timestamp: {} for stream:{} ", lastFrameTimestamp, streamId);
15221521
firstReceivedFrameTimestamp = lastFrameTimestamp;
15231522
}
1523+
if (stopRequestExist)
1524+
{
1525+
//there may be a bug that red5 cannot stop the stream ClientBroadcastStream and it may fill the memory
1526+
logger.warn("Stop request exist and dropping incoming packet for stream:{}", streamId);
1527+
//try to close the connection again
1528+
closeRtmpConnection();
1529+
return;
1530+
}
1531+
1532+
1533+
15241534
queueSize.incrementAndGet();
15251535

15261536
CachedEvent event = new CachedEvent();
@@ -1530,6 +1540,7 @@ public void packetReceived(IBroadcastStream stream, IStreamPacket packet)
15301540
event.setTimestamp(packet.getTimestamp());
15311541

15321542
streamPacketQueue.add(event);
1543+
15331544
}
15341545

15351546
@Override
@@ -1716,7 +1727,7 @@ public RecordMuxer startRecording(RecordType recordType, int resolutionHeight) {
17161727
if(recordType == RecordType.MP4) {
17171728
Mp4Muxer mp4Muxer = createMp4Muxer();
17181729
muxer = mp4Muxer;
1719-
1730+
17201731
addMuxer(muxer, resolutionHeight);
17211732
}
17221733
else if(recordType == RecordType.WEBM) {
@@ -1732,7 +1743,7 @@ else if(recordType == RecordType.WEBM) {
17321743
public boolean prepareMuxer(Muxer muxer, int resolutionHeight)
17331744
{
17341745
boolean streamAdded = false;
1735-
1746+
17361747
muxer.init(scope, streamId, resolutionHeight, broadcast != null ? broadcast.getSubFolder(): null, 0);
17371748
logger.info("prepareMuxer for stream:{} muxer:{}", streamId, muxer.getClass().getSimpleName());
17381749

@@ -1774,7 +1785,7 @@ public boolean prepareMuxer(Muxer muxer, int resolutionHeight)
17741785
if (streamAdded)
17751786
{
17761787
prepared = muxer.prepareIO();
1777-
1788+
17781789
if (prepared)
17791790
{
17801791
prepared = addMuxerInternal(muxer);
@@ -1784,7 +1795,7 @@ public boolean prepareMuxer(Muxer muxer, int resolutionHeight)
17841795
{
17851796
logger.warn("Muxer:{} cannot be prepared for streamId:{}", muxer.getClass().getSimpleName(), streamId);
17861797
}
1787-
1798+
17881799
//TODO: Check to release the resources if it's not already released
17891800
}
17901801

@@ -1893,7 +1904,7 @@ public void endpointStatusHealthCheck(String url)
18931904
healthCheckPeriodMS = appSettings.getEndpointHealthCheckPeriodMs();
18941905
vertx.setPeriodic(healthCheckPeriodMS, id ->
18951906
{
1896-
1907+
18971908
String status = statusMap.getValueOrDefault(url, null);
18981909
logger.info("Checking the endpoint health for: {} and status: {} ", url, status);
18991910
//Broadcast might get deleted in the process of checking
@@ -2202,7 +2213,7 @@ public AVRational getVideoTimeBase() {
22022213
public AVRational getAudioTimeBase() {
22032214
return audioTimeBase;
22042215
}
2205-
2216+
22062217
public void setVideoTimeBase(AVRational videoTimeBase) {
22072218
this.videoTimeBase = videoTimeBase;
22082219
}
@@ -2237,7 +2248,7 @@ public void setIsRecording(boolean isRecording) {
22372248
public void setAudioDataConf(byte[] audioDataConf) {
22382249
this.audioDataConf = audioDataConf;
22392250
}
2240-
2251+
22412252
public boolean isBlacklistCodec(int codecId) {
22422253
return (codecId == AV_CODEC_ID_PNG);
22432254
}

0 commit comments

Comments
 (0)