Добавлять поток к пулу (подключать его к обработке запросов) можно с помощью следующей функции:
BOOL GetQueuedCompletionStatus( // хендл порта завершения ввода/вывода HANDLE CompletionPort, // количество переданных байт LPDWORD lpNumberOfBytes, // ключ завершения PULONG_PTR lpCompletionKey, // структура OVERLAPPED LPOVERLAPPED *lpOverlapped, // значение таймаута DWORD dwMilliseconds); |
Эта функция блокирует поток до тех пор, пока порт не передаст потоку пакет запроса или не истечет таймаут.
Поместить пакет запроса в порт можно с помощью функции PostQueuedCompletionStatus.
BOOL PostQueuedCompletionStatus( HANDLE CompletionPort, // хендл порта завершения ввода/вывода DWORD dwNumberOfBytesTransferred, // количество переданных байт ULONG_PTR dwCompletionKey, // ключ завершения LPOVERLAPPED lpOverlapped // структура OVERLAPPED); |
Пакет запроса не обязательно должен быть структурой OVERLAPPED или производной от нее [2].
Давайте соберем всю информацию воедино. Порт завершения – объект, организующий несколько очередей из клиентских запросов и потоков, их обрабатывающих. Поток добавляется в очередь ожидающих запрос потоков порта при вызове функции GetQueuedCompletionStatus. При поступлении запроса порт разблокирует первый поток в очереди ждущих потоков и передает ему этот запрос (в виде структуры OVERLAPPED и ключа завершения). Поток при этом перемещается в очередь активных потоков (число активных потоков увеличивается на 1). Предположим, у нас максимальное число активных потоков равно 1, тогда при поступлении следующего запроса другой поток из очереди ожидающих активирован не будет. После обработки клиентского запроса поток вновь вызывает GetQueuedCompletionStatus и ставится в начало списка ожидающих потоков. Почему поток ставится именно в начало списка? Дело в том, что потоки берутся из начала списка, и при низкой активности могут использоваться не все потоки. При этом стеки и контексты не используемых потоков могут быть выгружены на диск за ненадобностью.
Если в процессе обработки запроса поток обратился к блокирующей функции, число активных потоков уменьшается на 1, как если бы поток перешел снова в очередь ожидающих потоков. Это дает возможность при приходе следующего клиентского запроса задействовать следующий поток из очереди ожидающих. Когда первый поток закончит блокирующую операцию, число активных потоков превысит максимальное, и при следующем вызове функции GetQueuedCompletionStatus один из этих потоков заблокируется, а второй получит пакет запроса (если он имеется).
Очередь | Запись добавляется при: | Запись удаляется при: |
Список устройств, ассоциированных с портом | вызове CreateIoCompletionPort | закрытии хенда файла |
Очередь клиентских запросов (FIFO) | завершении асинхронной операции файла, ассоциированного с портом, или вызове функции PostQueuedCompletionStatus | передаче портом запроса потоку на обработку |
Очередь ожидающих потоков | вызове функции GetQueuedCompletionStatus | начале обработки клиентского запроса потоком |
Список работающих потоков | начале обработки клиентского запроса потоком | вызове потоком GetQueuedCompletionStatus или какую-либо блокирующей функции |
Список приостановленных потоков | вызове потоком какой-либо блокирующей функции | выходе потока из какой-либо блокирующей функции |
Таблица 1. Список очередей порта завершения ввода/вывода [1].
Недокументированные возможности порта и его низкоуровневое устройство
Как всегда это бывает у Microsoft, порт завершения обладает многими недокументированными возможностями:
У порта завершения ввода/вывода может быть имя, и соответственно, он доступен для других процессов. Совершенно непонятно, почему разработчики решили скрыть эту, на мой взгляд, нужную особенность порта. Имя можно задать в параметре ObjectAttributes функции NtCreateIoCompletion.
Вторая особенность вытекает из первой: с портом может быть связан дескриптор безопасности, который также задается в параметре ObjectAttributes функции NtCreateIoCompletion.
Открывается порт с помощью функции NtOpenIoCompletion. При вызове функции нужно указать имя порта и уровень доступа. В качестве уровня доступа можно указывать все стандартные и следующие специальные права [2] (таблица 2).
Символическое обозначение | Константа | Описание |
IO_COMPLETION_QUERY_STATE | 1 | Необходим для запроса состояния объекта "порт" |
IO_COMPLETION_MODIFY_STATE | 2 | Необходим для изменения состояния объекта "порт" |
Таблица 2.
У порта можно запрашивать количество необработанных запросов с помощью функции NtQueryIoCompletion. Хотя в [3] утверждается, что эта функция определяет, находится ли порт в сигнальном состоянии, на самом деле она возвращает количество клиентских запросов в очереди. Это довольно важная информация, которую почему-то опять решили от нас скрыть.
Давайте более детально рассмотрим, как создается и функционирует порт завершения ввода/вывода [4].
При создании порта функцией CreateIoCompletionPort вызывается внутренний сервис NtCreateIoCompletion. Объект "порт" представлен следующей структурой [5]:
typedef stuct _IO_COMPLETION{ KQUEUE Queue;} IO_COMPLETION; |
То есть, по существу, объект "порт завершения" является объектом "очередь исполнительной системы" (KQUEUE). Вот как представлена очередь:
typedef stuct _KQUEUE{ DISPATCHER_HEADER Header; LIST_ENTRY EnrtyListHead; //очередьпакетов DWORD CurrentCount; DWORD MaximumCount; LIST_ENTRY ThreadListHead; //очередьожидающихпотоков} KQUEUE; |
Итак, для порта выделяется память, и затем происходит его инициализация с помощью функции KeInitializeQueue. (все, что касается такого супернизкого устройства порта, взято из [4], остальное – из DDK и [3]).
Когда происходит связывание порта с объектом "файл", Win32-функция CreateIoCompletionPort вызывает NtSetInformationFile. Класс информации для этой функции устанавливается как FileCompletionInformation, а в качестве параметра FileInformation передается указатель на структуру IO_COMPLETION_CONTEXT [5] или FILE_COMPLETION_INFORMATION [3].
typedef struct _IO_COMPLETION_CONTEXT{ PVOID Port; PVOID Key;} IO_COMPLETION_CONTEXT;typedef struct _FILE_COMPLETION_INFORMATION { HANDLE IoCompletionHandle; ULONG CompletionKey;} FILE_COMPLETION_INFORMATION, *PFILE_COMPLETION_INFORMATION; |
Указатель на эту структуру заносится в поле CompletionConext структуры FILE_OBJECT (смещение 0x6C).
После завершения асинхронной операции ввода/вывода для ассоциированного файла диспетчер ввода/вывода проверяет поле CompletionConext и, если оно не равно 0, создает пакет запроса (из структуры OVERLAPPED и ключа завершения) и помещает его в очередь с помощью вызова KeInsertQueue. Когда поток вызывает функцию GetQueuedCompletionStatus, на самом деле вызывается функция NtRemoveIoCompletion. NtRemoveIoCompletion проверяет параметры и вызывает функцию KeRemoveQueue, которая блокирует поток, если в очереди отсутствуют запросы, или поле CurrentCount структуры KQUEUE больше или равно MaximumCount. Если запросы есть, и число активных потоков меньше максимального, KeRemoveQueue удаляет вызвавший ее поток из очереди ожидающих потоков и увеличивает число активных потоков на 1. При занесении потока в очередь ожидающих потоков поле Queue структуры KTHREAD (смещение 0xE0) устанавливается равным адресу очереди (порта завершения). Зачем это нужно? Когда вызываются функции блокировки потока (WaitForSingleObject и др.), планировщик проверяет это поле, и если оно не равно 0, вызывает функцию KeActivateWaiterQueue, которая уменьшает число активных потоков порта на 1. Когда поток пробуждается после вызова блокирующих функций, планировщик выполняет те же действия, только вызывает при этом функцию KeUnwaitThread, которая увеличивает счетчик активных потоков на 1.
Когда вы помещаете запрос в порт завершения функцией PostQueuedCompletionStatus, на самом деле вызывается функция NtSetIoCompletion, которая после проверки параметров и преобразования хендла порта в указатель, вызывает KeInsertQueue.
Итак, мы знаем, как работает порт завершения ввода/вывода, когда потоки добавляются в пул и когда удаляются. Но сколько потоков должно быть в пуле? В два раза больше, чем число процессоров. Это очень общая рекомендация, и для некоторых задач она не подходит. По большому счету имеется только два критерия, по которым можно определять, нужно создавать новый поток или нет. Эти критерии – загруженность процессора и число пакетов запросов. Если число пакетов превышает определенное количество, и загруженность процессора невысока, есть смысл создать новый поток. Если пакетов мало, или процессор занят более чем на 90 процентов, дополнительный поток создавать не следует. Удалять поток из пула нужно, если он давно не обрабатывал клиентские запросы (просто подсчитать, сколько раз GetQueuedCompletionStatus вернула управление по таймауту). При удалении потока нужно следить, чтобы закончились все асинхронные операции ввода/вывода, начатые этим потоком.
Надо сказать, что определение загруженности процессора, количества пакетов в очереди порта и наличия у потока незавершенных операций ввода/вывода – задачи не самые простые. Например, вы можете использовать WMI для определения загруженности процессора, но при этом не сможете определить, есть ли у потока незавершенные операции ввода/вывода. Ниже я приведу функции получения вышеперечисленных показателей только недокументированными способами (здесь используется заголовочный файл ntdll.h из [3]):
// Функция получения загруженности процессораdouble GetCPUUsage(){ #define Li2Double(x) ((double)((x).HighPart) * 4.294967296E9 \ + (double)((x).LowPart)) typedef NTSTATUS (NTAPI ZwQuerySystemInformation_t)( IN NT::SYSTEM_INFORMATION_CLASS SystemInformationClass, OUT PVOID SystemInformation, IN ULONG SystemInformationLength, OUT PULONG ReturnLength OPTIONAL ); static ZwQuerySystemInformation_t* ZwQuerySystemInformation = 0; if(!ZwQuerySystemInformation) { ZwQuerySystemInformation = (ZwQuerySystemInformation_t*)GetProcAddress( GetModuleHandle(_T("ntdll.dll")), _T("NtQuerySystemInformation")); } double dbIdleTime = 0; static NT::LARGE_INTEGER liOldIdleTime = {0, 0}; static NT::LARGE_INTEGER liOldSystemTime = {0, 0}; // Получаемчислопроцессоров NT::SYSTEM_BASIC_INFORMATION sysinfo = {0}; NT::NTSTATUS status = ZwQuerySystemInformation(NT::SystemBasicInformation, &sysinfo, sizeof sysinfo, 0); if(status != NO_ERROR) return -1; // Получаем системное времяNT::SYSTEM_TIME_OF_DAY_INFORMATION timeinfo = {0}; status = ZwQuerySystemInformation(NT::SystemTimeOfDayInformation, &timeinfo, sizeof timeinfo, 0); if(status!=NO_ERROR) return -1; // Получаемвремяпростоя NT::SYSTEM_PERFORMANCE_INFORMATION perfinfo = {0}; status = ZwQuerySystemInformation(NT::SystemPerformanceInformation, &perfinfo, sizeof perfinfo, 0); if(status != NO_ERROR) return -1;// если это первый вызов, значение вычислить нельзяif(liOldIdleTime.QuadPart != 0) { // Времяпростоя dbIdleTime = Li2Double(perfinfo.IdleTime) - Li2Double(liOldIdleTime); // Системноевремя const double dbSystemTime = Li2Double(timeinfo.CurrentTime) - Li2Double(liOldSystemTime); dbIdleTime = dbIdleTime / dbSystemTime; dbIdleTime = 100.0 - dbIdleTime * 100.0 / (double)sysinfo.NumberProcessors + 0.5; } // сохраняемполученныезначения liOldIdleTime = perfinfo.IdleTime;liOldSystemTime = timeinfo.CurrentTime; // Если это первый вызов, получаем загруженность CPU за последние // 200 милисекунд if(dbIdleTime == 0) { Sleep(200); dbIdleTime = GetCPUUsage();} return dbIdleTime;}// Возвращает true, если поток имеет незавершенные операции ввода/выводаbool HasThreadIoPending(HANDLE hThread = GetCurrentThread()){ typedef NTSTATUS (NTAPI ZwQueryInformationThread_t)( IN HANDLE ThreadHandle, IN NT::THREADINFOCLASS ThreadInformationClass, OUT PVOID ThreadInformation, IN ULONG ThreadInformationLength, OUT PULONG ReturnLength OPTIONAL ); static ZwQueryInformationThread_t* ZwQueryInformationThread = 0; if(!ZwQueryInformationThread) { ZwQueryInformationThread = (ZwQueryInformationThread_t*)GetProcAddress( GetModuleHandle(_T("ntdll.dll")), _T("NtQueryInformationThread")); } ULONG io = 0; ZwQueryInformationThread(hThread, NT::ThreadIsIoPending, &io, 4, 0);return io > 0;}// Возвращает количество необработанных запросов в очереди портаDWORD GetIoCompletionLen(HANDLE hIoPort){ typedef NTSTATUS (NTAPI ZwQueryIoCompletion_t)( IN HANDLE IoCompletionHandle, IN NT::IO_COMPLETION_INFORMATION_CLASS IoCompletionInformationClass, OUT PVOID IoCompletionInformation, IN ULONG IoCompletionInformationLength, OUT PULONG ResultLength OPTIONAL ); static ZwQueryIoCompletion_t* ZwQueryIoCompletion = 0; if(!ZwQueryIoCompletion) { ZwQueryIoCompletion = (ZwQueryIoCompletion_t*)GetProcAddress( GetModuleHandle(_T("ntdll.dll")), _T("NtQueryIoCompletion")); } NT::IO_COMPLETION_BASIC_INFORMATION ioinfo = {0}; DWORD dwRetLen = 0; ZwQueryIoCompletion(hIoPort, NT::IoCompletionBasicInformation, &ioinfo, sizeof ioinfo, &dwRetLen); return ioinfo.SignalState;} |
Как видите, не простое это дело – создавать эффективный пул потоков, однако кое-что ребята из Microsoft могут нам предложить. В Windows 2000 появились новые функции, которые полностью берут на себя всю черновую работу по созданию и удалению потоков в пуле. О них – следующий раздел.