1.項(xiàng)目介紹
本項(xiàng)目基于物聯(lián)量平臺(tái)遠(yuǎn)程的視頻監(jiān)控項(xiàng)目,通過(guò)MQTT協(xié)議實(shí)現(xiàn)兩個(gè)設(shè)備間的數(shù)據(jù)上報(bào)與訂閱。通過(guò)這個(gè)項(xiàng)目來(lái)演示,兩個(gè)MQTT設(shè)備如何互相訂閱,進(jìn)行消息流轉(zhuǎn)。在阿里云服務(wù)器上創(chuàng)建2個(gè)設(shè)備,分為為設(shè)備A和設(shè)備B;設(shè)備A負(fù)責(zé)采集本地?cái)z像頭畫面上傳,設(shè)備B負(fù)責(zé)接收設(shè)備A上傳的數(shù)據(jù)然后解析顯示出來(lái)。在阿里云服務(wù)器上需要配置云產(chǎn)品流轉(zhuǎn),讓設(shè)備A的數(shù)據(jù)上傳后自動(dòng)發(fā)送給設(shè)備B。這樣就完成了視頻畫面數(shù)據(jù)的流轉(zhuǎn)。不過(guò)因?yàn)榘⒗镌频淖畲髷?shù)據(jù)限制,每次最大發(fā)送10240字節(jié)的數(shù)據(jù)。
2.Linux 下 socket 編程連接阿里云物聯(lián)網(wǎng)平臺(tái)
#define SERVER_IP "asfdda.iot-as-mqtt.cn-shanghai.aliyuncs.com"http://服務(wù)器IP #define SERVER_PORT 1883 //端口號(hào) #define ClientID "aasfsaXABf.Imasfas|securemode=2,signmethod=hmacsha256,timestamp=1678323607797|" #define Username "ImsfeA&a1sadf8XABf" #define Password "15566ab496e81da728a3792ebe532fd4a3f4026a2b831df5af24da06"http://密文 #define SET_TOPIC "/sys/a14dXABf/ImagfA/thing/service/property/set" //訂閱 #define POST_TOPIC "/sys/a14sdf8XABf/ImdfeA/thing/event/property/post" //發(fā)布 int main() { pthread_t id; signal(SIGPIPE,SIG_IGN);/*忽略SIGPIPE信號(hào)*/ signal(SIGALRM,signal_func);/*鬧鐘信號(hào)*/ sockfd=socket(AF_INET,SOCK_STREAM,0); if(sockfd==-1) { printf("網(wǎng)絡(luò)套接字打開(kāi)失敗n"); return 0; } /*設(shè)置發(fā)送緩沖區(qū)大小*/ int nSendBuf=40*1024;//設(shè)置為 20K if(setsockopt(sockfd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))) { printf("setsockopt(SO_SNDBUF) 設(shè)置錯(cuò)誤!n"); return 0; } /*域名解析*/ struct hostent *hostent; while(1) { hostent=gethostbyname(SERVER_IP); if(hostent==NULL) { printf("域名解析失敗n"); sleep(1); } else break; } printf("主機(jī)名:%sn",hostent->h_name); printf("協(xié)議類型:%sn",(hostent->h_addrtype == AF_INET)?"AF_INET":"AF_INET6"); printf("IP地址長(zhǎng)度:%dn",hostent->h_length); char *ip; for(int i=0;hostent->h_addr_list[i];i++) { ip=inet_ntoa(*(struct in_addr *)hostent->h_addr_list[i]); printf("ip=%sn",ip); } /*連接服務(wù)器*/ struct sockaddr_in addr; addr.sin_family=AF_INET;//IPV4 addr.sin_port=htons(SERVER_PORT);/*端口號(hào)*/ addr.sin_addr.s_addr=inet_addr(ip);//服務(wù)器IP if(connect(sockfd, (struct sockaddr *)&addr,sizeof(struct sockaddr_in))==0) { printf("服務(wù)器連接成功n"); while(1) { MQTT_Init(); /*登錄服務(wù)器*/ if(MQTT_Connect(ClientID,Username,Password)==0) { break; } sleep(1); printf("服務(wù)器連接中....n"); } printf("連接成功rn"); //訂閱物聯(lián)網(wǎng)平臺(tái)數(shù)據(jù) stat=MQTT_SubscribeTopic(SET_TOPIC,1,1); if(stat) { close(sockfd); printf("訂閱失敗rn"); exit(0); } printf("訂閱成功rn"); /*創(chuàng)建線程*/ pthread_create(&id, NULL,pth_work_func,NULL); pthread_detach(id);//設(shè)置分離屬性 alarm(3);//鬧鐘函數(shù),時(shí)間到達(dá)會(huì)產(chǎn)生SIGALRM信號(hào) int a=0; while(1) { sprintf(mqtt_message,"{"method":"thing.event.property.post","params":{"image":"阿里云物聯(lián)網(wǎng)平臺(tái)測(cè)試"}}"); MQTT_PublishData(POST_TOPIC,mqtt_message,0);//發(fā)布數(shù)據(jù) } } }

