diff --git a/EDPlayerJournal/JournalStream.cs b/EDPlayerJournal/JournalStream.cs index 7104c29..5eb61c1 100644 --- a/EDPlayerJournal/JournalStream.cs +++ b/EDPlayerJournal/JournalStream.cs @@ -4,6 +4,49 @@ using System.Threading; namespace EDPlayerJournal; +public class SyncQueue { + private Queue queue = new(); + + public void Enqueue(T item) { + Monitor.Enter(queue); + try { + queue.Enqueue(item); + } finally { + Monitor.Exit(queue); + } + } + + public T? TryDequeue() { + if (Monitor.TryEnter(queue)) { + T retval; + + Monitor.Enter(queue); + try { + retval = queue.Dequeue(); + } finally { + Monitor.Exit(queue); + } + + return retval; + } + + return default(T); + } + + public T Dequeue() { + T retval; + + Monitor.Enter(queue); + try { + retval = queue.Dequeue(); + } finally { + Monitor.Exit(queue); + } + + return retval; + } +} + internal class FileWatcherThread : IDisposable { private string? fullPath; private StreamReader? stream; @@ -14,38 +57,21 @@ internal class FileWatcherThread : IDisposable { public string? FullPath => fullPath; - private Mutex mutex = new(); - private Queue entries = new(); + private SyncQueue queue; + + public SyncQueue Queue => queue; public bool Finished => done; - public FileWatcherThread() { + public FileWatcherThread(SyncQueue queue) { + this.queue = queue; } - public FileWatcherThread(string fullpath) { + public FileWatcherThread(string fullpath, SyncQueue queue) { + this.queue = queue; Start(fullpath); } - public bool IsEmpty { - get { - lock (mutex) { - return entries.Count == 0; - } - } - } - - public Entry Dequeue() { - lock (mutex) { - return entries.Dequeue(); - } - } - - public void Enqueue(Entry e) { - lock (mutex) { - entries.Enqueue(e); - } - } - public void Signal() { wait.Set(); } @@ -84,7 +110,7 @@ internal class FileWatcherThread : IDisposable { // TODO: error handling on wrong entries } if (entry != null) { - Enqueue(entry); + queue.Enqueue(entry); if (entry.Is(Events.Shutdown)) { done = true; break; @@ -109,7 +135,6 @@ internal class FileWatcherThread : IDisposable { thread.Join(); thread = null; } - entries.Clear(); } void IDisposable.Dispose() { @@ -128,6 +153,10 @@ public class JournalStream { public PlayerJournal? Journal; + private SyncQueue queue = new(); + + public SyncQueue Queue => queue; + public JournalStream() { } @@ -145,29 +174,11 @@ public class JournalStream { return; } - FileWatcherThread thread = new(path); + FileWatcherThread thread = new(path, queue); streams[path] = thread; } - /// - /// Processes all file watcher thread queues in the current thread. This must be - /// called from the main event loop sometimes to make sure all NewJournalEntry - /// delegates fire correctly. - /// - public void ProcessQueues() { - foreach (var stream in streams) { - if (stream.Value == null) { - continue; - } - - if (stream.Value.IsEmpty) { - continue; - } - - Entry entry = stream.Value.Dequeue(); - NewJournalEntry?.Invoke(entry); - } - + private void CleanupStreams() { var toRemove = streams .Where(x => (x.Value != null && x.Value.Finished) || x.Value == null) .Select(x => x) @@ -180,6 +191,31 @@ public class JournalStream { } } + /// + /// Processes all file watcher thread queues in the current thread. This must be + /// called from the main event loop sometimes to make sure all NewJournalEntry + /// delegates fire correctly. + /// + public void ProcessQueues() { + Entry? entry; + + while ((entry = queue.TryDequeue()) != null) { + NewJournalEntry?.Invoke(entry); + } + + CleanupStreams(); + } + + public void ProcessQueuesWait() { + Entry? entry; + + while ((entry = queue.Dequeue()) != null) { + NewJournalEntry?.Invoke(entry); + } + + CleanupStreams(); + } + public void Open() { if (watcher != null) { return;