refactor JournalStream into multi-thread

This commit is contained in:
Florian Stinglmayr 2025-08-01 12:43:16 +02:00
parent f21bf5ea5e
commit ed68876300
4 changed files with 198 additions and 18 deletions

View File

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\EDPlayerJournal\EDPlayerJournal.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,21 @@
using EDPlayerJournal;
using EDPlayerJournal.Entries;
namespace EDJournalWatcher;
public class Program {
public static void Main(string[] args) {
PlayerJournal journal = new();
JournalStream stream = new(journal);
stream.NewJournalEntry += Stream_NewJournalEntry;
while (true) {
stream.ProcessQueues();
}
}
private static void Stream_NewJournalEntry(Entry entry) {
Console.WriteLine(entry.ToString());
}
}

View File

@ -7,6 +7,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournal", "EDPlayer
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournalTests", "EDPlayerJournalTests\EDPlayerJournalTests.csproj", "{FAFE4437-B1AE-0255-B1C1-6350A81816F1}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournalTests", "EDPlayerJournalTests\EDPlayerJournalTests.csproj", "{FAFE4437-B1AE-0255-B1C1-6350A81816F1}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDJournalWatcher", "EDJournalWatcher\EDJournalWatcher.csproj", "{DF502BDA-F888-476E-A3B9-CD857359FA7E}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -21,6 +23,10 @@ Global
{FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Debug|Any CPU.Build.0 = Debug|Any CPU {FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.ActiveCfg = Release|Any CPU {FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.Build.0 = Release|Any CPU {FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.Build.0 = Release|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE

View File

@ -1,10 +1,126 @@
using EDPlayerJournal.Entries; using EDPlayerJournal.Entries;
using System.Linq;
using System.Threading;
namespace EDPlayerJournal;
internal class FileWatcherThread : IDisposable {
private string? fullPath;
private StreamReader? stream;
private FileStream? file;
private Thread? thread;
private bool done = false;
private EventWaitHandle wait = new(false, EventResetMode.AutoReset);
public string? FullPath => fullPath;
private Mutex mutex = new();
private Queue<Entry> entries = new();
public bool Finished => done;
public FileWatcherThread() {
}
public FileWatcherThread(string fullpath) {
Start(fullpath);
}
public bool IsEmpty {
get {
lock (mutex) {
return entries.Count == 0;
}
}
}
public Entry Dequeue() {
lock (mutex) {
return entries.Dequeue();
}
}
public void Enqueue(Entry e) {
lock (mutex) {
entries.Enqueue(e);
}
}
public void Signal() {
wait.Set();
}
public void Start(string fullpath) {
fullPath = fullpath;
file = new FileStream(fullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
stream = new StreamReader(file);
done = false;
thread = new Thread(ReaderThread);
thread.Start();
}
private void ReaderThread(object? args) {
while (!done) {
wait.WaitOne();
if (done || stream == null) {
return;
}
while (!done) {
var task = stream.ReadLineAsync();
if (!task.Wait(TimeSpan.FromMilliseconds(10))) {
break;
}
string? line = task.Result;
if (line == null) {
break;
}
Entry? entry = null;
try {
entry = Entry.Parse(line);
} catch (Exception) {
// TODO: error handling on wrong entries
}
if (entry != null) {
Enqueue(entry);
if (entry.Is(Events.Shutdown)) {
done = true;
break;
}
}
}
}
}
public void Close() {
if (stream != null) {
stream.Close();
stream = null;
}
if (file != null) {
file.Close();
file = null;
}
if (thread != null) {
done = true;
Signal();
thread.Join();
thread = null;
}
entries.Clear();
}
void IDisposable.Dispose() {
Close();
}
}
namespace EDPlayerJournal;
public class JournalStream { public class JournalStream {
private FileSystemWatcher? watcher = null; private FileSystemWatcher? watcher = null;
private Dictionary<string, StreamReader> streams = new Dictionary<string, StreamReader>(); private Dictionary<string, FileWatcherThread> streams = new();
public delegate void NewJournalEntryDelegate(Entry entry); public delegate void NewJournalEntryDelegate(Entry entry);
@ -21,13 +137,46 @@ public class JournalStream {
} }
private void AddFileToStreams(string path, bool seekend = false) { private void AddFileToStreams(string path, bool seekend = false) {
if (!streams.ContainsKey(path) && JournalFile.VerifyFile(path)) { if (!JournalFile.VerifyFile(path)) {
var filestream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); return;
streams[path] = new StreamReader(filestream); }
if (seekend) { if (streams.ContainsKey(path)) {
streams[path].BaseStream.Seek(0, SeekOrigin.End); return;
}
FileWatcherThread thread = new(path);
streams[path] = thread;
}
/// <summary>
/// Processes all file watcher thread queues in the current thread. This must be
/// called from the main event loop sometimes to make sure all NewJournalEntry
/// delegates fire correctly.
/// </summary>
public void ProcessQueues() {
foreach (var stream in streams) {
if (stream.Value == null) {
continue;
} }
if (stream.Value.IsEmpty) {
continue;
}
Entry entry = stream.Value.Dequeue();
NewJournalEntry?.Invoke(entry);
}
var toRemove = streams
.Where(x => (x.Value != null && x.Value.Finished) || x.Value == null)
.Select(x => x)
;
foreach (var stream in toRemove) {
if (stream.Value != null) {
stream.Value.Close();
}
streams.Remove(stream.Key);
} }
} }
@ -79,16 +228,6 @@ public class JournalStream {
return; return;
} }
string? line; stream.Signal();
while ((line = stream.ReadLine()) != null) {
try {
Entry? entry = Entry.Parse(line);
if (entry != null) {
NewJournalEntry?.Invoke(entry);
}
} catch (Exception) {
}
}
} }
} }