一、CPU密集型任務(wù)開發(fā)指導(dǎo)
CPU密集型任務(wù)是指需要占用系統(tǒng)資源處理大量計算能力的任務(wù),需要長時間運行,這段時間會阻塞線程其它事件的處理,不適宜放在主線程進行。例如圖像處理、視頻編碼、數(shù)據(jù)分析等。
基于多線程并發(fā)機制處理CPU密集型任務(wù)可以提高CPU利用率,提升應(yīng)用程序響應(yīng)速度。
當(dāng)進行一系列同步任務(wù)時,推薦使用Worker;而進行大量或調(diào)度點較為分散的獨立任務(wù)時,不方便使用8個Worker去做負載管理,推薦采用TaskPool。接下來將以圖像直方圖處理以及后臺長時間的模型預(yù)測任務(wù)分別進行舉例。
使用TaskPool進行圖像直方圖處理
? 1. 實現(xiàn)圖像處理的業(yè)務(wù)邏輯。
? 2. 數(shù)據(jù)分段,將各段數(shù)據(jù)通過不同任務(wù)的執(zhí)行完成圖像處理。
創(chuàng)建Task,通過execute()執(zhí)行任務(wù),在當(dāng)前任務(wù)結(jié)束后,會將直方圖處理結(jié)果同時返回。
? 3. 結(jié)果數(shù)組匯總處理。
import taskpool from '@ohos.taskpool'; @Concurrent function imageProcessing(dataSlice: ArrayBuffer) { // 步驟1: 具體的圖像處理操作及其他耗時操作 return dataSlice; } function histogramStatistic(pixelBuffer: ArrayBuffer) { // 步驟2: 分成三段并發(fā)調(diào)度 let number = pixelBuffer.byteLength / 3; let buffer1 = pixelBuffer.slice(0, number); let buffer2 = pixelBuffer.slice(number, number * 2); let buffer3 = pixelBuffer.slice(number * 2); let task1 = new taskpool.Task(imageProcessing, buffer1); let task2 = new taskpool.Task(imageProcessing, buffer2); let task3 = new taskpool.Task(imageProcessing, buffer3); taskpool.execute(task1).then((ret: ArrayBuffer[]) => { // 步驟3: 結(jié)果處理 }); taskpool.execute(task2).then((ret: ArrayBuffer[]) => { // 步驟3: 結(jié)果處理 }); taskpool.execute(task3).then((ret: ArrayBuffer[]) => { // 步驟3: 結(jié)果處理 }); } @Entry @Component struct Index { @State message: string = 'Hello World' build() { Row() { Column() { Text(this.message) .fontSize(50) .fontWeight(FontWeight.Bold) .onClick(() => { let data: ArrayBuffer; histogramStatistic(data); }) } .width('100%') } .height('100%') } }
使用Worker進行長時間數(shù)據(jù)分析
本文通過某地區(qū)提供的房價數(shù)據(jù)訓(xùn)練一個簡易的房價預(yù)測模型,該模型支持通過輸入房屋面積和房間數(shù)量去預(yù)測該區(qū)域的房價,模型需要長時間運行,房價預(yù)測需要使用前面的模型運行結(jié)果,因此需要使用Worker。
? 1. DevEco Studio提供了Worker創(chuàng)建的模板,新建一個Worker線程,例如命名為“MyWorker”。


? 2. 在主線程中通過調(diào)用ThreadWorker的constructor()方法創(chuàng)建Worker對象,當(dāng)前線程為宿主線程。
import worker from '@ohos.worker';
const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
? 3. 在宿主線程中通過調(diào)用onmessage()方法接收Worker線程發(fā)送過來的消息,并通過調(diào)用postMessage()方法向Worker線程發(fā)送消息。
例如向Worker線程發(fā)送訓(xùn)練和預(yù)測的消息,同時接收Worker線程發(fā)送回來的消息。
// 接收Worker子線程的結(jié)果
workerInstance.onmessage = function(e) {
// data:主線程發(fā)送的信息
let data = e.data;
console.info('MyWorker.ts onmessage');
// 在Worker線程中進行耗時操作
}
workerInstance.onerror = function (d) {
// 接收Worker子線程的錯誤信息
}
// 向Worker子線程發(fā)送訓(xùn)練消息
workerInstance.postMessage({ 'type': 0 });
// 向Worker子線程發(fā)送預(yù)測消息
workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
? 4. 在MyWorker.ts文件中綁定Worker對象,當(dāng)前線程為Worker線程。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
? 5. 在Worker線程中通過調(diào)用onmessage()方法接收宿主線程發(fā)送的消息內(nèi)容,并通過調(diào)用postMessage()方法向宿主線程發(fā)送消息。
如在Worker線程中定義預(yù)測模型及其訓(xùn)練過程,同時與主線程進行信息交互。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 定義訓(xùn)練模型及結(jié)果
let result;
// 定義預(yù)測函數(shù)
function predict(x) {
return result[x];
}
// 定義優(yōu)化器訓(xùn)練過程
function optimize() {
result = {};
}
// Worker線程的onmessage邏輯
workerPort.onmessage = function (e: MessageEvents) {
let data = e.data
// 根據(jù)傳輸?shù)臄?shù)據(jù)的type選擇進行操作
switch (data.type) {
case 0:
// 進行訓(xùn)練
optimize();
// 訓(xùn)練之后發(fā)送主線程訓(xùn)練成功的消息
workerPort.postMessage({ type: 'message', value: 'train success.' });
break;
case 1:
// 執(zhí)行預(yù)測
const output = predict(data.value);
// 發(fā)送主線程預(yù)測的結(jié)果
workerPort.postMessage({ type: 'predict', value: output });
break;
default:
workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
break;
}
}
? 6. 在Worker線程中完成任務(wù)之后,執(zhí)行Worker線程銷毀操作。銷毀線程的方式主要有兩種:根據(jù)需要可以在宿主線程中對Worker線程進行銷毀;也可以在Worker線程中主動銷毀Worker線程。
在宿主線程中通過調(diào)用onexit()方法定義Worker線程銷毀后的處理邏輯。
// Worker線程銷毀后,執(zhí)行onexit回調(diào)方法
workerInstance.onexit = function() {
console.info("main thread terminate");
}
方式一:在宿主線程中通過調(diào)用terminate()方法銷毀Worker線程,并終止Worker接收消息。
// 銷毀Worker線程 workerInstance.terminate();
方式二:在Worker線程中通過調(diào)用close()方法主動銷毀Worker線程,并終止Worker接收消息。
// 銷毀線程 workerPort.close();
二、 I/O密集型任務(wù)開發(fā)指導(dǎo)
使用異步并發(fā)可以解決單次I/O任務(wù)阻塞的問題,但是如果遇到I/O密集型任務(wù),同樣會阻塞線程中其它任務(wù)的執(zhí)行,這時需要使用多線程并發(fā)能力來進行解決。
I/O密集型任務(wù)的性能重點通常不在于CPU的處理能力,而在于I/O操作的速度和效率。這種任務(wù)通常需要頻繁地進行磁盤讀寫、網(wǎng)絡(luò)通信等操作。此處以頻繁讀寫系統(tǒng)文件來模擬I/O密集型并發(fā)任務(wù)的處理。
? 1. 定義并發(fā)函數(shù),內(nèi)部密集調(diào)用I/O能力。
import fs from '@ohos.file.fs';
// 定義并發(fā)函數(shù),內(nèi)部密集調(diào)用I/O能力
@Concurrent
async function concurrentTest(fileList: string[]) {
// 寫入文件的實現(xiàn)
async function write(data, filePath) {
let file = await fs.open(filePath, fs.OpenMode.READ_WRITE);
await fs.write(file.fd, data);
fs.close(file);
}
// 循環(huán)寫文件操作
for (let i = 0; i < fileList.length; i++) {
write('Hello World!', fileList[i]).then(() =?> {
console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);
}).catch((err) => {
console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)
return false;
})
}
return true;
}
? 2. 使用TaskPool執(zhí)行包含密集I/O的并發(fā)函數(shù):通過調(diào)用execute()方法執(zhí)行任務(wù),并在回調(diào)中進行調(diào)度結(jié)果處理。示例中的filePath1和filePath2的獲取方式請參見獲取應(yīng)用文件路徑。
import taskpool from '@ohos.taskpool';
let filePath1 = ...; // 應(yīng)用文件路徑
let filePath2 = ...;
// 使用TaskPool執(zhí)行包含密集I/O的并發(fā)函數(shù)
// 數(shù)組較大時,I/O密集型任務(wù)任務(wù)分發(fā)也會搶占主線程,需要使用多線程能力
taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => {
// 調(diào)度結(jié)果處理
console.info(`The result: ${ret}`);
})
審核編輯 黃宇
-
cpu
+關(guān)注
關(guān)注
68文章
11253瀏覽量
223888 -
圖像處理
+關(guān)注
關(guān)注
29文章
1340瀏覽量
59353 -
線程
+關(guān)注
關(guān)注
0文章
508瀏覽量
20796 -
HarmonyOS
+關(guān)注
關(guān)注
80文章
2152瀏覽量
35858
發(fā)布評論請先 登錄
鴻蒙OS開發(fā)實例:【ArkTS類庫多線程CPU密集型任務(wù)TaskPool】
鴻蒙原生應(yīng)用開發(fā)-ArkTS語言基礎(chǔ)類庫多線程CPU密集型任務(wù)TaskPool
鴻蒙原生應(yīng)用開發(fā)-ArkTS語言基礎(chǔ)類庫多線程I/O密集型任務(wù)開發(fā)
請問如何在Python中實現(xiàn)多線程與多進程的協(xié)作?
CPU密集型任務(wù)開發(fā)指導(dǎo)
計算密集型的程序簡析
HarmonyOS如何使用異步并發(fā)能力進行開發(fā)
HarmonyOS CPU與I/O密集型任務(wù)開發(fā)指導(dǎo)
HarmonyOS語言基礎(chǔ)類庫開發(fā)指南上線啦!
I/O密集型虛擬機的域間通信優(yōu)化方法
FPGA執(zhí)行計算密集型任務(wù)性能表現(xiàn)及優(yōu)勢有哪些
云優(yōu)化性能:使用基于閃存的存儲的I/O密集型工作負載
HarmonyOS CPU與I/O密集型任務(wù)開發(fā)指導(dǎo)
評論