3.云產(chǎn)品流轉(zhuǎn)
云產(chǎn)品流轉(zhuǎn)文檔: https: //help. aliyun. com/document_detail/68677. html
3.1 什么是云產(chǎn)品流轉(zhuǎn)
設(shè)備基于 Topic 與物聯(lián)網(wǎng)平臺(tái)進(jìn)行通信時(shí), 您可以在數(shù)據(jù)流轉(zhuǎn)中, 編寫 SQL 對(duì) Topic 中的數(shù)據(jù)進(jìn)行處理, 并配置轉(zhuǎn)發(fā)規(guī)則將處理后的數(shù)據(jù)轉(zhuǎn)發(fā)到其他設(shè)備 Topic 或阿里云其他服務(wù)。

3.2 云產(chǎn)品流轉(zhuǎn)配置
1 .創(chuàng)建解析器



2.關(guān)聯(lián)數(shù)據(jù)源



3.關(guān)聯(lián)數(shù)據(jù)目的



4.編寫解析器腳本
解析器說(shuō)明文檔: https: //help. aliyun. com/document_detail/270931. html


格式示例:
//通過(guò) payload 函數(shù), 獲取設(shè)備上報(bào)的消息內(nèi)容, 并按照 JSON 格式轉(zhuǎn)換。
var data = payload("json"); //直接流轉(zhuǎn)物模型上報(bào)數(shù)據(jù)。
writeIotTopic(1000, topic, data);
topic 如下:


