利用ManualResetEvent來控制子執行緒進行同步作業(以利用MultiThread重複打API為範例)
using FETC.eTag.BMS.BMSWeb.utility;
using FETC.eTag.BMS.DAL;
using log4net;
using log4net.Config;
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using RestSharp;
using System.Net;
using Newtonsoft.Json;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
namespace APIHelper
{
class Program
{
/*
* 作業名稱:{APIHelper}
*/
#region 用於設定只能開啟一個執行程式,不可多開
[DllImport("User32.dll")]
private static extern bool ShowWindowAsync(IntPtr hWnd, int cmdShow);
[DllImport("User32.dll")]
private static extern bool SetForegroundWindow(IntPtr hWnd);
#endregion
#region 全域變數
private static readonly ILog KLog = LogManager.GetLogger(typeof(Program));//使用log4net的Log
//static KLog log = null;//使用BMSWeb的Log
static UserInfo info = null;
private static List<ManualResetEvent> _waitHandles = null;
private static Dictionary<string, string> dicResult = null;
#endregion
static void Main(string[] args)
{
bool isSuccess = false;
try
{
#region Log初始化
//使用log4net的Log
string log4net = System.AppDomain.CurrentDomain.BaseDirectory;
XmlConfigurator.Configure(new FileInfo(log4net + "log4netconfig.xml"));
////使用BMSWeb的Log
//if (log == null)
//{
// log = new KLog();
//}
#endregion
Process instance = RunningInstance();
if (instance == null)
{
KLog.Info("開始排程");
////使用sqlConnection可以透過此下段程式取得ConnectionString
//String ConnString = ConfigurationManager.ConnectionStrings["BMSContext"].ConnectionString;
info = new UserInfo("admin");
//To do...
isSuccess = String.IsNullOrEmpty(MainProcess()) ? true : false;
}
else
{
// Make old one foreground
ShowWindowAsync(instance.MainWindowHandle, 1);
SetForegroundWindow(instance.MainWindowHandle);
KLog.Info("Process Running!");
Environment.Exit(0);
}
}
catch (Exception ex)
{
KLog.Error("系統錯誤", ex);
}
finally
{
KLog.Info("結束排程");
//Dispose BMSDBEntities
if (info != null && info.db != null)
{
info.db.Dispose();
}
Environment.Exit(isSuccess ? 0 : 1);
////使用BMSWeb的Log
//if (log != null)
//{
// log.tracer.Dispose();
//}
}
}
/// <summary> RunningInstance,用於判斷是否已有Process執行中
/// </summary>
/// <returns>Process</returns>
private static Process RunningInstance()
{
Process current = Process.GetCurrentProcess();
Process[] processes = Process.GetProcessesByName(current.ProcessName);
foreach (Process process in processes)
{
if (process.Id != current.Id)
{
if (Assembly.GetExecutingAssembly().Location.Replace("/", "\\") == current.MainModule.FileName)
{
return process;
}
}
}
return null;
}
private static string MainProcess()
{
bool isSuccess = false;
var info = new UserInfo("admin");
try
{
#region 主程式商業邏輯撰寫
var data = info.db.Database.SqlQuery<string>(@"
SELECT DISTINCT top 10 ID_NO FROM BMS_PRO_CONSUMER C WITH(NOLOCK)
INNER JOIN BMS_ACC_ACCOUNT ACC WITH(NOLOCK) ON C.CONSUMER_ID = ACC.ACC_OWNER_ID
WHERE LEN(C.ID_NO) = 8 AND ACC.ACC_STATUS_ID = 'R'
").ToList();
if (data != null)
{
KLog.Info("Data Count =>" + data.Count);
int defaultMaxworkerThreads = 0;
int defaultmaxIOThreads = 0;
int waitHandleIndex = 0;
int maxThreadsAtOnce = Convert.ToInt32(System.Configuration.ConfigurationManager.AppSettings["MaxThread"]);;
_waitHandles = new List<ManualResetEvent>();
ThreadPool.SetMaxThreads(maxThreadsAtOnce, maxThreadsAtOnce);
ThreadPool.GetMaxThreads(out defaultMaxworkerThreads, out defaultmaxIOThreads);
dicResult = new Dictionary<string, string>();
KLog.Info("Start Time =>" + DateTime.Now.ToString());
foreach (var itemIdNo in data)
{
/*ManualResetEvent(MRE,一個MRE可控制多個子執行緒繼續或中止流程)
* https://www.twblogs.net/a/5da6ad84bd9eee310d9fbf88
* https://www.itread01.com/content/1511591884.html
* ManualResetEvent初始參數:true=>mre預設狀態為Set(),遇到WaitOne不會中止執行緒, false=>mre預設狀態為ReSet(),遇到WaitOne會中止執行緒
* 1.Set()=>關閉WaitOne訊號,(ThreadPool撈出的)執行緒可以繼續執行.WaitOne()之後的程式
* 2.Reset()=>開啟WaitOne訊號,(ThreadPool撈出的)執行緒遇到WaitOne()會被中止
* 3.WaitOne()=>mre設定執行緒中止點,中斷執行緒到重新呼叫Set()。執行緒才會繼續往下執行WaitOne()後面的程式
*/
var resetEvent = new ManualResetEvent(false);//管理執行緒的狀態 true:預設訊號綠燈, false:預設訊號紅燈。
var controller = new MyJob(waitHandleIndex, itemIdNo);
ThreadPool.QueueUserWorkItem(new WaitCallback(OnThreadedDataRequest), controller);//每呼叫一次QueueUserWorkItem,就會從執行緒池喚醒一個正在休眠的執行緒並取出,ThreadPool預設上限為250個執行緒
waitHandleIndex++;
_waitHandles.Add(resetEvent);
if (_waitHandles.Count >= maxThreadsAtOnce)
{
/*宣告10個MRE後,要等所有執行緒都跑完OnThreadedDataRequest流程。之後清空MRE List,重新宣告10個MRE供執行緒綁定*/
System.Threading.WaitHandle.WaitAll(_waitHandles.ToArray<ManualResetEvent>());//等待所有執行緒都完成才往下一行,如果有被Reset()或WaitOne()就會一直卡在這行
_waitHandles = new List<ManualResetEvent>();
waitHandleIndex = 0;
}
}
KLog.Info("End Time =>" + DateTime.Now.ToString());
}
isSuccess = true; // 成功才設為true
#endregion
}
catch (Exception e)
{
KLog.Error(e);
return "發生異常錯誤:" + e.Message;
}
return "";
}
public static void OnThreadedDataRequest(object sender)
{
var controller = sender as MyJob;
if (controller == null) return;
controller.Execute();
_waitHandles[controller.WaitHandleIndex].Set();
/*waitHandles[controller.WaitHandleIndex].Set()。
*1.從_waitHandles[controller.WaitHandleIndex]取得ManualResetEvent,並替執行緒綁定ManualResetEvent。
*2.設定_waitHandles[controller.WaitHandleIndex]狀態為Set(),讓執行緒繼續執行.Set()後面的程式
*/
}
public class MyJob
{
public int WaitHandleIndex = 0;
public string idNo;
public MyJob(int waitHandleIndex, string paramIdNo)
{
WaitHandleIndex = waitHandleIndex;
// Add input parameters to the constructor to store input variables in class level variables
// for use by the Execute method below.
idNo = paramIdNo;
}
public void Execute()
{
try
{
DoWorker(idNo);
}
catch (Exception ex)
{
KLog.Error("Execute()_" + ex.ToString());
throw ex;
}
}
}
private static void DoWorker(object idNo)
{
var info = new UserInfo("admin");
string apiRes = "";
string strIddNo = (string)idNo;
CallWebApi(strIddNo, out apiRes);
KLog.Info(apiRes);
if (!string.IsNullOrEmpty(apiRes))
{
dicResult.Add(strIddNo, apiRes);
}
else
{
dicResult.Add(strIddNo, "NoReturnData");
}
}
private static bool CallWebApi(string idNo, out string apiReturn)
{
bool res = false;
string url = string.Format(@"https://data.gcis.nat.gov.tw/od/data/api/5F64D864-61CB-4D0D-8AD9-492047CC1EA6?$format=json&$filter=Business_Accounting_NO%20eq%20{0}&$skip=0&$top=50", idNo);
apiReturn = string.Empty;
try
{
var client = new RestClient(url);
var request = new RestRequest(Method.GET);
//request.AddHeader("accept", "application/json");
//request.AddHeader("content-type", "application/json");
//request.AddHeader("x-ibm-client-id", xIbmClientId);
//if (!string.IsNullOrEmpty(token))
//{
// request.AddHeader("token", token);
//}
//request.AddParameter("application/json", json, ParameterType.RequestBody);
ServicePointManager.ServerCertificateValidationCallback = new RemoteCertificateValidationCallback(CertificateCheck);
ServicePointManager.SecurityProtocol =
SecurityProtocolType.Ssl3 | SecurityProtocolType.Tls |
SecurityProtocolType.Tls11 | SecurityProtocolType.Tls12;
IRestResponse response = client.Execute(request);
if (response.StatusCode != HttpStatusCode.OK)
{
return res;
}
apiReturn = response.Content;
}
catch (Exception e)
{
throw e;
}
return res;
}
private static bool CertificateCheck(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
if (sslPolicyErrors == SslPolicyErrors.None)
{
return true;
}
var request = sender as HttpWebRequest;
if (request != null)
{
var result = request.RequestUri.Host.Contains("fetc");
return result;
}
return false;
}
}
}
多ManualResetEvent物件範例:https://www.itread01.com/content/1511591884.html
迴圈會內會由ThreadPool撈出一個新的Thread(t),並new一個ManualResetEvent(MRE)。MRE[i]會被指派給t控制執行緒,
並由MRE的Set(). ReSet(). WaitOne()來控制執行緒執行或暫停