• 需要用到jq命令,jq是一个json解析工具:jq官网
  • 注意应用权限对应的部门或者用户是否正确,不然会发送失败,可以选择只按部门或用户发送
  • token需要缓存的话,需redis支持,然后去掉redis_token()和redis_token()代码的注释,把第20行token变量代码加上注释[/c-alert]

安装jq命令

  • centos
yum install jq -y
  • ubuntu
apt-get install jq -y

脚本如下:

#!/bin/bash
# 发送消息到企业微信应用里面
# 企业id
    id="xxxxxxxxxxxxxxxxxx"
# 应用id
    agentid="xxxxxxx"
# 应用secret
    secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# 部门id
    toparty=x
# 用户id [也就是用户账号,多个用户用|符号分开]
    touser=""

# API接口
    url="https://qyapi.weixin.qq.com/cgi-bin"

# 获取token [这里token没做缓存,如果频繁调用gettoken接口,会受到频率拦截,官方默认token值有效时间2小时]
    token=`curl -s "$url/gettoken?corpid=$id&corpsecret=$secret"|jq -r .access_token`

# 获取token写入Redis缓存
#function get_token(){
#    token=`curl -s "$url/gettoken?corpid=$id&corpsecret=$secret"|jq -r .access_token`
#    redis-cli set token $token >/dev/null 2>&1
#    redis-cli expire token 3000 >/dev/null 2>&1
#}

# 从Redis缓存读取token
#function redis_token(){
#    ttl_token=`redis-cli ttl token|awk '{print $1}'`
#    if [ "$ttl_token" -le "-2" ]
#    then
#        get_token
#    else
#        token=`redis-cli get token`
#        test_token=`curl -s "$url/agent/get?access_token=$token&agentid=$agentid"|jq -r .errcode`
#        if [ "$test_token" -eq "0" ]
#        then
#            sleep 0.1
#        elif [ "$test_token" -ne "0" ]
#            get_token
#        fi
#    fi
#}
#redis_token

# 发送消息参数
    part="message/send?access_token=$token"

# 上传临时素材参数
    upload="media/upload?access_token=$token&type=$1"

# 执行提示
function tips(){
    code=`jq -r .errcode`
    if [ "$code" == "0" ]
    then 
        echo -e "\033[32mSend successfully\033[0m"
    else
        echo -e "\033[31mSend fail,errcode:$code\033[0m"
    fi
}

# 判断输入文件路径
function file_dir(){
    while true
    do
        read -p "请输入[ $1 ]的路径:" f
        [[ -f $f ]] && break || echo -e "\033[31m请输入正确的 $1 路径!\033[0m"
    done
    upload_file_dir="$f"
  
}

# 上传临时素材
function upload_file() {
    # 发送的文件路径
    file_dir $1
  
    # post 参数
    upload_post="-H 'Content-Type:multipart/form-data' -F 'filename=@$upload_file_dir;type=application/octet-stream' $url/$upload$1"
  
    # 获取临时素材id
    get_upload_part=`curl -s -X POST $upload_post`
    upload_status=`echo "$get_upload_part"|jq -r .errcode`
    if [[ $upload_status == 0 ]];then media_id=`echo "$get_upload_part"|jq -r .media_id`;else echo -e "\033[31mFailed to upload file [$f]\033[0m";exit;fi
}

# 发送文本消息 [这里按部门发送toparty]
function send_t(){
    read -p "输入文本内容:" t
    textmsg='{"touser":"'$touser'","toparty":"'$toparty'","msgtype":"'text'","agentid":"'$agentid'","'text'":{"content":"'$t'"},"safe":0}'
    curl -s -X POST -d "$textmsg"  "$url/$part"|tips
}

# 发送文件图片消息 [这里按部门发送toparty]
function send(){
    msg='{"touser":"'$touser'","toparty":"'$toparty'","msgtype":"'$1'","agentid":"'$agentid'","'$1'":{"media_id":"'$media_id'"},"safe":0}'
    curl -s -X POST -d "$msg" "$url/$part"|tips
}

# 执行脚本
echo "
1、文本消息
2、文件消息
3、图片消息
"
read -p "选择发送消息类型: " x
case $x in
1)
    send_t
;;
2)
    upload_file file && send file
;;
3)
    upload_file image && send image
;;
*)
    echo "输入错误,执行结束"
;;
esac

Python版本:

import urllib.request
import json
import requests
import logging
import os
import sys
import getopt

