diff --git a/src/com/videoprocessor/Main.java b/src/com/videoprocessor/Main.java index f54a96f..144af81 100644 --- a/src/com/videoprocessor/Main.java +++ b/src/com/videoprocessor/Main.java @@ -9,12 +9,15 @@ import com.videoprocessor.storage.VideoStorageScanner; import com.videoprocessor.processor.FFmpegExecutor; import com.videoprocessor.resource.FFmpegResourceManager; +// Metrics +import com.videoprocessor.telemetry.MetricsPublisher; public class Main { public static void main(String[] args) throws Exception { + JobQueue jobQueue = new JobQueue(); MetricsTracker metricsTracker = @@ -32,6 +35,15 @@ public static void main(String[] args) FFmpegExecutor ffmpegExecutor = new FFmpegExecutor(resourceManager); + // Metrics Section + MetricsPublisher metricsPublisher = + new MetricsPublisher( + metricsTracker, + jobQueue, + resourceManager + ); + // Metrics end + WorkerManager manager = new WorkerManager( metricsTracker, @@ -40,6 +52,8 @@ public static void main(String[] args) ffmpegExecutor ); + // Initializing metrics + metricsPublisher.start(); manager.start(jobQueue); int totalJobs = 50000; @@ -66,7 +80,22 @@ public static void main(String[] args) jobQueue.submitJob(job); } - Thread.sleep(5000); + while (true) { + + boolean queueEmpty = + jobQueue.size() == 0; + + boolean noActiveJobs = + metricsTracker + .getActiveJobs() == 0; + + if (queueEmpty && noActiveJobs) { + + break; + } + + Thread.sleep(1000); + } manager.shutdown(); diff --git a/src/com/videoprocessor/MonitorApplication.java b/src/com/videoprocessor/MonitorApplication.java new file mode 100644 index 0000000..0bebe59 --- /dev/null +++ b/src/com/videoprocessor/MonitorApplication.java @@ -0,0 +1,15 @@ +package com.videoprocessor; + +import com.videoprocessor.monitor + .MetricsServer; + +public class MonitorApplication { + + public static void main(String[] args) { + + MetricsServer server = + new MetricsServer(); + + server.start(); + } +} \ No newline at end of file diff --git a/src/com/videoprocessor/metrics/MetricsTracker.java b/src/com/videoprocessor/metrics/MetricsTracker.java index 9ffe52b..864960c 100644 --- a/src/com/videoprocessor/metrics/MetricsTracker.java +++ b/src/com/videoprocessor/metrics/MetricsTracker.java @@ -7,6 +7,12 @@ public class MetricsTracker { private final AtomicInteger processedJobs = new AtomicInteger(0); + private final AtomicInteger failedJobs = + new AtomicInteger(0); + + private final AtomicInteger activeJobs = + new AtomicInteger(0); + public void incrementProcessedJobs() { int updatedValue = @@ -18,7 +24,39 @@ public void incrementProcessedJobs() { ); } + public void incrementFailedJobs() { + + int updatedValue = + failedJobs.incrementAndGet(); + + System.out.println( + "[METRICS] Failed Jobs: " + + updatedValue + ); + } + public int getProcessedJobs() { + return processedJobs.get(); } + + public int getFailedJobs() { + + return failedJobs.get(); + } + + public void incrementActiveJobs() { + + activeJobs.incrementAndGet(); + } + + public void decrementActiveJobs() { + + activeJobs.decrementAndGet(); + } + + public int getActiveJobs() { + + return activeJobs.get(); + } } \ No newline at end of file diff --git a/src/com/videoprocessor/monitor/DashboardRenderer.java b/src/com/videoprocessor/monitor/DashboardRenderer.java new file mode 100644 index 0000000..a886479 --- /dev/null +++ b/src/com/videoprocessor/monitor/DashboardRenderer.java @@ -0,0 +1,4 @@ +package com.videoprocessor.monitor; + +public class DashboardRenderer { +} diff --git a/src/com/videoprocessor/monitor/MetricsServer.java b/src/com/videoprocessor/monitor/MetricsServer.java new file mode 100644 index 0000000..dadef16 --- /dev/null +++ b/src/com/videoprocessor/monitor/MetricsServer.java @@ -0,0 +1,63 @@ +package com.videoprocessor.monitor; + +import java.io.ObjectInputStream; +import java.net.ServerSocket; +import java.net.Socket; + +public class MetricsServer { + + private static final int PORT = 8081; + + public void start() { + + System.out.println( + "[MONITOR] Starting metrics server..." + ); + + try (ServerSocket serverSocket = + new ServerSocket(PORT)) { + + while (true) { + + Socket socket = + serverSocket.accept(); + + Thread.ofVirtual().start(() -> { + + handleClient(socket); + }); + } + + } catch (Exception e) { + + System.out.println( + "[MONITOR ERROR] " + + e.getMessage() + ); + } + } + + private void handleClient( + Socket socket + ) { + + try (ObjectInputStream input = + new ObjectInputStream( + socket.getInputStream() + )) { + + MetricsSnapshot snapshot = + (MetricsSnapshot) + input.readObject(); + + System.out.println(snapshot); + + } catch (Exception e) { + + System.out.println( + "[CLIENT ERROR] " + + e.getMessage() + ); + } + } +} \ No newline at end of file diff --git a/src/com/videoprocessor/monitor/MetricsSnapshot.java b/src/com/videoprocessor/monitor/MetricsSnapshot.java new file mode 100644 index 0000000..09a6425 --- /dev/null +++ b/src/com/videoprocessor/monitor/MetricsSnapshot.java @@ -0,0 +1,72 @@ +package com.videoprocessor.monitor; + +import java.io.Serializable; + +public class MetricsSnapshot + implements Serializable { + + private final int processedJobs; + + private final int failedJobs; + + private final int queueSize; + + private final int activeFFmpegProcesses; + + public MetricsSnapshot( + int processedJobs, + int failedJobs, + int queueSize, + int activeFFmpegProcesses + ) { + + this.processedJobs = + processedJobs; + + this.failedJobs = + failedJobs; + + this.queueSize = + queueSize; + + this.activeFFmpegProcesses = + activeFFmpegProcesses; + } + + public int getProcessedJobs() { + return processedJobs; + } + + public int getFailedJobs() { + return failedJobs; + } + + public int getQueueSize() { + return queueSize; + } + + public int getActiveFFmpegProcesses() { + return activeFFmpegProcesses; + } + + @Override + public String toString() { + + return """ + ============================== + METRICS SNAPSHOT + ============================== + Processed Jobs : %d + Failed Jobs : %d + Queue Size : %d + Active FFmpeg : %d + ============================== + """ + .formatted( + processedJobs, + failedJobs, + queueSize, + activeFFmpegProcesses + ); + } +} \ No newline at end of file diff --git a/src/com/videoprocessor/processor/FFmpegExecutor.java b/src/com/videoprocessor/processor/FFmpegExecutor.java index 0985c58..7d40fcf 100644 --- a/src/com/videoprocessor/processor/FFmpegExecutor.java +++ b/src/com/videoprocessor/processor/FFmpegExecutor.java @@ -1,6 +1,6 @@ //package com.videoprocessor.processor; // -//import com.videoprocessor.resource +//import com.videoprocessor.com.videoprocessor.resource // .FFmpegResourceManager; // //import java.io.BufferedReader; diff --git a/src/com/videoprocessor/queue/JobQueue.java b/src/com/videoprocessor/queue/JobQueue.java index d226350..76cc927 100644 --- a/src/com/videoprocessor/queue/JobQueue.java +++ b/src/com/videoprocessor/queue/JobQueue.java @@ -13,6 +13,11 @@ public JobQueue() { this.queue = new LinkedBlockingQueue<>(); } + public int size() { + + return queue.size(); + } + public void submitJob(Job job) throws InterruptedException { queue.put(job); System.out.println( diff --git a/src/resource/FFmpegResourceManager.java b/src/com/videoprocessor/resource/FFmpegResourceManager.java similarity index 52% rename from src/resource/FFmpegResourceManager.java rename to src/com/videoprocessor/resource/FFmpegResourceManager.java index 7a6bbd2..46abf18 100644 --- a/src/resource/FFmpegResourceManager.java +++ b/src/com/videoprocessor/resource/FFmpegResourceManager.java @@ -1,12 +1,15 @@ package com.videoprocessor.resource; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; public class FFmpegResourceManager { - // max concurrent FFmpeg processes private final Semaphore semaphore; + private final AtomicInteger activeProcesses = + new AtomicInteger(0); + public FFmpegResourceManager( int maxConcurrentProcesses ) { @@ -22,9 +25,12 @@ public void acquire() semaphore.acquire(); + int active = + activeProcesses.incrementAndGet(); + System.out.println( - "[RESOURCE] Permit acquired | Available: " - + semaphore.availablePermits() + "[RESOURCE] Permit acquired | Active FFmpeg: " + + active ); } @@ -32,9 +38,17 @@ public void release() { semaphore.release(); + int active = + activeProcesses.decrementAndGet(); + System.out.println( - "[RESOURCE] Permit released | Available: " - + semaphore.availablePermits() + "[RESOURCE] Permit released | Active FFmpeg: " + + active ); } + + public int getActiveProcesses() { + + return activeProcesses.get(); + } } \ No newline at end of file diff --git a/src/com/videoprocessor/telemetry/MetricsClient.java b/src/com/videoprocessor/telemetry/MetricsClient.java new file mode 100644 index 0000000..9da8eac --- /dev/null +++ b/src/com/videoprocessor/telemetry/MetricsClient.java @@ -0,0 +1,44 @@ +package com.videoprocessor.telemetry; + +import com.videoprocessor.monitor + .MetricsSnapshot; + +import java.io.ObjectOutputStream; +import java.net.Socket; + +public class MetricsClient { + + private static final String HOST = + "localhost"; + + private static final int PORT = 8081; + + public void send( + MetricsSnapshot snapshot + ) { + + try ( + + Socket socket = + new Socket(HOST, PORT); + + ObjectOutputStream output = + new ObjectOutputStream( + socket.getOutputStream() + ) + + ) { + + output.writeObject(snapshot); + + output.flush(); + + } catch (Exception e) { + + System.out.println( + "[TELEMETRY ERROR] " + + e.getMessage() + ); + } + } +} \ No newline at end of file diff --git a/src/com/videoprocessor/telemetry/MetricsPublisher.java b/src/com/videoprocessor/telemetry/MetricsPublisher.java new file mode 100644 index 0000000..cce8b07 --- /dev/null +++ b/src/com/videoprocessor/telemetry/MetricsPublisher.java @@ -0,0 +1,77 @@ +package com.videoprocessor.telemetry; + +import com.videoprocessor.metrics + .MetricsTracker; +import com.videoprocessor.monitor + .MetricsSnapshot; +import com.videoprocessor.queue + .JobQueue; +import com.videoprocessor.resource + .FFmpegResourceManager; + +public class MetricsPublisher { + + private final MetricsTracker + metricsTracker; + + private final JobQueue jobQueue; + + private final FFmpegResourceManager + resourceManager; + + private final MetricsClient client = + new MetricsClient(); + + public MetricsPublisher( + MetricsTracker metricsTracker, + JobQueue jobQueue, + FFmpegResourceManager resourceManager + ) { + + this.metricsTracker = + metricsTracker; + + this.jobQueue = + jobQueue; + + this.resourceManager = + resourceManager; + } + + public void start() { + + Thread.ofVirtual().start(() -> { + + while (true) { + + try { + + MetricsSnapshot snapshot = + new MetricsSnapshot( + metricsTracker + .getProcessedJobs(), + + metricsTracker + .getFailedJobs(), + + jobQueue.size(), + + resourceManager + .getActiveProcesses() + ); + + client.send(snapshot); + + Thread.sleep(3000); + + } catch (Exception e) { + + System.out.println( + "[PUBLISHER ERROR] " + + e.getMessage() + ); + } + } + }); + } +} \ No newline at end of file diff --git a/src/com/videoprocessor/worker/Worker.java b/src/com/videoprocessor/worker/Worker.java index 56f86cd..8ea337f 100644 --- a/src/com/videoprocessor/worker/Worker.java +++ b/src/com/videoprocessor/worker/Worker.java @@ -8,7 +8,6 @@ import com.videoprocessor.retry.RetryHandler; import com.videoprocessor.status.JobStatusTracker; - public class Worker { private final MetricsTracker metricsTracker; @@ -26,11 +25,14 @@ public Worker( FFmpegExecutor executor ) { - this.metricsTracker = metricsTracker; + this.metricsTracker = + metricsTracker; - this.statusTracker = statusTracker; + this.statusTracker = + statusTracker; - this.retryHandler = retryHandler; + this.retryHandler = + retryHandler; this.videoProcessor = new VideoProcessor(executor); @@ -40,7 +42,8 @@ public void processJob(Job job) throws InterruptedException { String threadName = - Thread.currentThread().getName(); + Thread.currentThread() + .getName(); statusTracker.markJobProcessing( job.getJobId() @@ -82,6 +85,8 @@ public void processJob(Job job) statusTracker.markJobFailed( job.getJobId() ); + + metricsTracker.incrementFailedJobs(); } } } diff --git a/src/com/videoprocessor/worker/WorkerManager.java b/src/com/videoprocessor/worker/WorkerManager.java index cae6dd3..1180410 100644 --- a/src/com/videoprocessor/worker/WorkerManager.java +++ b/src/com/videoprocessor/worker/WorkerManager.java @@ -2,6 +2,7 @@ import com.videoprocessor.job.Job; import com.videoprocessor.metrics.MetricsTracker; +import com.videoprocessor.processor.FFmpegExecutor; import com.videoprocessor.queue.JobQueue; import com.videoprocessor.retry.RetryHandler; import com.videoprocessor.status.JobStatusTracker; @@ -9,7 +10,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import com.videoprocessor.processor.FFmpegExecutor; public class WorkerManager { @@ -17,6 +17,8 @@ public class WorkerManager { private final Worker worker; + private final MetricsTracker metricsTracker; + private Thread dispatcherThread; private volatile boolean running = true; @@ -28,6 +30,9 @@ public WorkerManager( FFmpegExecutor executor ) { + this.metricsTracker = + metricsTracker; + this.executorService = Executors.newThreadPerTaskExecutor( Thread.ofVirtual() @@ -56,6 +61,8 @@ public void start(JobQueue jobQueue) { Job job = jobQueue.takeJob(); + metricsTracker.incrementActiveJobs(); + executorService.submit(() -> { try { @@ -66,6 +73,11 @@ public void start(JobQueue jobQueue) { Thread.currentThread() .interrupt(); + + } finally { + + metricsTracker + .decrementActiveJobs(); } }); @@ -75,7 +87,8 @@ public void start(JobQueue jobQueue) { "[DISPATCHER] Interrupted" ); - Thread.currentThread().interrupt(); + Thread.currentThread() + .interrupt(); break; } @@ -120,4 +133,13 @@ public void shutdown() "[MANAGER] Shutdown complete" ); } + + public boolean isIdle( + JobQueue jobQueue + ) { + + return jobQueue.size() == 0 + && metricsTracker + .getActiveJobs() == 0; + } } \ No newline at end of file