update JournalStream with single queue
This commit is contained in:
parent
ed68876300
commit
82dc16fb1b
@ -4,6 +4,49 @@ using System.Threading;
|
|||||||
|
|
||||||
namespace EDPlayerJournal;
|
namespace EDPlayerJournal;
|
||||||
|
|
||||||
|
public class SyncQueue<T> {
|
||||||
|
private Queue<T> queue = new();
|
||||||
|
|
||||||
|
public void Enqueue(T item) {
|
||||||
|
Monitor.Enter(queue);
|
||||||
|
try {
|
||||||
|
queue.Enqueue(item);
|
||||||
|
} finally {
|
||||||
|
Monitor.Exit(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public T? TryDequeue() {
|
||||||
|
if (Monitor.TryEnter(queue)) {
|
||||||
|
T retval;
|
||||||
|
|
||||||
|
Monitor.Enter(queue);
|
||||||
|
try {
|
||||||
|
retval = queue.Dequeue();
|
||||||
|
} finally {
|
||||||
|
Monitor.Exit(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
return default(T);
|
||||||
|
}
|
||||||
|
|
||||||
|
public T Dequeue() {
|
||||||
|
T retval;
|
||||||
|
|
||||||
|
Monitor.Enter(queue);
|
||||||
|
try {
|
||||||
|
retval = queue.Dequeue();
|
||||||
|
} finally {
|
||||||
|
Monitor.Exit(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
internal class FileWatcherThread : IDisposable {
|
internal class FileWatcherThread : IDisposable {
|
||||||
private string? fullPath;
|
private string? fullPath;
|
||||||
private StreamReader? stream;
|
private StreamReader? stream;
|
||||||
@ -14,38 +57,21 @@ internal class FileWatcherThread : IDisposable {
|
|||||||
|
|
||||||
public string? FullPath => fullPath;
|
public string? FullPath => fullPath;
|
||||||
|
|
||||||
private Mutex mutex = new();
|
private SyncQueue<Entry> queue;
|
||||||
private Queue<Entry> entries = new();
|
|
||||||
|
public SyncQueue<Entry> Queue => queue;
|
||||||
|
|
||||||
public bool Finished => done;
|
public bool Finished => done;
|
||||||
|
|
||||||
public FileWatcherThread() {
|
public FileWatcherThread(SyncQueue<Entry> queue) {
|
||||||
|
this.queue = queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileWatcherThread(string fullpath) {
|
public FileWatcherThread(string fullpath, SyncQueue<Entry> queue) {
|
||||||
|
this.queue = queue;
|
||||||
Start(fullpath);
|
Start(fullpath);
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool IsEmpty {
|
|
||||||
get {
|
|
||||||
lock (mutex) {
|
|
||||||
return entries.Count == 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Entry Dequeue() {
|
|
||||||
lock (mutex) {
|
|
||||||
return entries.Dequeue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Enqueue(Entry e) {
|
|
||||||
lock (mutex) {
|
|
||||||
entries.Enqueue(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Signal() {
|
public void Signal() {
|
||||||
wait.Set();
|
wait.Set();
|
||||||
}
|
}
|
||||||
@ -84,7 +110,7 @@ internal class FileWatcherThread : IDisposable {
|
|||||||
// TODO: error handling on wrong entries
|
// TODO: error handling on wrong entries
|
||||||
}
|
}
|
||||||
if (entry != null) {
|
if (entry != null) {
|
||||||
Enqueue(entry);
|
queue.Enqueue(entry);
|
||||||
if (entry.Is(Events.Shutdown)) {
|
if (entry.Is(Events.Shutdown)) {
|
||||||
done = true;
|
done = true;
|
||||||
break;
|
break;
|
||||||
@ -109,7 +135,6 @@ internal class FileWatcherThread : IDisposable {
|
|||||||
thread.Join();
|
thread.Join();
|
||||||
thread = null;
|
thread = null;
|
||||||
}
|
}
|
||||||
entries.Clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void IDisposable.Dispose() {
|
void IDisposable.Dispose() {
|
||||||
@ -128,6 +153,10 @@ public class JournalStream {
|
|||||||
|
|
||||||
public PlayerJournal? Journal;
|
public PlayerJournal? Journal;
|
||||||
|
|
||||||
|
private SyncQueue<Entry> queue = new();
|
||||||
|
|
||||||
|
public SyncQueue<Entry> Queue => queue;
|
||||||
|
|
||||||
public JournalStream() {
|
public JournalStream() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,29 +174,11 @@ public class JournalStream {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
FileWatcherThread thread = new(path);
|
FileWatcherThread thread = new(path, queue);
|
||||||
streams[path] = thread;
|
streams[path] = thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
private void CleanupStreams() {
|
||||||
/// Processes all file watcher thread queues in the current thread. This must be
|
|
||||||
/// called from the main event loop sometimes to make sure all NewJournalEntry
|
|
||||||
/// delegates fire correctly.
|
|
||||||
/// </summary>
|
|
||||||
public void ProcessQueues() {
|
|
||||||
foreach (var stream in streams) {
|
|
||||||
if (stream.Value == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stream.Value.IsEmpty) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Entry entry = stream.Value.Dequeue();
|
|
||||||
NewJournalEntry?.Invoke(entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
var toRemove = streams
|
var toRemove = streams
|
||||||
.Where(x => (x.Value != null && x.Value.Finished) || x.Value == null)
|
.Where(x => (x.Value != null && x.Value.Finished) || x.Value == null)
|
||||||
.Select(x => x)
|
.Select(x => x)
|
||||||
@ -180,6 +191,31 @@ public class JournalStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Processes all file watcher thread queues in the current thread. This must be
|
||||||
|
/// called from the main event loop sometimes to make sure all NewJournalEntry
|
||||||
|
/// delegates fire correctly.
|
||||||
|
/// </summary>
|
||||||
|
public void ProcessQueues() {
|
||||||
|
Entry? entry;
|
||||||
|
|
||||||
|
while ((entry = queue.TryDequeue()) != null) {
|
||||||
|
NewJournalEntry?.Invoke(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
CleanupStreams();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ProcessQueuesWait() {
|
||||||
|
Entry? entry;
|
||||||
|
|
||||||
|
while ((entry = queue.Dequeue()) != null) {
|
||||||
|
NewJournalEntry?.Invoke(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
CleanupStreams();
|
||||||
|
}
|
||||||
|
|
||||||
public void Open() {
|
public void Open() {
|
||||||
if (watcher != null) {
|
if (watcher != null) {
|
||||||
return;
|
return;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user