class Qywx():
 '''
 #-----------发送企业微信的消息格式------------
 #图片(image):1MB,支持JPG,PNG格式
 #语音(voice):2MB,播放长度不超过60s,支持AMR格式
 #视频(video):10MB,支持MP4格式
 #普通文件(file):20MB
 #--------------------------------
 '''
 def __init__(self,corpid='yourid',corpsecret='yoursecret',is_log=True,log_path="qywx.log",log_level=0):
  """初始化,需要传入企业ID和密钥,在企业微信的页面上有显示"""
  url="https://qyapi.weixin.qq.com"
  self.corpid=corpid
  self.corpsecret=corpsecret
  self.is_log=is_log
  self.log_path=log_path
  self.log_level=log_level
  if is_log==True:
     LOG_FORMAT = "%(asctime)s - %(levelname)s: %(message)s"
     DATE_FORMAT = "%m/%d/%Y %H:%M:%S %p"
     if log_level==0:
        logging.basicConfig(level=logging.INFO,filename=log_path,format=LOG_FORMAT, datefmt=DATE_FORMAT)
     else:
        logging.basicConfig(level=logging.ERROR,filename=log_path,format=LOG_FORMAT, datefmt=DATE_FORMAT)

 def send_message(self,msg,msgtype,agid):
  upload_token=self.get_upload_token(self.corpid,self.corpsecret)
  if msgtype=="text":
     data=self.msg_messages(msg,agid,msgtype='text',msgid="content")
  elif msgtype=="image":
     media_id=self.get_media_ID(msg,upload_token,msgtype="image")
     data=self.msg_messages(media_id,agid,msgtype='image',msgid="media_id")
  elif msgtype=="voice":
     media_id=self.get_media_ID(msg,upload_token,msgtype="voice")
     data=self.msg_messages(media_id,agid,msgtype='voice',msgid="media_id")
  elif msgtype=="video":
     media_id=self.get_media_ID(msg,upload_token,msgtype="video")
     data=self.msg_messages(media_id,agid,msgtype='video',msgid="media_id")
  elif msgtype=="file":
     media_id=self.get_media_ID(msg,upload_token,msgtype="file")
     data=self.msg_messages(media_id,agid,msgtype='file',msgid="media_id")
  else:
     raise Exception("msgtype参数错误,参数只能是image或text或voice或video或file")
  url="https://qyapi.weixin.qq.com"
  token=self.get_token(url,self.corpid,self.corpsecret)
  send_url = '%s/cgi-bin/message/send?access_token=%s' % (url,token)
  respone=urllib.request.urlopen(urllib.request.Request(url=send_url, data=data)).read()
  x = json.loads(respone.decode())['errcode']
  if x == 0:
     print ("{} 发送成功".format(msg))
     if self.is_log==True:
        logging.info("{} 发送成功".format(msg))
  else:
     print ("{} 发送失败".format(msg))
     if self.is_log==True:
        logging.info("{} 发送失败".format(msg))

 def send_msg_message(self,msg,agid=1000002):
  try:
    self.send_message(msg,'text',agid)
  except Exception as e:
    if self.is_log==True:
       logging.error(e)
 def send_image_message(self,path,agid=1000002):
  if path.endswith("jpg")==False and path.endswith("png")==False:
    raise Exception("图片只能为jpg或png格式")
  if os.path.getsize(path)>1048576:
    raise Exception("图片大小不能超过1MB")
  try:
    self.send_message(path,'image',agid)
  except Exception as e:
    if self.is_log==True:
       logging.error(e)
 def send_voice_message(self,path,agid=1000002):
  if path.endswith("amr")==False:
    raise Exception("语音文件只能为amr格式,并且不能大于2MB,不能超过60s")
  if os.path.getsize(path)>2097152:
    raise Exception("语音文件大小不能超过2MB,并且不能超过60s,只能为amr格式")
  try:
    self.send_message(path,'voice',agid)
  except Exception as e:
    if self.is_log==True:
       logging.error(e)
 def send_video_message(self,path,agid=1000002):
  if path.endswith("mp4")==False:
    raise Exception("视频文件只能为mp4格式,并且不能大于10MB")
  if os.path.getsize(path)>10485760:
    raise Exception("视频文件大小不能超过10MB,只能为mp4格式")
  try:
    self.send_message(path,'video',agid)
  except Exception as e:
    if self.is_log==True:
       logging.error(e)
 def send_file_message(self,path,agid=1000002):
  if os.path.getsize(path)>20971520:
    raise Exception("文件大小不能超过20MB")
  try:
    self.send_message(path,'file',agid)
  except Exception as e:
    if self.is_log==True:
       logging.error(e)

 def get_token(self,url, corpid, corpsecret):
  token_url = '%s/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (url, corpid, corpsecret)
  token = json.loads(urllib.request.urlopen(token_url).read().decode())['access_token']
  return token
 def get_upload_token(self,corid,corsec):
  gurl="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}".format(corid,corsec)
  r=requests.get(gurl)
  dict_result=(r.json())
  return dict_result['access_token']

 def get_media_ID(self,path,token,msgtype="image"):
  """上传资源到企业微信的存储上,msgtype有image,voice,video,file"""
  media_url = "https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={}&type={}".format(token,msgtype)
  with open(path, 'rb') as f:
    files = {msgtype: f}
    r = requests.post(media_url, files=files)
    re = json.loads(r.text)
    return re['media_id']

 def msg_messages(self,msg,agid,msgtype='text',msgid="content"):
  """
  msgtype有text,image,voice,video,file;如果msgytpe为text,msgid为content,如果是其他,msgid为media_id。
  msg为消息的实际内容,如果是文本消息,则为字符串,如果是其他类型,则传递media_id的值。

  """
  values = {
        "touser": '@all',
        "msgtype": msgtype,
        "agentid": agid,
        msgtype: {msgid: msg},
        "safe": 0
        }
  msges=(bytes(json.dumps(values), 'utf-8'))
  return msges

