From ed68876300b2b32b97c2dfc436f3fb98e7f1a45d Mon Sep 17 00:00:00 2001 From: Florian Stinglmayr Date: Fri, 1 Aug 2025 12:43:16 +0200 Subject: [PATCH] refactor JournalStream into multi-thread --- EDJournalWatcher/EDJournalWatcher.csproj | 14 ++ EDJournalWatcher/Program.cs | 21 +++ EDPlayerJournal.sln | 6 + EDPlayerJournal/JournalStream.cs | 175 ++++++++++++++++++++--- 4 files changed, 198 insertions(+), 18 deletions(-) create mode 100644 EDJournalWatcher/EDJournalWatcher.csproj create mode 100644 EDJournalWatcher/Program.cs diff --git a/EDJournalWatcher/EDJournalWatcher.csproj b/EDJournalWatcher/EDJournalWatcher.csproj new file mode 100644 index 0000000..d9ed817 --- /dev/null +++ b/EDJournalWatcher/EDJournalWatcher.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/EDJournalWatcher/Program.cs b/EDJournalWatcher/Program.cs new file mode 100644 index 0000000..e9f9770 --- /dev/null +++ b/EDJournalWatcher/Program.cs @@ -0,0 +1,21 @@ +using EDPlayerJournal; +using EDPlayerJournal.Entries; + +namespace EDJournalWatcher; + +public class Program { + public static void Main(string[] args) { + PlayerJournal journal = new(); + JournalStream stream = new(journal); + + stream.NewJournalEntry += Stream_NewJournalEntry; + + while (true) { + stream.ProcessQueues(); + } + } + + private static void Stream_NewJournalEntry(Entry entry) { + Console.WriteLine(entry.ToString()); + } +} \ No newline at end of file diff --git a/EDPlayerJournal.sln b/EDPlayerJournal.sln index e3d1c75..2184bed 100644 --- a/EDPlayerJournal.sln +++ b/EDPlayerJournal.sln @@ -7,6 +7,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournal", "EDPlayer EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournalTests", "EDPlayerJournalTests\EDPlayerJournalTests.csproj", "{FAFE4437-B1AE-0255-B1C1-6350A81816F1}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDJournalWatcher", "EDJournalWatcher\EDJournalWatcher.csproj", "{DF502BDA-F888-476E-A3B9-CD857359FA7E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,6 +23,10 @@ Global {FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Debug|Any CPU.Build.0 = Debug|Any CPU {FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.ActiveCfg = Release|Any CPU {FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.Build.0 = Release|Any CPU + {DF502BDA-F888-476E-A3B9-CD857359FA7E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DF502BDA-F888-476E-A3B9-CD857359FA7E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DF502BDA-F888-476E-A3B9-CD857359FA7E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DF502BDA-F888-476E-A3B9-CD857359FA7E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/EDPlayerJournal/JournalStream.cs b/EDPlayerJournal/JournalStream.cs index 2aa0565..7104c29 100644 --- a/EDPlayerJournal/JournalStream.cs +++ b/EDPlayerJournal/JournalStream.cs @@ -1,10 +1,126 @@ using EDPlayerJournal.Entries; +using System.Linq; +using System.Threading; + +namespace EDPlayerJournal; + +internal class FileWatcherThread : IDisposable { + private string? fullPath; + private StreamReader? stream; + private FileStream? file; + private Thread? thread; + private bool done = false; + private EventWaitHandle wait = new(false, EventResetMode.AutoReset); + + public string? FullPath => fullPath; + + private Mutex mutex = new(); + private Queue entries = new(); + + public bool Finished => done; + + public FileWatcherThread() { + } + + public FileWatcherThread(string fullpath) { + Start(fullpath); + } + + public bool IsEmpty { + get { + lock (mutex) { + return entries.Count == 0; + } + } + } + + public Entry Dequeue() { + lock (mutex) { + return entries.Dequeue(); + } + } + + public void Enqueue(Entry e) { + lock (mutex) { + entries.Enqueue(e); + } + } + + public void Signal() { + wait.Set(); + } + + public void Start(string fullpath) { + fullPath = fullpath; + file = new FileStream(fullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + stream = new StreamReader(file); + + done = false; + thread = new Thread(ReaderThread); + thread.Start(); + } + + private void ReaderThread(object? args) { + while (!done) { + wait.WaitOne(); + + if (done || stream == null) { + return; + } + + while (!done) { + var task = stream.ReadLineAsync(); + if (!task.Wait(TimeSpan.FromMilliseconds(10))) { + break; + } + string? line = task.Result; + if (line == null) { + break; + } + Entry? entry = null; + try { + entry = Entry.Parse(line); + } catch (Exception) { + // TODO: error handling on wrong entries + } + if (entry != null) { + Enqueue(entry); + if (entry.Is(Events.Shutdown)) { + done = true; + break; + } + } + } + } + } + + public void Close() { + if (stream != null) { + stream.Close(); + stream = null; + } + if (file != null) { + file.Close(); + file = null; + } + if (thread != null) { + done = true; + Signal(); + thread.Join(); + thread = null; + } + entries.Clear(); + } + + void IDisposable.Dispose() { + Close(); + } +} -namespace EDPlayerJournal; public class JournalStream { private FileSystemWatcher? watcher = null; - private Dictionary streams = new Dictionary(); + private Dictionary streams = new(); public delegate void NewJournalEntryDelegate(Entry entry); @@ -21,13 +137,46 @@ public class JournalStream { } private void AddFileToStreams(string path, bool seekend = false) { - if (!streams.ContainsKey(path) && JournalFile.VerifyFile(path)) { - var filestream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); - streams[path] = new StreamReader(filestream); + if (!JournalFile.VerifyFile(path)) { + return; + } - if (seekend) { - streams[path].BaseStream.Seek(0, SeekOrigin.End); + if (streams.ContainsKey(path)) { + return; + } + + FileWatcherThread thread = new(path); + streams[path] = thread; + } + + /// + /// Processes all file watcher thread queues in the current thread. This must be + /// called from the main event loop sometimes to make sure all NewJournalEntry + /// delegates fire correctly. + /// + public void ProcessQueues() { + foreach (var stream in streams) { + if (stream.Value == null) { + continue; } + + if (stream.Value.IsEmpty) { + continue; + } + + Entry entry = stream.Value.Dequeue(); + NewJournalEntry?.Invoke(entry); + } + + var toRemove = streams + .Where(x => (x.Value != null && x.Value.Finished) || x.Value == null) + .Select(x => x) + ; + foreach (var stream in toRemove) { + if (stream.Value != null) { + stream.Value.Close(); + } + streams.Remove(stream.Key); } } @@ -79,16 +228,6 @@ public class JournalStream { return; } - string? line; - - while ((line = stream.ReadLine()) != null) { - try { - Entry? entry = Entry.Parse(line); - if (entry != null) { - NewJournalEntry?.Invoke(entry); - } - } catch (Exception) { - } - } + stream.Signal(); } }