Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/com/videoprocessor/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
import com.videoprocessor.storage.VideoStorageScanner;
import com.videoprocessor.processor.FFmpegExecutor;
import com.videoprocessor.resource.FFmpegResourceManager;
// Metrics
import com.videoprocessor.telemetry.MetricsPublisher;

public class Main {

public static void main(String[] args)
throws Exception {


JobQueue jobQueue = new JobQueue();

MetricsTracker metricsTracker =
Expand All @@ -32,6 +35,15 @@ public static void main(String[] args)
FFmpegExecutor ffmpegExecutor =
new FFmpegExecutor(resourceManager);

// Metrics Section
MetricsPublisher metricsPublisher =
new MetricsPublisher(
metricsTracker,
jobQueue,
resourceManager
);
// Metrics end

WorkerManager manager =
new WorkerManager(
metricsTracker,
Expand All @@ -40,6 +52,8 @@ public static void main(String[] args)
ffmpegExecutor
);

// Initializing metrics
metricsPublisher.start();
manager.start(jobQueue);

int totalJobs = 50000;
Expand All @@ -66,7 +80,22 @@ public static void main(String[] args)
jobQueue.submitJob(job);
}

Thread.sleep(5000);
while (true) {

boolean queueEmpty =
jobQueue.size() == 0;

boolean noActiveJobs =
metricsTracker
.getActiveJobs() == 0;

if (queueEmpty && noActiveJobs) {

break;
}

Thread.sleep(1000);
}

manager.shutdown();

Comment on lines +97 to 101
Expand Down
15 changes: 15 additions & 0 deletions src/com/videoprocessor/MonitorApplication.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.videoprocessor;

import com.videoprocessor.monitor
.MetricsServer;

public class MonitorApplication {

public static void main(String[] args) {

MetricsServer server =
new MetricsServer();

server.start();
}
}
38 changes: 38 additions & 0 deletions src/com/videoprocessor/metrics/MetricsTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ public class MetricsTracker {
private final AtomicInteger processedJobs =
new AtomicInteger(0);

private final AtomicInteger failedJobs =
new AtomicInteger(0);

private final AtomicInteger activeJobs =
new AtomicInteger(0);

public void incrementProcessedJobs() {

int updatedValue =
Expand All @@ -18,7 +24,39 @@ public void incrementProcessedJobs() {
);
}

public void incrementFailedJobs() {

int updatedValue =
failedJobs.incrementAndGet();

System.out.println(
"[METRICS] Failed Jobs: "
+ updatedValue
);
}

public int getProcessedJobs() {

return processedJobs.get();
}

public int getFailedJobs() {

return failedJobs.get();
}

public void incrementActiveJobs() {

activeJobs.incrementAndGet();
}

public void decrementActiveJobs() {

activeJobs.decrementAndGet();
}

public int getActiveJobs() {

return activeJobs.get();
}
}
4 changes: 4 additions & 0 deletions src/com/videoprocessor/monitor/DashboardRenderer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.videoprocessor.monitor;

public class DashboardRenderer {
}
63 changes: 63 additions & 0 deletions src/com/videoprocessor/monitor/MetricsServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.videoprocessor.monitor;

import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class MetricsServer {

private static final int PORT = 8081;

public void start() {

System.out.println(
"[MONITOR] Starting metrics server..."
);

try (ServerSocket serverSocket =
new ServerSocket(PORT)) {

Comment on lines +17 to +19
while (true) {

Socket socket =
serverSocket.accept();

Thread.ofVirtual().start(() -> {

handleClient(socket);
});
}

} catch (Exception e) {

System.out.println(
"[MONITOR ERROR] "
+ e.getMessage()
);
}
}

private void handleClient(
Socket socket
) {

try (ObjectInputStream input =
new ObjectInputStream(
socket.getInputStream()
)) {

MetricsSnapshot snapshot =
(MetricsSnapshot)
input.readObject();

Comment on lines +44 to +52
System.out.println(snapshot);

} catch (Exception e) {

System.out.println(
"[CLIENT ERROR] "
+ e.getMessage()
);
}
}
}
72 changes: 72 additions & 0 deletions src/com/videoprocessor/monitor/MetricsSnapshot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.videoprocessor.monitor;

import java.io.Serializable;

public class MetricsSnapshot
implements Serializable {

private final int processedJobs;

private final int failedJobs;

private final int queueSize;

private final int activeFFmpegProcesses;

public MetricsSnapshot(
int processedJobs,
int failedJobs,
int queueSize,
int activeFFmpegProcesses
) {

this.processedJobs =
processedJobs;

this.failedJobs =
failedJobs;

this.queueSize =
queueSize;

this.activeFFmpegProcesses =
activeFFmpegProcesses;
}

public int getProcessedJobs() {
return processedJobs;
}

public int getFailedJobs() {
return failedJobs;
}

public int getQueueSize() {
return queueSize;
}

public int getActiveFFmpegProcesses() {
return activeFFmpegProcesses;
}

@Override
public String toString() {

return """
==============================
METRICS SNAPSHOT
==============================
Processed Jobs : %d
Failed Jobs : %d
Queue Size : %d
Active FFmpeg : %d
==============================
"""
.formatted(
processedJobs,
failedJobs,
queueSize,
activeFFmpegProcesses
);
}
}
2 changes: 1 addition & 1 deletion src/com/videoprocessor/processor/FFmpegExecutor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//package com.videoprocessor.processor;
//
//import com.videoprocessor.resource
//import com.videoprocessor.com.videoprocessor.resource
// .FFmpegResourceManager;
Comment on lines +3 to 4
//
//import java.io.BufferedReader;
Expand Down
5 changes: 5 additions & 0 deletions src/com/videoprocessor/queue/JobQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public JobQueue() {
this.queue = new LinkedBlockingQueue<>();
}

public int size() {

return queue.size();
}

public void submitJob(Job job) throws InterruptedException {
queue.put(job);
System.out.println(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.videoprocessor.resource;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class FFmpegResourceManager {

// max concurrent FFmpeg processes
private final Semaphore semaphore;

private final AtomicInteger activeProcesses =
new AtomicInteger(0);

public FFmpegResourceManager(
int maxConcurrentProcesses
) {
Expand All @@ -22,19 +25,30 @@ public void acquire()

semaphore.acquire();

int active =
activeProcesses.incrementAndGet();

System.out.println(
"[RESOURCE] Permit acquired | Available: "
+ semaphore.availablePermits()
"[RESOURCE] Permit acquired | Active FFmpeg: "
+ active
);
}

public void release() {

semaphore.release();

int active =
activeProcesses.decrementAndGet();

System.out.println(
"[RESOURCE] Permit released | Available: "
+ semaphore.availablePermits()
"[RESOURCE] Permit released | Active FFmpeg: "
+ active
);
}

public int getActiveProcesses() {

return activeProcesses.get();
}
}
44 changes: 44 additions & 0 deletions src/com/videoprocessor/telemetry/MetricsClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.videoprocessor.telemetry;

import com.videoprocessor.monitor
.MetricsSnapshot;

import java.io.ObjectOutputStream;
import java.net.Socket;

public class MetricsClient {

private static final String HOST =
"localhost";

private static final int PORT = 8081;

public void send(
MetricsSnapshot snapshot
) {

try (

Socket socket =
new Socket(HOST, PORT);

ObjectOutputStream output =
new ObjectOutputStream(
socket.getOutputStream()
)

) {

output.writeObject(snapshot);

output.flush();

} catch (Exception e) {

System.out.println(
"[TELEMETRY ERROR] "
+ e.getMessage()
);
}
}
}
Loading