`
ideage
  • 浏览: 319484 次
社区版块
存档分类
最新评论

D语言的IOCP(完成端口)例子

阅读更多

在WindowsNT平台上,最具有伸缩性和吞吐量的网络服务器程序都使用了完成端口。为了在D中使用完成端口,我写了这个简单的例子。希望大家指正!

在DMD1.020-1.022,WindowsXP,编译测试通过。

没有使用std.socket,因为std里面的socket实现不能使用重叠IO。

D 代码
  1. // D Program Language IOCP    
  2. // write by ideage@gmail.com   
  3. // complie: dmd ic ws2_32.lib   
  4.   
  5. import std.c.windows.windows, std.c.windows.winsock;   
  6. import std.string, std.stdint, std.c.string, std.c.stdlib;   
  7. import std.stdio;   
  8. import std.thread;   
  9.   
  10.   
  11.   
  12.   
  13. alias HANDLE WSAEVENT;   
  14. alias OVERLAPPED WSAOVERLAPPED;   
  15. alias OVERLAPPED* LPWSAOVERLAPPED;   
  16. alias OVERLAPPED* POVERLAPPED, LPOVERLAPPED;   
  17.   
  18. struct GUID {             
  19. align(1):   
  20.     DWORD Data1;   
  21.     WORD  Data2;   
  22.     WORD  Data3;   
  23.     BYTE  Data4[8];   
  24. }   
  25.   
  26. struct WSAPROTOCOLCHAIN {   
  27.     int                       ChainLen;   
  28.     DWORD[7] ChainEntries;   
  29. }   
  30.   
  31. alias WSAPROTOCOLCHAIN* LPWSAPROTOCOLCHAIN;   
  32.   
  33. const WSAPROTOCOL_LEN = 255;   
  34. const ERROR_IO_PENDING = 997;   
  35.   
  36. struct WSAPROTOCOL_INFOW {   
  37.     DWORD dwServiceFlags1;   
  38.     DWORD dwServiceFlags2;   
  39.     DWORD dwServiceFlags3;   
  40.     DWORD dwServiceFlags4;   
  41.     DWORD dwProviderFlags;   
  42.     GUID ProviderId;   
  43.     DWORD dwCatalogEntryId;   
  44.     WSAPROTOCOLCHAIN ProtocolChain;   
  45.     int iVersion;   
  46.     int iAddressFamily;   
  47.     int iMaxSockAddr;   
  48.     int iMinSockAddr;   
  49.     int iSocketType;   
  50.     int iProtocol;   
  51.     int iProtocolMaxOffset;   
  52.     int iNetworkByteOrder;   
  53.     int iSecurityScheme;   
  54.     DWORD dwMessageSize;   
  55.     DWORD dwProviderReserved;   
  56.     WCHAR[WSAPROTOCOL_LEN+1] szProtocol;   
  57. }   
  58.   
  59. alias WSAPROTOCOL_INFOW* LPWSAPROTOCOL_INFOW;   
  60. const WSA_FLAG_OVERLAPPED = 0x01;   
  61.   
  62. struct WSABUF {   
  63.     uint  len;   
  64.     char* buf;   
  65. }   
  66.   
  67. struct SOCKADDR {   
  68.     ushort  sa_family;   
  69.     char[14] sa_data;   
  70. }   
  71. alias SOCKADDR* PSOCKADDR, LPSOCKADDR;   
  72.   
  73. alias WSABUF* LPWSABUF;   
  74. alias uint GROUP;   
  75.   
  76.   
  77. extern(Windows)   
  78. {      
  79.     alias void function(DWORD, DWORD, LPWSAOVERLAPPED, DWORD) LPWSAOVERLAPPED_COMPLETION_ROUTINE;   
  80.     SOCKET WSASocketW(intintint, LPWSAPROTOCOL_INFOW, GROUP, DWORD);       
  81.     bool GetQueuedCompletionStatus(HANDLE, PDWORD, PDWORD, LPOVERLAPPED*, DWORD);      
  82.     HANDLE CreateIoCompletionPort(HANDLE, HANDLE, DWORD, DWORD);       
  83.     int WSASend(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);   
  84.     int WSARecv(SOCKET, LPWSABUF, DWORD, LPDWORD, LPDWORD, LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);   
  85. }   
  86.   
  87. alias WSASocketW WSASocket;   
  88.   
  89. // Maximum Buffer Size   
  90. const int BUFFERSIZE =  128;   
  91.   
  92. enum IO_OPERATION   
  93. {   ACCEPT,READ,WRITE };   
  94.   
  95. struct PIO_DATA   
  96. {     
  97.     WSAOVERLAPPED               ol;   
  98.     char                        Buffer[BUFFERSIZE];   
  99.     WSABUF                      wsabuf;   
  100.     int                         nTotalBytes;   
  101.     int                         nSentBytes;   
  102.     IO_OPERATION                opCode;      
  103.     SOCKET                        activeSocket;    
  104. }   
  105.   
  106.   
  107.   
  108. int max_ThreadCount;   
  109. HANDLE iocpHandle = INVALID_HANDLE_VALUE;   
  110. SOCKET serverSock;   
  111.   
  112.   
  113. int WorkerThread (void * context)   
  114. {   
  115.     LPWSAOVERLAPPED lpol = null;   
  116.   PIO_DATA* lpIOContext  = null;    
  117.   WSABUF buffSend;   
  118.   uint dwRecvNumBytes = 0;   
  119.   uint dwSendNumBytes = 0;   
  120.   uint dwFlags = 0;   
  121.   uint dwIoSize = 0;   
  122.   bool bSuccess = false;   
  123.   int nRet = 0;   
  124.   
  125.    while( 1 ) {   
  126.        void * lpCompletionKey = null;   
  127.        bSuccess = GetQueuedCompletionStatus(iocpHandle, &dwIoSize,cast(LPDWORD)&lpCompletionKey,cast(LPOVERLAPPED *)&lpol, INFINITE);   
  128.        if( !bSuccess )   
  129.        {   
  130.           writefln("GetQueuedCompletionStatus() failed: %s.",GetLastError());   
  131.           break;   
  132.        }        
  133.        lpIOContext = cast(PIO_DATA * )lpol;         
  134.        if(dwIoSize == 0) //socket closed?   
  135.        {   
  136.            writefln("ClientSocket Disconnect!" );   
  137.            closesocket(lpIOContext.activeSocket);   
  138.           // delete lpIOContext;   
  139.            continue;   
  140.        }   
  141.   
  142.        if(lpIOContext.opCode == IO_OPERATION.READ) // a read operation complete   
  143.        {   
  144.                     char[] s = "Echo:" ~  std.string.toString(lpIOContext.wsabuf.buf);                         
  145.                     lpIOContext.wsabuf.buf = std.string.toStringz(s);   
  146.               lpIOContext.nTotalBytes  = lpIOContext.wsabuf.len;   
  147.               lpIOContext.nSentBytes   = 0;   
  148.               lpIOContext.opCode = IO_OPERATION.WRITE;   
  149.               dwFlags = 0;   
  150.               nRet = WSASend(   
  151.                             lpIOContext.activeSocket,   
  152.                             &lpIOContext.wsabuf, 1, &dwSendNumBytes,   
  153.                             dwFlags,   
  154.                             &(lpIOContext.ol) , null);   
  155.               if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) )    
  156.               {   
  157.                       writefln("1.WASSend Failed,Ret:%s." ,WSAGetLastError() );   
  158.                       closesocket(lpIOContext.activeSocket);                         
  159.                       continue;   
  160.               }   
  161.        }   
  162.        else if(lpIOContext.opCode == IO_OPERATION.WRITE) //a write operation complete   
  163.        {   
  164.               lpIOContext.nSentBytes  += dwIoSize;   
  165.               dwFlags = 0;   
  166.               if( lpIOContext.nSentBytes < lpIOContext.nTotalBytes ) {   
  167.                   lpIOContext.opCode = IO_OPERATION.WRITE;   
  168.                   // A Write operation has not completed yet, so post another   
  169.                   // Write operation to post remaining data.   
  170.                     buffSend.buf = lpIOContext.Buffer.ptr + lpIOContext.nSentBytes; //offset.   
  171.                   buffSend.len = lpIOContext.nTotalBytes - lpIOContext.nSentBytes;   
  172.                   nRet = WSASend (lpIOContext.activeSocket,   
  173.                                  &buffSend, 1, &dwSendNumBytes,   
  174.                                  dwFlags,   
  175.                                  &(lpIOContext.ol), null);   
  176.   
  177.                   if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {   
  178.                       writefln("2.WASSend Failed,Ret:%s.",WSAGetLastError());   
  179.                       closesocket(lpIOContext.activeSocket);                         
  180.                       continue;   
  181.                   }   
  182.               }    
  183.               else    
  184.               {   
  185.                   // Write operation completed, so post Read operation.   
  186.                   lpIOContext.opCode = IO_OPERATION.READ;    
  187.                   dwRecvNumBytes = 0;   
  188.                   dwFlags = 0;   
  189.                   lpIOContext.wsabuf.buf = lpIOContext.Buffer.ptr,                     
  190.                   lpIOContext.ol.Internal = 0;   
  191.                   lpIOContext.ol.InternalHigh = 0;   
  192.                   lpIOContext.ol.Offset = 0;   
  193.                   lpIOContext.ol.OffsetHigh = 0;   
  194.                   lpIOContext.ol.hEvent = null;   
  195.                   lpIOContext.wsabuf.len = BUFFERSIZE;   
  196.                   nRet = WSARecv(   
  197.                                 lpIOContext.activeSocket,   
  198.                                 &lpIOContext.wsabuf, 1, &dwRecvNumBytes,   
  199.                                 &dwFlags,   
  200.                                 &lpIOContext.ol, null);   
  201.                   if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {   
  202.                       writefln("1.WASRecv Failed,Ret:%s.",WSAGetLastError()  );   
  203.                       closesocket(lpIOContext.activeSocket);                         
  204.                       continue;   
  205.                   }    
  206.               }   
  207.        }   
  208.   }   
  209.        
  210.     return 0;   
  211. }   
  212.   
  213. int main ()   
  214. {   
  215.        
  216.    { // Init winsock2.2   
  217.         WSADATA wsaData;           
  218.         int retVal = -1;   
  219.         if( (retVal = WSAStartup(0x2020, &wsaData)) != 0 ) {   
  220.             writefln("WSAStartup Failed,Ret: %s" ,retVal);   
  221.             return 1;   
  222.         }   
  223.     }   
  224.     writefln("WSAStartup Init OK!" );   
  225.   
  226.     {  //Create socket   
  227.         serverSock = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, null,0,WSA_FLAG_OVERLAPPED);   
  228.   
  229.         if( serverSock == INVALID_SOCKET ) {   
  230.             writefln("Server Socket Creation Failed,Ret:%s." , WSAGetLastError() );   
  231.             return 1;   
  232.         }       
  233.     }   
  234.        
  235.     writefln("Create socket OK!" );   
  236.   
  237.     {   //bind   
  238.         sockaddr_in service;   
  239.         service.sin_family = AF_INET;   
  240.         service.sin_addr.s_addr = htonl(INADDR_ANY);   
  241.         service.sin_port = htons(9001);   
  242.         int retVal = bind(serverSock,cast(sockaddr *)&service,service.sizeof);   
  243.         if( retVal == SOCKET_ERROR ) {   
  244.             writefln("Server Soket Bind Failed,Ret:%s." , WSAGetLastError() );;   
  245.             return 1;   
  246.         }   
  247.     }   
  248.        
  249.     writefln("Binding ServerSocket OK!" );   
  250.   
  251.     {   //listen   
  252.         int retVal = listen(serverSock, 8);   
  253.         if( retVal == SOCKET_ERROR ) {   
  254.             writefln("Server Socket Listen Failed,Ret:%s." , WSAGetLastError() );;   
  255.             return 1;   
  256.         }   
  257.     }   
  258.            
  259.         writefln("ServerSocket listen OK!" );   
  260.            
  261.         //create iocp & binding serverSocket to iocp   
  262.     {   // Create IOCP   
  263.           
  264.         max_ThreadCount = 1 * 2;   
  265.         iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE,null,0,max_ThreadCount);   
  266.         if (iocpHandle == null) {   
  267.             writefln("CreateIoCompletionPort() Failed,Ret:%s." , GetLastError() );   
  268.             return 1;               
  269.         }   
  270.                    
  271.         if (CreateIoCompletionPort(cast(HANDLE)serverSock,iocpHandle,0,0) == null){   
  272.             writefln("Binding Server Socket to IO Completion Port Failed,Ret:%s." , GetLastError() );   
  273.             return 1;       
  274.         }   
  275.     }   
  276.            
  277.         writefln("Create IOCP & binding ServerSocket to IOCP OK!" );   
  278.            
  279.     {          
  280.         Thread worker = new Thread(&WorkerThread, cast(void *)0);   
  281.         worker.start();   
  282.     }   
  283.        
  284.     writefln("Create Worker threads OK, Waitting Client Connect..." );   
  285.   
  286.     { //accept new connection   
  287.         while(1)   
  288.         {   
  289.             SOCKET clientsock = accept( serverSock, nullnull );   
  290.             if(clientsock == SOCKET_ERROR)  break;   
  291.             writefln("Client connected." );   
  292.   
  293.             { //diable buffer to improve performance   
  294.                 int nZero = 0;   
  295.                 setsockopt(clientsock, SOL_SOCKET, SO_SNDBUF, cast(char *)&nZero, nZero.sizeof);   
  296.             }   
  297.                            
  298.                         //binding ClientSocket to IOCP   
  299.             if (CreateIoCompletionPort(cast(HANDLE)clientsock,iocpHandle,0,0) == null){   
  300.                 writefln("Binding Client Socket to IO Completion Port Failed,Ret:%s." , GetLastError() );   
  301.                 closesocket(clientsock);   
  302.             }   
  303.             else {    
  304.             writefln("binding ClientSocket to IOCP OK!" );   
  305.             //post a recv request   
  306.                 PIO_DATA data;   
  307.                 data.opCode = IO_OPERATION.READ;   
  308.                 data.nTotalBytes = 0;   
  309.                 data.nSentBytes  = 0;   
  310.                 data.wsabuf.buf  = data.Buffer.ptr;   
  311.                 data.wsabuf.len  = data.Buffer.sizeof;   
  312.                 data.activeSocket = clientsock;   
  313.                 uint dwRecvNumBytes=0,dwFlags=0;   
  314.                 int nRet = WSARecv(clientsock,&data.wsabuf, 1, &dwRecvNumBytes,&dwFlags,&data.ol, null);   
  315.                    
  316.                 if(nRet == SOCKET_ERROR  && (ERROR_IO_PENDING != WSAGetLastError())){   
  317.                     writefln("3.WASRecv Failed,Ret:%s." , WSAGetLastError() );;   
  318.                     closesocket(clientsock);   
  319.                     //delete data;   
  320.                 }   
  321.                    
  322.                writefln("Post a recv request IOCP OK!" );   
  323.             }   
  324.         }   
  325.     }   
  326.         //close & Cleanup   
  327.     closesocket(serverSock);   
  328.     WSACleanup();   
  329.         return 0;          
  330. }   
分享到:
评论
9 楼 ahadf 2007-10-22  
"iocp的使用例子(哪怕是C方式的)在dsource上都找不到一个,仅此一条ideage的做法就值得称道."
---不想抬杠,不过偶手边的资料有5,6本书上都有类似的。网上一搜IOCP更是一大把。也许写个boost.asio D 语言简化版本还比较有价值。
8 楼 h_rain 2007-10-22  
不错了.都这个样子了,进行封装也很简单了.
如果需要一个好的封装,需要考虑一些方面:IOCP句柄与状态查询的封装;线程的完善处理("逻辑"暂停,启动...);Socket句柄的重叠IO操作封装等;
如果想要一个完整的封装,这些东西的处理其实还是很复杂的.
7 楼 tomqyp 2007-10-22  
iocp的使用例子(哪怕是C方式的)在dsource上都找不到一个,仅此一条ideage的做法就值得称道。
6 楼 ahadf 2007-10-22  
偶还是要说,又没用到D的特性,又没用到D的库,用C不挺好,为啥还要用D写呢?演示?这种程序书上网上一抓一大把,学C的比学D的人多多了,直接copy下来看的人指不定还多些。
5 楼 ideage 2007-10-22  
这个例子关键是演示了IOCP在D语言中的应用:过程,方法。

已经写的IOCP的类,用在公司的程序,不能公开
4 楼 dayn9 2007-10-22  
这只能说是批着D马甲的C程序吧?

oldrev老大,这点我持有异议。D也应该是多范式的语言,不能只有OO,OB,GP,否则会损失潜在用户,SP范式恰恰是用户最大的群体。
3 楼 ahadf 2007-10-21  
我还以为是一个IOCP的类。。。
2 楼 oldrev 2007-10-21  
这只能说是批着D马甲的C程序吧?
1 楼 tomqyp 2007-10-20  
收藏~

相关推荐

Global site tag (gtag.js) - Google Analytics