編輯好后發(fā)布即可, 至此, 阿里物聯(lián)網(wǎng)平臺(tái)配置完成。
4.代碼實(shí)現(xiàn)
4.1 設(shè)備 A 發(fā)送方
1 .USB 攝像頭應(yīng)用編程
采用 Linux 下 V4L2 框架初始化 USB 攝像頭, 采集圖像數(shù)據(jù)。
/*
攝像頭初始化
返回值:成功返回?cái)z像頭描述符,失敗返回負(fù)數(shù)
*/
int Video_Init(struct CAMERA *camera)
{
int video_fd;
int i=0;
/*1.打開(kāi)設(shè)備節(jié)點(diǎn)*/
video_fd=open(VIDEO_DEV,O_RDWR);
if(video_fd==-1)return -1;
/*2.設(shè)置攝像頭格式*/
struct v4l2_format format;
memset(&format,0,sizeof(format));
format.type=V4L2_BUF_TYPE_VIDEO_CAPTURE;//視頻捕獲格式
format.fmt.pix.width=320;
format.fmt.pix.height=240;
format.fmt.pix.pixelformat=V4L2_PIX_FMT_YUYV;//圖像數(shù)據(jù)格式y(tǒng)uyv
if(ioctl(video_fd,VIDIOC_S_FMT,&format))return -2;
printf("圖像尺寸:%d * %dn",format.fmt.pix.width,format.fmt.pix.height);
camera->image_w=format.fmt.pix.width;
camera->image_h=format.fmt.pix.height;
/*3.向內(nèi)核請(qǐng)求緩沖區(qū)*/
struct v4l2_requestbuffers reqbuf;
memset(&reqbuf,0,sizeof(reqbuf));
reqbuf.count=4;/*緩沖區(qū)個(gè)數(shù)*/
reqbuf.type=V4L2_BUF_TYPE_VIDEO_CAPTURE;//視頻捕獲格式
reqbuf.memory=V4L2_MEMORY_MMAP;/*內(nèi)存映射*/
if(ioctl(video_fd,VIDIOC_REQBUFS,&reqbuf))return -3;
printf("緩沖區(qū)個(gè)數(shù):%dn",reqbuf.count);
/*4.將緩沖區(qū)映射到進(jìn)程空間*/
struct v4l2_buffer quebuff;
for(i=0;imamp_buff[i]=mmap(NULL,quebuff.length,PROT_READ|PROT_WRITE,MAP_SHARED,video_fd,quebuff.m.offset);
printf("buff[%d]=%pn",i,camera->mamp_buff[i]);
camera->mmap_size=quebuff.length;
}
/*5.將緩沖區(qū)添加到采集隊(duì)列*/
for(i=0;i
2.圖片編碼處理
實(shí)時(shí)采集圖像數(shù)據(jù), 將圖片數(shù)據(jù)編碼為 jpg 圖像格式, 再進(jìn) base64 格式編碼。Base64 編碼是一種將二進(jìn)制數(shù)據(jù)轉(zhuǎn)換為 ASCII 字符的方法, 它使用 64 個(gè)字符來(lái)表示任意序列的二進(jìn)制數(shù)據(jù)。 Base64 編碼后的數(shù)據(jù)長(zhǎng)度會(huì)比原始二進(jìn)制數(shù)據(jù)略長(zhǎng), 但可以方便地被轉(zhuǎn)換為文本格式并在網(wǎng)絡(luò)上進(jìn)行傳輸。
3.base64 格式編碼
#include
static const char * base64char = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/*
函數(shù)功能:將圖片編碼為base64格式
形參:bindata 源圖片數(shù)據(jù)
base64 編碼后的數(shù)據(jù)
binlength --源文件大小
返回值:返回編碼后的base64數(shù)據(jù)
*/
char * base64_encode( const unsigned char * bindata, char * base64, int binlength )
{
int i, j;
unsigned char current;
for ( i = 0, j = 0 ; i < binlength ; i += 3 )
? ? {
? ? ? ? current = (bindata[i] >> 2) ;
current &= (unsigned char)0x3F;
base64[j++] = base64char[(int)current];
current = ( (unsigned char)(bindata[i] << 4 ) ) & ( (unsigned char)0x30 ) ;
? ? ? ? if ( i + 1 >= binlength )
{
base64[j++] = base64char[(int)current];
base64[j++] = '=';
base64[j++] = '=';
break;
}
current |= ( (unsigned char)(bindata[i+1] >> 4) ) & ( (unsigned char) 0x0F );
base64[j++] = base64char[(int)current];
current = ( (unsigned char)(bindata[i+1] << 2) ) & ( (unsigned char)0x3C ) ;
? ? ? ? if ( i + 2 >= binlength )
{
base64[j++] = base64char[(int)current];
base64[j++] = '=';
break;
}
current |= ( (unsigned char)(bindata[i+2] >> 6) ) & ( (unsigned char) 0x03 );
base64[j++] = base64char[(int)current];
current = ( (unsigned char)bindata[i+2] ) & ( (unsigned char)0x3F ) ;
base64[j++] = base64char[(int)current];
}
base64[j] = '';
return base64;
}
4. base64 格式解碼
/*
函數(shù)功能:base64格式數(shù)據(jù)解碼
形參:base64 base64格式數(shù)據(jù)
bindata 保存解碼成功的圖像數(shù)據(jù)
返回值:成功返回解碼的圖像大小
*/
static const char * base64char = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
int base64_decode( const char * base64, unsigned char * bindata )
{
int i, j;
unsigned char k;
unsigned char temp[4];
for ( i = 0, j = 0; base64[i] != '' ; i += 4 )
{
memset( temp, 0xFF, sizeof(temp) );
for ( k = 0 ; k < 64 ; k ++ )
? ? ? ? {
? ? ? ? ? ? if ( base64char[k] == base64[i] )
? ? ? ? ? ? ? ? temp[0]= k;
? ? ? ? }
? ? ? ? for ( k = 0 ; k < 64 ; k ++ )
? ? ? ? {
? ? ? ? ? ? if ( base64char[k] == base64[i+1] )
? ? ? ? ? ? ? ? temp[1]= k;
? ? ? ? }
? ? ? ? for ( k = 0 ; k < 64 ; k ++ )
? ? ? ? {
? ? ? ? ? ? if ( base64char[k] == base64[i+2] )
? ? ? ? ? ? ? ? temp[2]= k;
? ? ? ? }
? ? ? ? for ( k = 0 ; k < 64 ; k ++ )
? ? ? ? {
? ? ? ? ? ? if ( base64char[k] == base64[i+3] )
? ? ? ? ? ? ? ? temp[3]= k;
? ? ? ? }
? ? ? ? bindata[j++] = ((unsigned char)(((unsigned char)(temp[0] << 2))&0xFC)) |
? ? ? ? ? ? ? ? ((unsigned char)((unsigned char)(temp[1]>>4)&0x03));
if ( base64[i+2] == '=' )
break;
bindata[j++] = ((unsigned char)(((unsigned char)(temp[1] << 4))&0xF0)) |
? ? ? ? ? ? ? ? ((unsigned char)((unsigned char)(temp[2]>>2)&0x0F));
if ( base64[i+3] == '=' )
break;
bindata[j++] = ((unsigned char)(((unsigned char)(temp[2] << 6))&0xF0)) |
? ? ? ? ? ? ? ? ((unsigned char)(temp[3]&0x3F));
? ? }
? ? return j;
}
5.數(shù)據(jù)上報(bào)
Linux 下 socket 網(wǎng)絡(luò)編程, 連接阿里云服務(wù)器, 接入阿里云物聯(lián)網(wǎng)平臺(tái), 通過(guò) MQTT 協(xié)議實(shí)時(shí)上報(bào)數(shù)據(jù)。
4.2 設(shè)備 B 訂閱方
1 .數(shù)據(jù)獲取
Linux 下 socket 網(wǎng)絡(luò)編程, 連接阿里云服務(wù)器, 接入阿里云物聯(lián)網(wǎng)平臺(tái), 訂閱設(shè)備 A 端發(fā)送的消息。
2.數(shù)據(jù)解析
物聯(lián)網(wǎng)平臺(tái)下發(fā)消息格式為 JSON 格式, 進(jìn)行消息數(shù)據(jù)解析, 提取圖像數(shù)據(jù), 將圖像數(shù)據(jù)進(jìn)行 base64 格式解碼得到 JPG 圖片數(shù)據(jù)。
3.JPG 圖片解析
解析 JPG 圖片獲取到 RGB 顏色數(shù)據(jù)。
//顯示JPEG 編譯時(shí)加-ljpeg
int LCD_ShowJPEG(unsigned char *jpg_buffer,int size,struct ImageDecodingInfo *image_rgb)
{
struct jpeg_decompress_struct cinfo; //存放圖像的數(shù)據(jù)
struct jpeg_error_mgr jerr; //存放錯(cuò)誤信息
unsigned char *buffer;
unsigned int i,j;
unsigned int color;
static int written;
/*init jpeg壓縮對(duì)象錯(cuò)誤處理程序*/
cinfo.err = jpeg_std_error(&jerr); //初始化標(biāo)準(zhǔn)錯(cuò)誤,用來(lái)存放錯(cuò)誤信息
jpeg_create_decompress(&cinfo); //創(chuàng)建解壓縮結(jié)構(gòu)信息
jpeg_mem_src(&cinfo, (unsigned char *)jpg_buffer, size);
//jpeg_stdio_src(&cinfo, infile);
/*讀jpeg頭*/
jpeg_read_header(&cinfo, TRUE);
/*開(kāi)始解壓*/
jpeg_start_decompress(&cinfo);
#if 0
printf("JPEG圖片高度: %dn",cinfo.output_height);
printf("JPEG圖片寬度: %dn",cinfo.output_width);
printf("JPEG圖片顏色位數(shù)(字節(jié)單位): %dn",cinfo.output_components);
#endif
image_rgb->Height=cinfo.output_height;
image_rgb->Width=cinfo.output_width;
unsigned char *rgb_data=image_rgb->rgb;
/*為一條掃描線上的像素點(diǎn)分配存儲(chǔ)空間,一行一行的解碼*/
int row_stride = cinfo.output_width * cinfo.output_components;
buffer = (unsigned char *)malloc(row_stride);
//將圖片內(nèi)容顯示到framebuffer上,cinfo.output_scanline表示當(dāng)前行的位置,讀取數(shù)據(jù)是會(huì)自動(dòng)增加
i=0;
while(cinfo.output_scanline < cinfo.output_height)
? ? {
? ? ? ? //讀取一行的數(shù)據(jù) ? ?
? ? ? ? jpeg_read_scanlines(&cinfo,&buffer,1);
? ? ? ? memcpy(rgb_data + i * cinfo.output_width * 3, buffer, row_stride);
? ? ? ? i++;
? ? } ? ?
? ? /*完成解壓,摧毀解壓對(duì)象*/
? ? jpeg_finish_decompress(&cinfo); //結(jié)束解壓
? ? jpeg_destroy_decompress(&cinfo); //釋放結(jié)構(gòu)體占用的空間
? ? /*釋放內(nèi)存緩沖區(qū)*/
? ? free(buffer);
? ? return 0; ? ? ?
}
4.GTK 窗口渲染
創(chuàng)建 GTK 窗口, 將原始圖片進(jìn)行縮放, 實(shí)時(shí)渲染圖像數(shù)據(jù)。
/*****************************BMP圖片放大縮小************************
**image_rgb --圖像結(jié)構(gòu)體信息
**int lcd_width,int lcd_hight --屏幕大小
**返回值:0 -- 成功; 其它值 -- 失敗
*********************************************************************/
int ZoomInandOut(struct ImageDecodingInfo *image_rgb,int lcd_width,int lcd_hight)
{
//printf("源圖片寬:%dn",image_rgb->Width);
//printf("源圖片高:%dn",image_rgb->Height);
u32 w=image_rgb->Width;
u32 h=image_rgb->Height;
u8 *src_rgb=image_rgb->rgb;//源圖片RGB值
unsigned long oneline_byte=w*3;//一行字節(jié)數(shù)
float zoom_count=0;
/*按比例縮放*/
zoom_count=(lcd_width/(w*1.0)) > (lcd_hight/(h*1.0)) ? (lcd_hight/(h*1.0)):(lcd_width/(w*1.0));
int new_w,new_h;
new_w=zoom_count*w;//新圖片寬
new_h=zoom_count*h;//新圖片高
//printf("新圖片寬:%dn",new_w);
//printf("新圖片高:%dn", new_h);
//printf("縮放比例:%.0f%%n",(new_w*1.0/w)*100);
unsigned long new_oneline_byte=new_w*3;
unsigned char *newbmp_buff=(unsigned char *)malloc(new_h*new_oneline_byte);//動(dòng)態(tài)分配新圖片RGB顏色數(shù)據(jù)緩沖區(qū)
if(newbmp_buff==NULL)
{
printf("[%s line %d]動(dòng)態(tài)分配空間失敗n",__FUNCTION__,__LINE__);
return -1;
}
memset(newbmp_buff, 0, new_h*new_oneline_byte);
/************************圖像處理算法(雙線性插值)*******************************/
int i,j;
for(i=0;irgb,newbmp_buff,new_h*new_oneline_byte);//新圖像RGB數(shù)據(jù)
image_rgb->Width=new_w;//新圖像寬
image_rgb->Height=new_h;//新圖像高
free(newbmp_buff);
return 0;
}
4.3 項(xiàng)目效果


審核編輯黃宇
-
物聯(lián)網(wǎng)
+關(guān)注
關(guān)注
2943文章
47652瀏覽量
411793 -
阿里云
+關(guān)注
關(guān)注
3文章
1033瀏覽量
45593 -
MQTT
+關(guān)注
關(guān)注
5文章
730瀏覽量
24932
發(fā)布評(píng)論請(qǐng)先 登錄
基于阿里云HiTSDB搭建工業(yè)物聯(lián)網(wǎng)平臺(tái)實(shí)踐
物聯(lián)網(wǎng)之阿里云教程(1)——MQTT連接阿里云教程
基于鴻蒙Hi3861V100 MQTT協(xié)議 對(duì)接阿里云物聯(lián)網(wǎng)平臺(tái)
踏入物聯(lián)網(wǎng)第一篇——STM32F103開(kāi)發(fā)板接入阿里云IOT平臺(tái) 精選資料分享
微信小程序連接阿里云物聯(lián)網(wǎng)平臺(tái)云端API實(shí)現(xiàn)物聯(lián)操控
STM32F103板子是如何通過(guò)MQTT協(xié)議連接阿里云物聯(lián)網(wǎng)平臺(tái)的呢
阿里云物聯(lián)網(wǎng)平臺(tái)接入NodeMCU的方法
NodeMCU開(kāi)發(fā)板接入阿里云物聯(lián)網(wǎng)平臺(tái)和百度天工物聯(lián)網(wǎng)平臺(tái)的注意事項(xiàng)
M5311模塊MQTT協(xié)議連接阿里云物聯(lián)網(wǎng)平臺(tái)相關(guān)資料下載
NodeMCU是怎樣通過(guò)MQTT連接阿里云物聯(lián)網(wǎng)的
阿里云開(kāi)放物聯(lián)網(wǎng)技術(shù)開(kāi)發(fā)平臺(tái)
阿里物聯(lián)網(wǎng)平臺(tái)產(chǎn)品和設(shè)備的創(chuàng)建過(guò)程
微信小程序使用MQTT遠(yuǎn)程控制單片機(jī)——阿里云物聯(lián)網(wǎng)平臺(tái)
基于阿里云MQTT物聯(lián)網(wǎng)平臺(tái)視頻監(jiān)控(上)
【教程】必看!手把手教你學(xué)會(huì)MQTT工作模式下阿里云物聯(lián)網(wǎng)平臺(tái)的配置
基于阿里云MQTT物聯(lián)網(wǎng)平臺(tái)視頻監(jiān)控(下)
評(píng)論