Compare commits

...

5 Commits

4 changed files with 239 additions and 20 deletions

View File

@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\EDPlayerJournal\EDPlayerJournal.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,22 @@
using EDPlayerJournal;
using EDPlayerJournal.Entries;
namespace EDJournalWatcher;
public class Program {
public static void Main(string[] args) {
PlayerJournal journal = new();
JournalStream stream = new(journal);
stream.NewJournalEntry += Stream_NewJournalEntry;
while (true) {
stream.ProcessQueues();
Thread.Sleep(100);
}
}
private static void Stream_NewJournalEntry(Entry entry) {
Console.WriteLine(entry.ToString());
}
}

View File

@@ -7,6 +7,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournal", "EDPlayer
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDPlayerJournalTests", "EDPlayerJournalTests\EDPlayerJournalTests.csproj", "{FAFE4437-B1AE-0255-B1C1-6350A81816F1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EDJournalWatcher", "EDJournalWatcher\EDJournalWatcher.csproj", "{DF502BDA-F888-476E-A3B9-CD857359FA7E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -21,6 +23,10 @@ Global
{FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAFE4437-B1AE-0255-B1C1-6350A81816F1}.Release|Any CPU.Build.0 = Release|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DF502BDA-F888-476E-A3B9-CD857359FA7E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@@ -1,10 +1,150 @@
using EDPlayerJournal.Entries;
using System.Linq;
using System.Threading;
namespace EDPlayerJournal;
public class SyncQueue<T> {
private Queue<T> queue = new();
public void Enqueue(T item) {
Monitor.Enter(queue);
try {
queue.Enqueue(item);
} finally {
Monitor.Exit(queue);
}
}
public T? TryDequeue() {
if (Monitor.TryEnter(queue)) {
T retval;
try {
queue.TryDequeue(out retval);
} finally {
Monitor.Exit(queue);
}
return retval;
}
return default(T);
}
public T Dequeue() {
T retval;
Monitor.Enter(queue);
try {
retval = queue.Dequeue();
} finally {
Monitor.Exit(queue);
}
return retval;
}
}
internal class FileWatcherThread : IDisposable {
private string? fullPath;
private StreamReader? stream;
private FileStream? file;
private Thread? thread;
private bool done = false;
private EventWaitHandle wait = new(false, EventResetMode.AutoReset);
public string? FullPath => fullPath;
private SyncQueue<Entry> queue;
public SyncQueue<Entry> Queue => queue;
public bool Finished => done;
public FileWatcherThread(SyncQueue<Entry> queue) {
this.queue = queue;
}
public FileWatcherThread(string fullpath, SyncQueue<Entry> queue) {
this.queue = queue;
Start(fullpath);
}
public void Signal() {
wait.Set();
}
public void Start(string fullpath) {
fullPath = fullpath;
file = new FileStream(fullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
stream = new StreamReader(file);
done = false;
thread = new Thread(ReaderThread);
thread.Start();
}
private void ReaderThread(object? args) {
while (!done) {
wait.WaitOne();
if (done || stream == null) {
return;
}
while (!done) {
var task = stream.ReadLineAsync();
if (!task.Wait(TimeSpan.FromMilliseconds(10))) {
break;
}
string? line = task.Result;
if (line == null) {
break;
}
Entry? entry = null;
try {
entry = Entry.Parse(line);
} catch (Exception) {
// TODO: error handling on wrong entries
}
if (entry != null) {
queue.Enqueue(entry);
if (entry.Is(Events.Shutdown)) {
done = true;
break;
}
}
}
}
}
public void Close() {
if (thread != null) {
done = true;
Signal();
thread.Join();
thread = null;
}
if (stream != null) {
stream.Close();
stream = null;
}
if (file != null) {
file.Close();
file = null;
}
}
void IDisposable.Dispose() {
Close();
}
}
namespace EDPlayerJournal;
public class JournalStream {
private FileSystemWatcher? watcher = null;
private Dictionary<string, StreamReader> streams = new Dictionary<string, StreamReader>();
private Dictionary<string, FileWatcherThread> streams = new();
public delegate void NewJournalEntryDelegate(Entry entry);
@@ -12,6 +152,10 @@ public class JournalStream {
public PlayerJournal? Journal;
private SyncQueue<Entry> queue = new();
public SyncQueue<Entry> Queue => queue;
public JournalStream() {
}
@@ -21,14 +165,54 @@ public class JournalStream {
}
private void AddFileToStreams(string path, bool seekend = false) {
if (!streams.ContainsKey(path) && JournalFile.VerifyFile(path)) {
var filestream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
streams[path] = new StreamReader(filestream);
if (seekend) {
streams[path].BaseStream.Seek(0, SeekOrigin.End);
}
if (!JournalFile.VerifyFile(path)) {
return;
}
if (streams.ContainsKey(path)) {
return;
}
FileWatcherThread thread = new(path, queue);
streams[path] = thread;
}
private void CleanupStreams() {
var toRemove = streams
.Where(x => (x.Value != null && x.Value.Finished) || x.Value == null)
.Select(x => x)
;
foreach (var stream in toRemove) {
if (stream.Value != null) {
stream.Value.Close();
}
streams.Remove(stream.Key);
}
}
/// <summary>
/// Processes all file watcher thread queues in the current thread. This must be
/// called from the main event loop sometimes to make sure all NewJournalEntry
/// delegates fire correctly.
/// </summary>
public void ProcessQueues() {
Entry? entry;
while ((entry = queue.TryDequeue()) != null) {
NewJournalEntry?.Invoke(entry);
}
CleanupStreams();
}
public void ProcessQueuesWait() {
Entry? entry;
while ((entry = queue.Dequeue()) != null) {
NewJournalEntry?.Invoke(entry);
}
CleanupStreams();
}
public void Open() {
@@ -64,6 +248,9 @@ public class JournalStream {
watcher.EnableRaisingEvents = false;
}
watcher = null;
foreach (var stream in streams) {
stream.Value.Close();
}
streams.Clear();
}
@@ -79,16 +266,6 @@ public class JournalStream {
return;
}
string? line;
while ((line = stream.ReadLine()) != null) {
try {
Entry? entry = Entry.Parse(line);
if (entry != null) {
NewJournalEntry?.Invoke(entry);
}
} catch (Exception) {
}
}
stream.Signal();
}
}