Compare commits

...

4 Commits

Author SHA1 Message Date
38807cc198 fix deadlock 2025-08-06 15:59:52 +02:00
26bf6f5e02 user proper dequeue method 2025-08-06 14:44:36 +02:00
a3ddb00b69 properly close all threads 2025-08-06 14:39:52 +02:00
82dc16fb1b update JournalStream with single queue 2025-08-01 13:57:19 +02:00
2 changed files with 91 additions and 52 deletions

View File

@@ -12,6 +12,7 @@ public class Program {
while (true) { while (true) {
stream.ProcessQueues(); stream.ProcessQueues();
Thread.Sleep(100);
} }
} }

View File

@@ -4,6 +4,48 @@ using System.Threading;
namespace EDPlayerJournal; namespace EDPlayerJournal;
public class SyncQueue<T> {
private Queue<T> queue = new();
public void Enqueue(T item) {
Monitor.Enter(queue);
try {
queue.Enqueue(item);
} finally {
Monitor.Exit(queue);
}
}
public T? TryDequeue() {
if (Monitor.TryEnter(queue)) {
T retval;
try {
queue.TryDequeue(out retval);
} finally {
Monitor.Exit(queue);
}
return retval;
}
return default(T);
}
public T Dequeue() {
T retval;
Monitor.Enter(queue);
try {
retval = queue.Dequeue();
} finally {
Monitor.Exit(queue);
}
return retval;
}
}
internal class FileWatcherThread : IDisposable { internal class FileWatcherThread : IDisposable {
private string? fullPath; private string? fullPath;
private StreamReader? stream; private StreamReader? stream;
@@ -14,38 +56,21 @@ internal class FileWatcherThread : IDisposable {
public string? FullPath => fullPath; public string? FullPath => fullPath;
private Mutex mutex = new(); private SyncQueue<Entry> queue;
private Queue<Entry> entries = new();
public SyncQueue<Entry> Queue => queue;
public bool Finished => done; public bool Finished => done;
public FileWatcherThread() { public FileWatcherThread(SyncQueue<Entry> queue) {
this.queue = queue;
} }
public FileWatcherThread(string fullpath) { public FileWatcherThread(string fullpath, SyncQueue<Entry> queue) {
this.queue = queue;
Start(fullpath); Start(fullpath);
} }
public bool IsEmpty {
get {
lock (mutex) {
return entries.Count == 0;
}
}
}
public Entry Dequeue() {
lock (mutex) {
return entries.Dequeue();
}
}
public void Enqueue(Entry e) {
lock (mutex) {
entries.Enqueue(e);
}
}
public void Signal() { public void Signal() {
wait.Set(); wait.Set();
} }
@@ -84,7 +109,7 @@ internal class FileWatcherThread : IDisposable {
// TODO: error handling on wrong entries // TODO: error handling on wrong entries
} }
if (entry != null) { if (entry != null) {
Enqueue(entry); queue.Enqueue(entry);
if (entry.Is(Events.Shutdown)) { if (entry.Is(Events.Shutdown)) {
done = true; done = true;
break; break;
@@ -95,6 +120,12 @@ internal class FileWatcherThread : IDisposable {
} }
public void Close() { public void Close() {
if (thread != null) {
done = true;
Signal();
thread.Join();
thread = null;
}
if (stream != null) { if (stream != null) {
stream.Close(); stream.Close();
stream = null; stream = null;
@@ -103,13 +134,6 @@ internal class FileWatcherThread : IDisposable {
file.Close(); file.Close();
file = null; file = null;
} }
if (thread != null) {
done = true;
Signal();
thread.Join();
thread = null;
}
entries.Clear();
} }
void IDisposable.Dispose() { void IDisposable.Dispose() {
@@ -128,6 +152,10 @@ public class JournalStream {
public PlayerJournal? Journal; public PlayerJournal? Journal;
private SyncQueue<Entry> queue = new();
public SyncQueue<Entry> Queue => queue;
public JournalStream() { public JournalStream() {
} }
@@ -145,29 +173,11 @@ public class JournalStream {
return; return;
} }
FileWatcherThread thread = new(path); FileWatcherThread thread = new(path, queue);
streams[path] = thread; streams[path] = thread;
} }
/// <summary> private void CleanupStreams() {
/// Processes all file watcher thread queues in the current thread. This must be
/// called from the main event loop sometimes to make sure all NewJournalEntry
/// delegates fire correctly.
/// </summary>
public void ProcessQueues() {
foreach (var stream in streams) {
if (stream.Value == null) {
continue;
}
if (stream.Value.IsEmpty) {
continue;
}
Entry entry = stream.Value.Dequeue();
NewJournalEntry?.Invoke(entry);
}
var toRemove = streams var toRemove = streams
.Where(x => (x.Value != null && x.Value.Finished) || x.Value == null) .Where(x => (x.Value != null && x.Value.Finished) || x.Value == null)
.Select(x => x) .Select(x => x)
@@ -180,6 +190,31 @@ public class JournalStream {
} }
} }
/// <summary>
/// Processes all file watcher thread queues in the current thread. This must be
/// called from the main event loop sometimes to make sure all NewJournalEntry
/// delegates fire correctly.
/// </summary>
public void ProcessQueues() {
Entry? entry;
while ((entry = queue.TryDequeue()) != null) {
NewJournalEntry?.Invoke(entry);
}
CleanupStreams();
}
public void ProcessQueuesWait() {
Entry? entry;
while ((entry = queue.Dequeue()) != null) {
NewJournalEntry?.Invoke(entry);
}
CleanupStreams();
}
public void Open() { public void Open() {
if (watcher != null) { if (watcher != null) {
return; return;
@@ -213,6 +248,9 @@ public class JournalStream {
watcher.EnableRaisingEvents = false; watcher.EnableRaisingEvents = false;
} }
watcher = null; watcher = null;
foreach (var stream in streams) {
stream.Value.Close();
}
streams.Clear(); streams.Clear();
} }