diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 508441a16..70d30cac7 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -118,6 +118,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter implements IAntMediaStreamHandler, IShutdownListener { + public static final String NOT_ASSIGNED = "NOT_ASSIGNED"; + /** * Timeout value that stream is considered as finished or stuck */ @@ -682,6 +684,9 @@ public void closeBroadcast(String streamId) { BroadcastUpdate broadcastUpdate = new BroadcastUpdate(); broadcastUpdate.setUpdateTime(System.currentTimeMillis()); broadcastUpdate.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_FINISHED); + if(serverShuttingDown) { + broadcastUpdate.setOriginAdress(NOT_ASSIGNED); + } getDataStore().updateBroadcastFields(streamId, broadcastUpdate); // This is resets Viewer map in HLS Viewer Stats resetHLSStats(streamId); diff --git a/src/main/java/io/antmedia/datastore/db/DataStore.java b/src/main/java/io/antmedia/datastore/db/DataStore.java index eb7f9753c..2be710177 100644 --- a/src/main/java/io/antmedia/datastore/db/DataStore.java +++ b/src/main/java/io/antmedia/datastore/db/DataStore.java @@ -1758,6 +1758,14 @@ else if (streamId.equals(event.getStreamId())) { } + /** + * Query to get Broadcasts for a given origin(host) address + * @param hostAddress + * @return Broadcast List + */ + public abstract List getBroadcastListByHost(String hostAddress); + + /** * Setter for appSettings * @param appSettings diff --git a/src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java b/src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java index 3653c8e14..728e5268f 100644 --- a/src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java +++ b/src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java @@ -1175,4 +1175,17 @@ public boolean hasSubtracks(String streamId) { } return false; } + + @Override + public List getBroadcastListByHost(String hostAddress) { + List broadcastList = new ArrayList<>(); + for (Broadcast broadcast : broadcastMap.values()) + { + if (hostAddress.equals(broadcast.getOriginAdress())) + { + broadcastList.add(broadcast); + } + } + return broadcastList; + } } \ No newline at end of file diff --git a/src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java b/src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java index 97d1ad832..f968ac207 100644 --- a/src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java +++ b/src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java @@ -1611,4 +1611,27 @@ public boolean hasSubtracks(String streamId) { return false; } + + @Override + public List getBroadcastListByHost(String hostAddress) { + long startTime = System.nanoTime(); + + List broadcastList = new ArrayList<>(); + synchronized (this) { + for (String broadcastString : map.values()) + { + Broadcast broadcast = gson.fromJson(broadcastString, Broadcast.class); + if (hostAddress.equals(broadcast.getOriginAdress())) + { + broadcastList.add(broadcast); + } + } + } + + long elapsedNanos = System.nanoTime() - startTime; + addQueryTime(elapsedNanos); + showWarningIfElapsedTimeIsMoreThanThreshold(elapsedNanos, "getBroadcastListByHost(String hostAddress)"); + + return broadcastList; + } } diff --git a/src/main/java/io/antmedia/datastore/db/MongoStore.java b/src/main/java/io/antmedia/datastore/db/MongoStore.java index df7e9b7e8..6d1cb1bf7 100644 --- a/src/main/java/io/antmedia/datastore/db/MongoStore.java +++ b/src/main/java/io/antmedia/datastore/db/MongoStore.java @@ -1173,6 +1173,10 @@ public boolean updateBroadcastFields(String streamId, BroadcastUpdate broadcast) if (broadcast.getQuality() != null) { updates.add(set("quality", broadcast.getQuality())); } + + if (broadcast.getOriginAdress() != null) { + updates.add(set(ORIGIN_ADDRESS, broadcast.getOriginAdress())); + } prepareFields(broadcast, updates); @@ -2398,5 +2402,22 @@ public CaffeineCache getSubscriberCache() { return subscriberCache; } + + @Override + public List getBroadcastListByHost(String hostAddress) { + long startTime = System.nanoTime(); + + List broadcastList = new ArrayList<>(); + synchronized(this) { + broadcastList = datastore.find(Broadcast.class).filter(Filters.eq(ORIGIN_ADDRESS, hostAddress)) + .iterator().toList(); + } + + long elapsedNanos = System.nanoTime() - startTime; + addQueryTime(elapsedNanos); + showWarningIfElapsedTimeIsMoreThanThreshold(elapsedNanos, "getActiveSubtracks"); + + return broadcastList; + } } diff --git a/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java b/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java index 04c9ee55d..6ffa54d3f 100644 --- a/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java +++ b/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java @@ -197,6 +197,7 @@ public void testMapDBStore() throws Exception { testGetSubtracksWithStatus(dataStore); testGetSubtracksWithOrdering(dataStore); testGetSubtracksWithSearch(dataStore); + testGetBroadcastByHost(dataStore); dataStore.close(false); @@ -293,6 +294,8 @@ public void testMemoryDataStore() throws Exception { testGetSubtracksWithStatus(dataStore); testGetSubtracksWithOrdering(dataStore); testGetSubtracksWithSearch(dataStore); + testGetBroadcastByHost(dataStore); + dataStore.close(false); @@ -373,6 +376,8 @@ public void testMongoStore() throws Exception { testGetSubtracksWithOrdering(dataStore); testGetSubtracksWithSearch(dataStore); + testGetBroadcastByHost(dataStore); + dataStore.close(true); @@ -3956,6 +3961,43 @@ public void testSubscriberCache(DataStore dataStore) { assertNull(subscriberFromCache); } + + public void testGetBroadcastByHost(DataStore dataStore) { + clear(dataStore); + + assertEquals(0, dataStore.getBroadcastCount()); + + Broadcast broadcast1 = new Broadcast(null, null); + try { + broadcast1.setStreamId("broadcast1"); + } catch (Exception e) { + e.printStackTrace(); + } + broadcast1.setOriginAdress("origin1"); + dataStore.save(broadcast1); + + + Broadcast broadcast2 = new Broadcast(null, null); + try { + broadcast2.setStreamId("broadcast2"); + } catch (Exception e) { + e.printStackTrace(); + } + broadcast2.setOriginAdress("origin2"); + dataStore.save(broadcast2); + + List list = dataStore.getBroadcastListByHost("origin1"); + + assertEquals(1, list.size()); + assertEquals("broadcast1", list.get(0).getStreamId()); + + list = dataStore.getBroadcastListByHost("origin2"); + + assertEquals(1, list.size()); + assertEquals("broadcast2", list.get(0).getStreamId()); + + + } }