Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@

public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter implements IAntMediaStreamHandler, IShutdownListener {

public static final String NOT_ASSIGNED = "NOT_ASSIGNED";

/**
* Timeout value that stream is considered as finished or stuck
*/
Expand Down Expand Up @@ -682,6 +684,9 @@ public void closeBroadcast(String streamId) {
BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
broadcastUpdate.setUpdateTime(System.currentTimeMillis());
broadcastUpdate.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_FINISHED);
if(serverShuttingDown) {
broadcastUpdate.setOriginAdress(NOT_ASSIGNED);
}
getDataStore().updateBroadcastFields(streamId, broadcastUpdate);
// This is resets Viewer map in HLS Viewer Stats
resetHLSStats(streamId);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/antmedia/datastore/db/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1758,6 +1758,14 @@ else if (streamId.equals(event.getStreamId())) {

}

/**
* Query to get Broadcasts for a given origin(host) address
* @param hostAddress
* @return Broadcast List
*/
public abstract List<Broadcast> getBroadcastListByHost(String hostAddress);


/**
* Setter for appSettings
* @param appSettings
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1175,4 +1175,17 @@ public boolean hasSubtracks(String streamId) {
}
return false;
}

@Override
public List<Broadcast> getBroadcastListByHost(String hostAddress) {
List<Broadcast> broadcastList = new ArrayList<>();
for (Broadcast broadcast : broadcastMap.values())
{
if (hostAddress.equals(broadcast.getOriginAdress()))
{
broadcastList.add(broadcast);
}
}
return broadcastList;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1611,4 +1611,27 @@ public boolean hasSubtracks(String streamId) {

return false;
}

@Override
public List<Broadcast> getBroadcastListByHost(String hostAddress) {
long startTime = System.nanoTime();

List<Broadcast> broadcastList = new ArrayList<>();
synchronized (this) {
for (String broadcastString : map.values())
{
Broadcast broadcast = gson.fromJson(broadcastString, Broadcast.class);
if (hostAddress.equals(broadcast.getOriginAdress()))
{
broadcastList.add(broadcast);
}
}
}

long elapsedNanos = System.nanoTime() - startTime;
addQueryTime(elapsedNanos);
showWarningIfElapsedTimeIsMoreThanThreshold(elapsedNanos, "getBroadcastListByHost(String hostAddress)");

return broadcastList;
}
}
21 changes: 21 additions & 0 deletions src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,10 @@ public boolean updateBroadcastFields(String streamId, BroadcastUpdate broadcast)
if (broadcast.getQuality() != null) {
updates.add(set("quality", broadcast.getQuality()));
}

if (broadcast.getOriginAdress() != null) {
updates.add(set(ORIGIN_ADDRESS, broadcast.getOriginAdress()));
}


prepareFields(broadcast, updates);
Expand Down Expand Up @@ -2398,5 +2402,22 @@ public CaffeineCache getSubscriberCache() {

return subscriberCache;
}

@Override
public List<Broadcast> getBroadcastListByHost(String hostAddress) {
long startTime = System.nanoTime();

List<Broadcast> broadcastList = new ArrayList<>();
synchronized(this) {
broadcastList = datastore.find(Broadcast.class).filter(Filters.eq(ORIGIN_ADDRESS, hostAddress))
.iterator().toList();
}

long elapsedNanos = System.nanoTime() - startTime;
addQueryTime(elapsedNanos);
showWarningIfElapsedTimeIsMoreThanThreshold(elapsedNanos, "getActiveSubtracks");

return broadcastList;
}

}
42 changes: 42 additions & 0 deletions src/test/java/io/antmedia/test/db/DBStoresUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void testMapDBStore() throws Exception {
testGetSubtracksWithStatus(dataStore);
testGetSubtracksWithOrdering(dataStore);
testGetSubtracksWithSearch(dataStore);
testGetBroadcastByHost(dataStore);


dataStore.close(false);
Expand Down Expand Up @@ -293,6 +294,8 @@ public void testMemoryDataStore() throws Exception {
testGetSubtracksWithStatus(dataStore);
testGetSubtracksWithOrdering(dataStore);
testGetSubtracksWithSearch(dataStore);
testGetBroadcastByHost(dataStore);



dataStore.close(false);
Expand Down Expand Up @@ -373,6 +376,8 @@ public void testMongoStore() throws Exception {

testGetSubtracksWithOrdering(dataStore);
testGetSubtracksWithSearch(dataStore);
testGetBroadcastByHost(dataStore);


dataStore.close(true);

Expand Down Expand Up @@ -3956,6 +3961,43 @@ public void testSubscriberCache(DataStore dataStore) {
assertNull(subscriberFromCache);

}

public void testGetBroadcastByHost(DataStore dataStore) {
clear(dataStore);

assertEquals(0, dataStore.getBroadcastCount());

Broadcast broadcast1 = new Broadcast(null, null);
try {
broadcast1.setStreamId("broadcast1");
} catch (Exception e) {
e.printStackTrace();
}
broadcast1.setOriginAdress("origin1");
dataStore.save(broadcast1);


Broadcast broadcast2 = new Broadcast(null, null);
try {
broadcast2.setStreamId("broadcast2");
} catch (Exception e) {
e.printStackTrace();
}
broadcast2.setOriginAdress("origin2");
dataStore.save(broadcast2);

List<Broadcast> list = dataStore.getBroadcastListByHost("origin1");

assertEquals(1, list.size());
assertEquals("broadcast1", list.get(0).getStreamId());

list = dataStore.getBroadcastListByHost("origin2");

assertEquals(1, list.size());
assertEquals("broadcast2", list.get(0).getStreamId());


}


}
Loading