最近写了个小程序用到了C#4.0中的线程安全集合。想起很久以前用C#2.0开发的时候写后台windows服务,为了利用多线程实现生产者和消费者模型,经常要封装一些线程安全的容器,比如泛型队列和字典等等。下面就结合部分MS的源码和自己的开发经验浅显地分析一下如何实现线程安全容器以及实现线程安全容器容易产生的问题。
[h2]一、ArrayList[/h2]
在C#早期版本中已经实现了线程安全的ArrayList,可以通过下面的方式构造线程安全的数组列表:
var array = ArrayList.Synchronized(new ArrayList());
我们从Synchronized方法入手,分析它的源代码看是如何实现线程安全的:
Synchronized /// <summary>Returns an <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</summary>
/// <returns>An <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</returns>
/// <param name="list">The <see cref="T:System.Collections.ArrayList" /> to synchronize. </param>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="list" /> is null. </exception>
/// <filterpriority>2</filterpriority>
[HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
public static ArrayList Synchronized(ArrayList list)
{
if (list == null)
{
throw new ArgumentNullException("list");
}
return new ArrayList.SyncArrayList(list);
}
继续跟进去,发现SyncArrayList是一个
[b]继承[/b]自ArrayList的私有类,内部线程安全方法的实现经过分析,很多都是像下面这样lock(注意是lock_root对象而不是数组列表实例对象)一下完事:
lock (this._root)
有心的你可以查看SyncArrayList的源码:
SyncArrayList [Serializable]
private class SyncArrayList : ArrayList
{
private ArrayList _list;
private object _root;
public override int Capacity
{
get
{
int capacity;
lock (this._root)
{
capacity = this._list.Capacity;
}
return capacity;
}
set
{
lock (this._root)
{
this._list.Capacity = value;
}
}
}
public override int Count
{
get
{
int count;
lock (this._root)
{
count = this._list.Count;
}
return count;
}
}
public override bool IsReadOnly
{
get
{
return this._list.IsReadOnly;
}
}
public override bool IsFixedSize
{
get
{
return this._list.IsFixedSize;
}
}
public override bool IsSynchronized
{
get
{
return true;
}
}
public override object this[int index]
{
get
{
object result;
lock (this._root)
{
result = this._list[index];
}
return result;
}
set
{
lock (this._root)
{
this._list[index] = value;
}
}
}
public override object SyncRoot
{
get
{
return this._root;
}
}
internal SyncArrayList(ArrayList list)
: base(false)
{
this._list = list;
this._root = list.SyncRoot;
}
public override int Add(object value)
{
int result;
lock (this._root)
{
result = this._list.Add(value);
}
return result;
}
public override void AddRange(ICollection c)
{
lock (this._root)
{
this._list.AddRange(c);
}
}
public override int BinarySearch(object value)
{
int result;
lock (this._root)
{
result = this._list.BinarySearch(value);
}
return result;
}
public override int BinarySearch(object value, IComparer comparer)
{
int result;
lock (this._root)
{
result = this._list.BinarySearch(value, comparer);
}
return result;
}
public override int BinarySearch(int index, int count, object value, IComparer comparer)
{
int result;
lock (this._root)
{
result = this._list.BinarySearch(index, count, value, comparer);
}
return result;
}
public override void Clear()
{
lock (this._root)
{
this._list.Clear();
}
}
public override object Clone()
{
object result;
lock (this._root)
{
result = new ArrayList.SyncArrayList((ArrayList)this._list.Clone());
}
return result;
}
public override bool Contains(object item)
{
bool result;
lock (this._root)
{
result = this._list.Contains(item);
}
return result;
}
public override void CopyTo(Array array)
{
lock (this._root)
{
this._list.CopyTo(array);
}
}
public override void CopyTo(Array array, int index)
{
lock (this._root)
{
this._list.CopyTo(array, index);
}
}
public override void CopyTo(int index, Array array, int arrayIndex, int count)
{
lock (this._root)
{
this._list.CopyTo(index, array, arrayIndex, count);
}
}
public override IEnumerator GetEnumerator()
{
IEnumerator enumerator;
lock (this._root)
{
enumerator = this._list.GetEnumerator();
}
return enumerator;
}
public override IEnumerator GetEnumerator(int index, int count)
{
IEnumerator enumerator;
lock (this._root)
{
enumerator = this._list.GetEnumerator(index, count);
}
return enumerator;
}
public override int IndexOf(object value)
{
int result;
lock (this._root)
{
result = this._list.IndexOf(value);
}
return result;
}
public override int IndexOf(object value, int startIndex)
{
int result;
lock (this._root)
{
result = this._list.IndexOf(value, startIndex);
}
return result;
}
public override int IndexOf(object value, int startIndex, int count)
{
int result;
lock (this._root)
{
result = this._list.IndexOf(value, startIndex, count);
}
return result;
}
public override void Insert(int index, object value)
{
lock (this._root)
{
this._list.Insert(index, value);
}
}
public override void InsertRange(int index, ICollection c)
{
lock (this._root)
{
this._list.InsertRange(index, c);
}
}
public override int LastIndexOf(object value)
{
int result;
lock (this._root)
{
result = this._list.LastIndexOf(value);
}
return result;
}
public override int LastIndexOf(object value, int startIndex)
{
int result;
lock (this._root)
{
result = this._list.LastIndexOf(value, startIndex);
}
return result;
}
public override int LastIndexOf(object value, int startIndex, int count)
{
int result;
lock (this._root)
{
result = this._list.LastIndexOf(value, startIndex, count);
}
return result;
}
public override void Remove(object value)
{
lock (this._root)
{
this._list.Remove(value);
}
}
public override void RemoveAt(int index)
{
lock (this._root)
{
this._list.RemoveAt(index);
}
}
public override void RemoveRange(int index, int count)
{
lock (this._root)
{
this._list.RemoveRange(index, count);
}
}
public override void Reverse(int index, int count)
{
lock (this._root)
{
this._list.Reverse(index, count);
}
}
public override void SetRange(int index, ICollection c)
{
lock (this._root)
{
this._list.SetRange(index, c);
}
}
public override ArrayList GetRange(int index, int count)
{
ArrayList range;
lock (this._root)
{
range = this._list.GetRange(index, count);
}
return range;
}
public override void Sort()
{
lock (this._root)
{
this._list.Sort();
}
}
public override void Sort(IComparer comparer)
{
lock (this._root)
{
this._list.Sort(comparer);
}
}
public override void Sort(int index, int count, IComparer comparer)
{
lock (this._root)
{
this._list.Sort(index, count, comparer);
}
}
public override object[] ToArray()
{
object[] result;
lock (this._root)
{
result = this._list.ToArray();
}
return result;
}
public override Array ToArray(Type type)
{
Array result;
lock (this._root)
{
result = this._list.ToArray(type);
}
return result;
}
public override void TrimToSize()
{
lock (this._root)
{
this._list.TrimToSize();
}
}
}
ArrayList线程安全实现过程小结:
定义ArrayList的私有实现SyncArrayList,子类内部通过lock同步构造实现线程安全,在ArrayList中通过Synchronized对外间接调用子类。
[h2]二、Hashtable[/h2]
同样,在C#早期版本中实现了线程安全的Hashtable,它也是早期开发中经常用到的缓存容器,可以通过下面的方式构造线程安全的哈希表:
var ht = Hashtable.Synchronized(new Hashtable());
同样地,我们从Synchronized方法入手,分析它的源代码看是如何实现线程安全的:
Synchronized /// <summary>Returns a synchronized (thread-safe) wrapper for the <see cref="T:System.Collections.Hashtable" />.</summary>
/// <returns>A synchronized (thread-safe) wrapper for the <see cref="T:System.Collections.Hashtable" />.</returns>
/// <param name="table">The <see cref="T:System.Collections.Hashtable" /> to synchronize. </param>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="table" /> is null. </exception>
/// <filterpriority>1</filterpriority>
[HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
public static Hashtable Synchronized(Hashtable table)
{
if (table == null)
{
throw new ArgumentNullException("table");
}
return new Hashtable.SyncHashtable(table);
}
继续跟进去,发现SyncHashtable是一个
[b]继承[/b]自Hashtable和IEnumerable接口的私有类,内部线程安全方法的实现经过分析,很多都是像下面这样lock(注意是lock哈希表的SyncRoot Object实例对象而不是哈希表实例)一下完事:
lock (this._table.SyncRoot)
贴一下SyncHashtable的源码:
[Serializable]
private class SyncHashtable : Hashtable, IEnumerable
{
protected Hashtable _table;
public override int Count
{
get
{
return this._table.Count;
}
}
public override bool IsReadOnly
{
get
{
return this._table.IsReadOnly;
}
}
public override bool IsFixedSize
{
get
{
return this._table.IsFixedSize;
}
}
public override bool IsSynchronized
{
get
{
return true;
}
}
public override object this[object key]
{
[TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
get
{
return this._table[key];
}
set
{
lock (this._table.SyncRoot)
{
this._table[key] = value;
}
}
}
public override object SyncRoot
{
get
{
return this._table.SyncRoot;
}
}
public override ICollection Keys
{
get
{
ICollection keys;
lock (this._table.SyncRoot)
{
keys = this._table.Keys;
}
return keys;
}
}
public override ICollection Values
{
get
{
ICollection values;
lock (this._table.SyncRoot)
{
values = this._table.Values;
}
return values;
}
}
internal SyncHashtable(Hashtable table)
: base(false)
{
this._table = table;
}
internal SyncHashtable(SerializationInfo info, StreamingContext context)
: base(info, context)
{
this._table = (Hashtable)info.GetValue("ParentTable", typeof(Hashtable));
if (this._table == null)
{
throw new SerializationException(Environment.GetResourceString("Serialization_InsufficientState"));
}
}
[SecurityCritical]
public override void GetObjectData(SerializationInfo info, StreamingContext context)
{
if (info == null)
{
throw new ArgumentNullException("info");
}
lock (this._table.SyncRoot)
{
info.AddValue("ParentTable", this._table, typeof(Hashtable));
}
}
public override void Add(object key, object value)
{
lock (this._table.SyncRoot)
{
this._table.Add(key, value);
}
}
public override void Clear()
{
lock (this._table.SyncRoot)
{
this._table.Clear();
}
}
[TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
public override bool Contains(object key)
{
return this._table.Contains(key);
}
[TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
public override bool ContainsKey(object key)
{
return this._table.ContainsKey(key);
}
public override bool ContainsValue(object key)
{
bool result;
lock (this._table.SyncRoot)
{
result = this._table.ContainsValue(key);
}
return result;
}
public override void CopyTo(Array array, int arrayIndex)
{
lock (this._table.SyncRoot)
{
this._table.CopyTo(array, arrayIndex);
}
}
public override object Clone()
{
object result;
lock (this._table.SyncRoot)
{
result = Hashtable.Synchronized((Hashtable)this._table.Clone());
}
return result;
}
IEnumerator IEnumerable.GetEnumerator()
{
return this._table.GetEnumerator();
}
public override IDictionaryEnumerator GetEnumerator()
{
return this._table.GetEnumerator();
}
public override void Remove(object key)
{
lock (this._table.SyncRoot)
{
this._table.Remove(key);
}
}
public override void OnDeserialization(object sender)
{
}
internal override KeyValuePairs[] ToKeyValuePairsArray()
{
return this._table.ToKeyValuePairsArray();
}
}
Hashtable线程安全实现过程小结:
定义Hashtable的私有实现SyncHashtable,子类内部通过lock同步构造实现线程安全,在Hashtable中通过Synchronized对外间接调用子类。
[h2]三、4.0中的线程安全容器[/h2]
[h2]
1、ConcurrentQueue[/h2]
从上面的实现分析来说,封装一个线程安全的容器看起来并不是什么难事,除了对线程安全容器的
[b]异常处理[/b]心有余悸,其他的似乎按步就班就可以了,不是吗?也许还有更高明的实现吧?
在4.0中,多了一个System.Collections.Concurrent命名空间,怀着忐忑的心情查看C#4.0其中的一个线程安全集合
ConcurrentQueue的源码,发现它继承自
IProducerConsumerCollection<T>, IEnumerable<T>, ICollection, IEnumerable接口,内部实现线程安全的时候,通过SpinWait和通过互锁构造(Interlocked)及SpinWait封装的
[b]Segment[/b],间接实现了线程安全。Segment的实现比较复杂,和线程安全密切相关的方法就是TryXXX那几个方法,源码如下:
private class Segment
{
internal T[] m_array;
private int[] m_state;
private ConcurrentQueue<T>.Segment m_next;
internal readonly long m_index;
private int m_low;
private int m_high;
internal ConcurrentQueue<T>.Segment Next
{
get
{
return this.m_next;
}
}
internal bool IsEmpty
{
get
{
return this.Low > this.High;
}
}
internal int Low
{
get
{
return Math.Min(this.m_low, 32);
}
}
internal int High
{
get
{
return Math.Min(this.m_high, 31);
}
}
internal Segment(long index)
{
this.m_array = new T[32];
&n