def usage():
  str="""
  ********************调用格式如下**********************
  发送文本消息:
  python qywx.py -t text -m 你要发送的消息
  发送文本消息,指定要发送的应用的ID: 
  python qywx.py -t text -m 你要发送的消息 -a 1000005 
  发送图片消息: 
  python qywx.py -t image -m 图片的全路径 
  发送图片消息,指定要发送的应用的ID: 
  python qywx.py -t image -m 图片的全路径 -a 1000005 
  发送语音消息:
  python qywx.py -t voice -m 语音的路径 
  发送视频消息: 
  python qywx.py -t video -m 视频的路径 
  发送文件消息: 
  python qywx.py -t file -m 文件的路径 
  *********************注意事项***************************
  图片只能是png或jpg图片,大小不能超过1MB
  语音只能是amr格式,播放长度不能超过60s,大小不能超过2MB
  视频只能是mp4格式,大小不能超过10MB 
  普通文件大小不能超过20MB
  ******************************************************** 
  *********************例子*******************************
  python qywx.py -t image -m /root/test.png
  python qywx.py -t text -m 测试消息
  ********************************************************
  """
  print(str)
if __name__=="__main__":
   qywx=Qywx()  #Qywx(corpid='yourid',corpsecret='yoursecret',is_log=True,log_path='yourpath')
   try:
     options,args = getopt.getopt(sys.argv[1:],"ht:m:a:",["help","type=","message=","agentid="])
   except Except as e:
     print(e)
   a=1000002
   t=None
   m=None
   for name,value in options:
     if name in ("-h","--help"):
        usage()
     elif name in ("-t","--type"):
        t=value
     elif name in ("-m","--message"):
        m=value
     elif name in ("-a","--agentid"):
        a=value
     else:
        usage()
   if t and m:
     if t=="text":
        qywx.send_msg_message(m,agid=a)
     elif t=="image":
        qywx.send_image_message(m,agid=a)
     elif t=="voice":
        qywx.send_voice_message(m,agid=a)
     elif t=="video":
        qywx.send_video_message(m,agid=a)
     elif t=="file":
        qywx.send_file_message(m,agid=a)
     else:
        usage()
#-----------代码中直接调用---------------------
#   qywx=Qywx()  #Qywx(corpid='yourid',corpsecret='yoursecret',is_log=True,log_path='yourpath')
#   qywx.send_msg_message("test")
#   qywx.send_image_message("/root/test.png")
#   qywx.send_voice_message("/root/test.amr")
#   qywx.send_video_message("/root/test.mp4")
#   qywx.send_file_message("/root/test.mp3")
#   qywx.send_msg_message("test",agid=1000003)
#-----------代码中直接调用---------------------
最后修改:2022 年 04 月 05 日
如果觉得我的文章对你有用,请随意赞赏
END
本文作者:
文章标题:使用脚本发送消息到企业微信的应用
本文地址:https://blog.abcdl.cn/index.php/archives/8/
版权说明:若无注明,本文皆iStar Blog原创,转载请保留文章出处。