[Serializable]
struct Message
{
public string Text;
}
class Test
{
IMailBox mail;
public Test()
{
mail = new ProcessMailBox("TMProcessTest",1024);
}
public void RunWriter()
{
Console.WriteLine("Writer started");
Message msg;
while(true)
{
msg.Text = Console.ReadLine();
if(msg.Text.Equals("exit"))
break;
mail.Content = msg;
}
}
public void RunReader()
{
Console.WriteLine("Reader started");
while(true)
{
Message msg = (Message)mail.Content;
Console.WriteLine(msg.Text);
}
}
[STAThread]
static void Main(string[] args)
{
Test test = new Test();
if(args.Length > 0)
test.RunWriter();
else
test.RunReader();
}
}
void Work1()
{
NonCriticalSection1();
Monitor.Enter(this);
try
{
CriticalSection();
}
finally
{
Monitor.Exit(this);
}
NonCriticalSection2();
}
void Work2()
{
NonCriticalSection1();
lock(this)
{
CriticalSection();
}
NonCriticalSection2();
}
public sealed class ThreadSemaphore : ISemaphore
{
private int counter;
private readonly int max;
public ThreadSemaphore() : this(0, int.Max) {}
public ThreadSemaphore(int initial) : this(initial, int.Max) {}
public ThreadSemaphore(int initial, int max)
{
this.counter = Math.Min(initial,max);
this.max = max;
}
public void Acquire()
{
lock(this)
{
counter--;
if(counter < 0 && !Monitor.Wait(this))
throw new SemaphoreFailedException();
}
}
public void Acquire(TimeSpan timeout)
{
lock(this)
{
counter--;
if(counter < 0 && !Monitor.Wait(this,timeout))
throw new SemaphoreFailedException();
}
}
public void Release()
{
lock(this)
{
if(counter >= max)
throw new SemaphoreFailedException();
if(counter < 0)
Monitor.Pulse(this);
counter++;
}
}
}
ThreadSemaphore s = new ThreadSemaphore(1);
void Work3()
{
NonCriticalSection1();
s.Acquire();
try
{
CriticalSection();
}
finally
{
s.Release();
}
NonCriticalSection2();
}
[DllImport("kernel32",EntryPoint="CreateSemaphore",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint CreateSemaphore(
SecurityAttributes auth, int initialCount,
int maximumCount, string name);
[DllImport("kernel32",EntryPoint="WaitForSingleObject",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint WaitForSingleObject(
uint hHandle, uint dwMilliseconds);
[DllImport("kernel32",EntryPoint="ReleaseSemaphore",
SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool ReleaseSemaphore(
uint hHandle, int lReleaseCount, out int lpPreviousCount);
[DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,
CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool CloseHandle(uint hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable
{
private uint handle;
private readonly uint interruptReactionTime;
public ProcessSemaphore(string name) : this(
name,0,int.MaxValue,500) {}
public ProcessSemaphore(string name, int initial) : this(
name,initial,int.MaxValue,500) {}
public ProcessSemaphore(string name, int initial,
int max, int interruptReactionTime)
{
this.interruptReactionTime = (uint)interruptReactionTime;
this.handle = NTKernel.CreateSemaphore(null, initial, max, name);
if(handle == 0)
throw new SemaphoreFailedException();
}
public void Acquire()
{
while(true)
{ //looped 0.5s timeout to make NT-blocked threads interruptable.
uint res = NTKernel.WaitForSingleObject(handle,
interruptReactionTime);
try {System.Threading.Thread.Sleep(0);}
catch(System.Threading.ThreadInterruptedException e)
{
if(res == 0)
{ //Rollback
int previousCount;
NTKernel.ReleaseSemaphore(handle,1,out previousCount);
}
throw e;
}
if(res == 0)
return;
if(res != 258)
throw new SemaphoreFailedException();
}
}
public void Acquire(TimeSpan timeout)
{
uint milliseconds = (uint)timeout.TotalMilliseconds;
if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)
throw new SemaphoreFailedException();
}
public void Release()
{
int previousCount;
if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))
throw new SemaphoreFailedException();
}
#region IDisposable Member
public void Dispose()
{
if(handle != 0)
{
if(NTKernel.CloseHandle(handle))
handle = 0;
}
}
#endregion
}
[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr CreateFileMapping(uint hFile,
SecurityAttributes lpAttributes, uint flProtect,
uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);
[DllImport("Kernel32.dll",EntryPoint="MapViewOfFile",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,
uint dwDesiredAccess, uint dwFileOffsetHigh,
uint dwFileOffsetLow, uint dwNumberOfBytesToMap);
[DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile",
SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name,
FileAccess access, int size)
{
if(size < 0)
throw new ArgumentException("Size must not be negative","size");
IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null,
(uint)access,0,(uint)size,name);
if(fileMapping == IntPtr.Zero)
throw new MemoryMappingFailedException();
return new MemoryMappedFile(fileMapping,size,access);
}
public MemoryMappedFileView CreateView(int offset, int size,
MemoryMappedFileView.ViewAccess access)
{
if(this.access == FileAccess.ReadOnly && access ==
MemoryMappedFileView.ViewAccess.ReadWrite)
throw new ArgumentException(
"Only read access to views allowed on files without write access",
"access");
if(offset < 0)
throw new ArgumentException("Offset must not be negative","size");
if(size < 0)
throw new ArgumentException("Size must not be negative","size");
IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,
(uint)access,0,(uint)offset,(uint)size);
return new MemoryMappedFileView(mappedView,size,access);
}
public byte ReadByte(int offset)
{
return Marshal.ReadByte(mappedView,offset);
}
public void WriteByte(byte data, int offset)
{
Marshal.WriteByte(mappedView,offset,data);
}
public int ReadInt32(int offset)
{
return Marshal.ReadInt32(mappedView,offset);
}
public void WriteInt32(int data, int offset)
{
Marshal.WriteInt32(mappedView,offset,data);
}
public void ReadBytes(byte[] data, int offset)
{
for(int i=0;i<data.Length;i++)
data[i] = Marshal.ReadByte(mappedView,offset+i);
}
public void WriteBytes(byte[] data, int offset)
{
for(int i=0;i<data.Length;i++)
Marshal.WriteByte(mappedView,offset+i,data[i]);
}
public object ReadDeserialize(int offset, int length)
{
byte[] binaryData = new byte[length];
ReadBytes(binaryData,offset);
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
= new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
System.IO.MemoryStream ms = new System.IO.MemoryStream(
binaryData,0,length,true,true);
object data = formatter.Deserialize(ms);
ms.Close();
return data;
}
public void WriteSerialize(object data, int offset, int length)
{
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
= new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
byte[] binaryData = new byte[length];
System.IO.MemoryStream ms = new System.IO.MemoryStream(
binaryData,0,length,true,true);
formatter.Serialize(ms,data);
ms.Flush();
ms.Close();
WriteBytes(binaryData,offset);
}
public sealed class ThreadMailBox : IMailBox
{
private object content;
private ThreadSemaphore empty, full;
public ThreadMailBox()
{
empty = new ThreadSemaphore(1,1);
full = new ThreadSemaphore(0,1);
}
public object Content
{
get
{
full.Acquire();
object item = content;
empty.Release();
return item;
}
set
{
empty.Acquire();
content = value;
full.Release();
}
}
}
public sealed class ProcessMailBox : IMailBox, IDisposable
{
private MemoryMappedFile file;
private MemoryMappedFileView view;
private ProcessSemaphore empty, full;
public ProcessMailBox(string name,int size)
{
empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);
full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);
file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",
MemoryMappedFile.FileAccess.ReadWrite,size);
view = file.CreateView(0,size,
MemoryMappedFileView.ViewAccess.ReadWrite);
}
public object Content
{
get
{
full.Acquire();
object item;
try {item = view.ReadDeserialize();}
catch(Exception e)
{ //Rollback
full.Release();
throw e;
}
empty.Release();
return item;
}
set
{
empty.Acquire();
try {view.WriteSerialize(value);}
catch(Exception e)
{ //Rollback
empty.Release();
throw e;
}
full.Release();
}
}
#region IDisposable Member
public void Dispose()
{
view.Dispose();
file.Dispose();
empty.Dispose();
full.Dispose();
}
#endregion
}
public sealed class ThreadChannel : ThreadReliability, IChannel
{
private Queue queue;
private ThreadSemaphore empty, full;
public ThreadChannel(int size)
{
queue = Queue.Synchronized(new Queue(size));
empty = new ThreadSemaphore(size,size);
full = new ThreadSemaphore(0,size);
}
public void Send(object item)
{
try {empty.Acquire();}
catch(System.Threading.ThreadInterruptedException e)
{
DumpItem(item);
throw e;
}
queue.Enqueue(item);
full.Release();
}
public void Send(object item, TimeSpan timeout)
{
try {empty.Acquire(timeout);}
...
}
public object Receive()
{
full.Acquire();
object item = queue.Dequeue();
empty.Release();
return item;
}
public object Receive(TimeSpan timeout)
{
full.Acquire(timeout);
...
}
protected override void DumpStructure()
{
lock(queue.SyncRoot)
{
foreach(object item in queue)
DumpItem(item);
queue.Clear();
}
}
}
public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable
{
private MemoryMappedFile file;
private MemoryMappedFileView view;
private MemoryMappedQueue queue;
private ProcessSemaphore empty, full, mutex;
public ProcessChannel( int size, string name, int maxBytesPerEntry)
{
int fileSize = 64+size*maxBytesPerEntry;
empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);
full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);
mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);
file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",
MemoryMappedFile.FileAccess.ReadWrite,fileSize);
view = file.CreateView(0,fileSize,
MemoryMappedFileView.ViewAccess.ReadWrite);
queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);
if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)
throw new MemoryMappedArrayFailedException();
}
public void Send(object item)
{
try {empty.Acquire();}
catch(System.Threading.ThreadInterruptedException e)
{
DumpItemSynchronized(item);
throw e;
}
try {mutex.Acquire();}
catch(System.Threading.ThreadInterruptedException e)
{
DumpItemSynchronized(item);
empty.Release();
throw e;
}
queue.Enqueue();
try {queue.WriteSerialize(item,0);}
catch(Exception e)
{
queue.RollbackEnqueue();
mutex.Release();
empty.Release();
throw e;
}
mutex.Release();
full.Release();
}
public void Send(object item, TimeSpan timeout)
{
try {empty.Acquire(timeout);}
...
}
public object Receive()
{
full.Acquire();
mutex.Acquire();
object item;
queue.Dequeue();
try {item = queue.ReadDeserialize(0);}
catch(Exception e)
{
queue.RollbackDequeue();
mutex.Release();
full.Release();
throw e;
}
mutex.Release();
empty.Release();
return item;
}
public object Receive(TimeSpan timeout)
{
full.Acquire(timeout);
...
}
protected override void DumpStructure()
{
mutex.Acquire();
byte[][] dmp = queue.DumpClearAll();
for(int i=0;i<dmp.Length;i++)
DumpItemSynchronized(dmp[i]);
mutex.Release();
}
#region IDisposable Member
public void Dispose()
{
view.Dispose();
file.Dispose();
empty.Dispose();
full.Dispose();
mutex.Dispose();
}
#endregion
}
public class ChannelForwarder : SingleRunnable
{
private IChannel source, target;
private readonly int envelope;
public ChannelForwarder(IChannel source,
IChannel target, bool autoStart, bool waitOnStop)
: base(true,autoStart,waitOnStop)
{
this.source = source;
this.target = target;
this.envelope = -1;
}
public ChannelForwarder(IChannel source, IChannel target,
int envelope, bool autoStart, bool waitOnStop)
: base(true,autoStart,waitOnStop)
{
this.source = source;
this.target = target;
this.envelope = envelope;
}
protected override void Run()
{ //NOTE: IChannel.Send is interrupt save and
//automatically dumps the argument.
if(envelope == -1)
while(running)
target.Send(source.Receive());
else
{
MessageEnvelope env;
env.ID = envelope;
while(running)
{
env.Message = source.Receive();
target.Send(env);
}
}
}
}
public class ChannelMultiplexer : MultiRunnable
{
private ChannelForwarder[] forwarders;
public ChannelMultiplexer(IChannel[] channels, int[] ids,
IChannel output, bool autoStart, bool waitOnStop)
{
int count = channels.Length;
if(count != ids.Length)
throw new ArgumentException("Channel and ID count mismatch.","ids");
forwarders = new ChannelForwarder[count];
for(int i=0;i<count;i++)
forwarders[i] = new ChannelForwarder(channels[i],
output,ids[i],autoStart,waitOnStop);
SetRunnables((SingleRunnable[])forwarders);
}
}
public class ChannelDemultiplexer : SingleRunnable
{
private HybridDictionary dictionary;
private IChannel input;
public ChannelDemultiplexer(IChannel[] channels, int[] ids,
IChannel input, bool autoStart, bool waitOnStop)
: base(true,autoStart,waitOnStop)
{
this.input = input;
int count = channels.Length;
if(count != ids.Length)
throw new ArgumentException("Channel and ID count mismatch.","ids");
dictionary = new HybridDictionary(count,true);
for(int i=0;i<count;i++)
dictionary.add(ids[i],channels[i]);
}
protected override void Run()
{ //NOTE: IChannel.Send is interrupt save and
//automatically dumps the argument.
while(running)
{
MessageEnvelope env = (MessageEnvelope)input.Receive();
IChannel channel = (IChannel)dictionary[env.ID];
channel.send(env.Message);
}
}
}
public class ChannelEventGateway : SingleRunnable
{
private IChannel source;
public event MessageReceivedEventHandler MessageReceived;
public ChannelEventGateway(IChannel source, bool autoStart,
bool waitOnStop) : base(true,autoStart,waitOnStop)
{
this.source = source;
}
protected override void Run()
{
while(running)
{
object c = source.Receive();
MessageReceivedEventHandler handler = MessageReceived;
if(handler != null)
handler(this,new MessageReceivedEventArgs(c));
}
}
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有