# Install wxpy and wechat_sender from the Douban PyPI mirror.
# These are two separate commands; when fused on one line, pip treats the
# words "pip", "install" and "wechat_sender" as extra requirements of the
# first command.
pip install wxpy -i "https://pypi.doubanio.com/simple/"
pip install wechat_sender -i "https://pypi.doubanio.com/simple/"
from wxpy import *
# Create the wxpy bot, send a greeting to the bot's `file_helper` chat, then
# print a marker once the send call has returned.
bot = Bot()
bot.file_helper.send('hello world!')
print("ending")
from wxpy import *
bot = Bot()

# Fetch all friends and print each of them.
friends = bot.friends()
for friend in friends:
    print(friend)

# Look up one friend by name and message them.
# `bot.friends` is a method, so it must be called before `.search()` can be
# used; the collection fetched above is reused here, the same way `groups`
# is reused below.
friend = friends.search('被单')[0]
print(friend)
friend.send("hello world!")

# Fetch all group chats and print each of them.
groups = bot.groups()
for group in groups:
    print(group)

# Find the target group and send it a message.
group = groups.search("409")[0]
group.send("hello world!")
from wxpy import *
bot = Bot()

# Pick the friend that receives the demo messages below.
my_friend = bot.friends().search('被单')[0]

# Search the bot's messages for the keyword, restricted to those whose sender
# is the bot's own account, and print every match.
messages = bot.messages.search(keywords='测试', sender=bot.self)
for message in messages:
    print(message)

# Text message.
my_friend.send('Hello, WeChat!')

# Image.
my_friend.send_image('my_picture.png')

# Video.
my_friend.send_video('my_video.mov')

# File.
my_friend.send_file('my_file.zip')

# Image again, this time through the generic send() with the '@img@' prefix.
my_friend.send('@img@my_picture.png')

# Contact card (official account), sent as a raw message.
my_friend.send_raw_msg(
    # 42 is the raw message type of a contact card.
    raw_type=42,
    # NOTE: `username` must be the WeChat ID here, and the card being sent
    # must belong to one of your own friends.
    raw_content='<msg username="wxpy_bot" nickname="wxpy 机器人"/>',
)
# Message listener, registered on the bot without any filter arguments.
@bot.register()
def print_others(msg):
    """Print the message handed to the listener, then reply to it."""
    print(msg)
    msg.reply("hello world")


# embed() is provided by the wxpy star import.
# NOTE(review): presumably it blocks so the listener keeps running; confirm.
embed()
from wxpy import *
bot = Bot()

# The friend whose messages are handled by the listener below.
dear = bot.friends().search('被单')[0]

# Register for a personal Tuling robot key and fill it in here.
tuling = Tuling(api_key='******')


# Use the Tuling robot to chat automatically with the chosen friend.
@bot.register(dear)
def reply_my_friend(msg):
    """Print the incoming message and let the Tuling client reply to it."""
    print(msg)
    tuling.do_reply(msg)


embed()
# login.py
from wxpy import *
from wechat_sender import *
# Create the bot, look up the receiving friend, and start the wechat_sender
# listener with token 'test' and that friend as the only receiver.
bot = Bot()
friend = bot.friends().search('被单')[0]
listen(bot, token='test', receivers=[friend])
# sender.py coding: utf-8
from wechat_sender import Sender
# Create a Sender with token 'test' (the same token login.py passes to
# listen()) and push a single message through it.
sender = Sender(token='test')
sender.send('hello world!')
from wechat_sender import *
from wxpy import *
# Create the bot, passing "qr.png" as the QR-code path.
bot = Bot(qr_path="qr.png")
# Use the first group chat matching '监控报警' as the alert receiver.
group = bot.groups().search('监控报警')[0]
print("微信登陆成功!进行监控报警功能!")
print(group)
# Start the wechat_sender listener with token 'test' and that group as receiver.
listen(bot, token='test', receivers=[group])
import redis
from wechat_sender import *
sender = Sender(token='test', receivers='监控报警')

# Skeleton of the forwarding loop.
# NOTE(review): `data` and `p` are not defined anywhere in this fragment; they
# presumably come from the Redis pub/sub setup that the "do anything"
# placeholders stand for. Define them before running this code.
while True:  # Python's boolean literal is `True`; `true` raises NameError
    # do anything
    sender.send(message=data)
    # do anything
p.unsubscribe('cardniu-monitor')
print('取消订阅')
import redis
# Connect to Redis (host 'ip' and the password are placeholders), store a
# string under the key 'name', read it back and print it.
r = redis.Redis(host='ip', port=6379, db=15, password='****')
r.set('name', 'Jaycekon')
value = r.get('name')
print(value)
from threading import Thread
# The snippet uses these three modules but never imported them.
import queue
import random
import time

# Bounded queue shared by all producer and consumer threads (capacity 10).
queues = queue.Queue(10)


class Producer(Thread):
    """Thread that keeps putting random integers onto the shared queue."""

    def run(self):
        """Loop forever: enqueue one item, report it, then sleep briefly."""
        while True:
            elem = random.randrange(9)
            # put() blocks while the queue already holds 10 items.
            queues.put(elem)
            print("厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完".format(self.name, elem, queues.qsize()))
            time.sleep(random.random())


class Consumer(Thread):
    """Thread that keeps taking items off the shared queue."""

    def run(self):
        """Loop forever: dequeue one item, report it, then sleep briefly."""
        while True:
            # get() blocks while the queue is empty.
            elem = queues.get()
            print("吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃".format(self.name, elem, queues.qsize()))
            time.sleep(random.random())


def main():
    """Start three producer threads and two consumer threads."""
    for _ in range(3):
        p = Producer()
        p.start()
    for _ in range(2):
        c = Consumer()
        c.start()


if __name__ == '__main__':
    main()
import redis
pool = redis.ConnectionPool(host='ip', port=6379, db=4, password='****')
r = redis.StrictRedis(connection_pool=pool)

# Read lines from the console and publish each one on channel 'spub'.
# The line 'over' is published like any other and then ends the loop.
while True:
    inputs = input("publish:")
    r.publish('spub', inputs)
    if inputs != 'over':
        continue
    print('停止发布')
    break
import redis
pool = redis.ConnectionPool(host='ip', port=6379, db=4, password='***')
r = redis.StrictRedis(connection_pool=pool)

# Subscribe to the channel and print everything that arrives on it.
p = r.pubsub()
p.subscribe('cardniu-monitor')
for item in p.listen():
    print(item)
    if item['type'] == 'message':
        data = item['data']
        print("消息队列中接收到信息:", data)
        # The payload is `bytes` unless the client was created with
        # decode_responses=True, so accept both spellings of the stop word;
        # comparing against the str alone would never end the loop.
        if data in ('over', b'over'):
            break
p.unsubscribe('cardniu-monitor')
print('取消订阅')
from wechat_sender import *
from wxpy import *
# Create the bot, passing "qr.png" as the QR-code path.
bot = Bot(qr_path="qr.png")
# Use the first group chat matching '监控报警' as the alert receiver.
group = bot.groups().search('监控报警')[0]
print("微信登陆成功!进行监控报警功能!")
print(group)
# Start the wechat_sender listener with token 'test' and that group as receiver.
listen(bot, token='test', receivers=[group])
import redis
from wechat_sender import *
import os

sender = Sender(token='test', receivers='监控报警')

# SECURITY: a real host and password were hard-coded here. They remain only as
# fallbacks so existing deployments keep working; set REDIS_HOST and
# REDIS_PASSWORD instead, and rotate the password that was committed.
pool = redis.ConnectionPool(
    host=os.environ.get('REDIS_HOST', '10.201.3.18'),
    port=6379,
    db=4,
    password=os.environ.get('REDIS_PASSWORD', 'kntest%pw_@dk2'),
)
r = redis.StrictRedis(connection_pool=pool)

# Forward every message published on 'cardniu-monitor' through the sender.
p = r.pubsub()
p.subscribe('cardniu-monitor')
for item in p.listen():
    print(item)
    if item['type'] == 'message':
        data = item['data']
        print("消息队列中接收到信息:", data)
        sender.send(message=data)
        # The payload is `bytes` unless the client was created with
        # decode_responses=True, so accept both spellings of the stop word.
        if data in ('over', b'over'):
            break
p.unsubscribe('cardniu-monitor')
print('取消订阅')
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.1.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
<!-- Redis connection pool configuration. Every value is a ${redis.pool.*}
     placeholder, so the actual settings are supplied from outside this file. -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxTotal" value="${redis.pool.maxTotal}" />
    <property name="maxIdle" value="${redis.pool.maxIdle}" />
    <property name="minIdle" value="${redis.pool.minIdle}" />
    <property name="maxWaitMillis" value="${redis.pool.maxWaitMillis}" />
    <property name="testOnBorrow" value="${redis.pool.testOnBorrow}" />
    <property name="testOnReturn" value="${redis.pool.testOnReturn}" />
</bean>
<!-- Sentinel-backed Jedis pool, built through its six-argument constructor. -->
<bean id="sentinelJedisPool" class="redis.clients.jedis.JedisSentinelPool">
    <!-- 0: master name -->
    <constructor-arg index="0" value="${redis.sentinel.masterName}" />
    <!-- 1: the ${redis.sentinels} value, split on commas by the SpEL expression -->
    <constructor-arg index="1"
        value="#{'${redis.sentinels}'.split(',')}" />
    <!-- 2: the jedisPoolConfig bean -->
    <constructor-arg index="2" ref="jedisPoolConfig" />
    <!-- 3: timeout, passed as an int -->
    <constructor-arg index="3" value="${redis.sentinel.timeout}"
        type="int" />
    <!-- 4: password -->
    <constructor-arg index="4" value="${redis.sentinel.password}" />
    <!-- 5: database, passed as an int -->
    <constructor-arg index="5" value="${redis.sentinel.database}"
        type="int" />
</bean>
</beans>
// Sentinel-backed Jedis pool, injected by Spring.
@Autowired
private JedisSentinelPool jedisPool;

/**
 * Passes the injected pool to JedisUtils after construction.
 *
 * @throws Exception whatever JedisUtils.setJedisPool throws
 */
@PostConstruct
private void init() throws Exception {
    /* Cache initialisation */
    JedisUtils.setJedisPool(jedisPool);
}
/**
 * Stores the given pool for later use and checks it by borrowing one
 * connection from it.
 *
 * <p>On success {@code isInitSuc} is set to {@code true}. Any failure is
 * logged; only a {@code JedisConnectionException} is rethrown, every other
 * exception is swallowed after logging.
 *
 * @param jedisPool the pool to store
 * @throws Exception the connection exception raised while borrowing
 */
public static void setJedisPool(Pool<Jedis> jedisPool) throws Exception {
    JedisCache.jedisPool = jedisPool;
    Jedis jedis = null;
    try {
        jedis = jedisPool.getResource();
        isInitSuc = true;
        logger.info("redis start success!");
    } catch (Exception e) {
        if (null != jedis) {
            jedisPool.returnBrokenResource(jedis);
            // Clear the reference so the finally block does not hand the same
            // connection back to the pool a second time (publish() does the same).
            jedis = null;
        }
        logger.error("redis start exception!!!error:{}", e.getMessage(), e);
        if (e instanceof redis.clients.jedis.exceptions.JedisConnectionException) {
            throw e;
        }
    } finally {
        if (null != jedis) {
            jedisPool.returnResource(jedis);
        }
    }
}
/**
 * Publishes {@code value} on the given channel using a pooled connection.
 *
 * @param chanel the channel to publish on
 * @param value  the message payload
 * @return the value returned by {@code Jedis.publish}, or {@code 0L} when any
 *         exception occurred (the exception is logged, not rethrown)
 */
public static Long publish(String chanel, String value) {
    Jedis connection = null;
    Long outcome;
    try {
        connection = jedisPool.getResource();
        outcome = connection.publish(chanel, value);
    } catch (Exception e) {
        if (connection != null) {
            // Hand the connection back as broken and drop the reference so the
            // finally block does not return it again.
            jedisPool.returnBrokenResource(connection);
            connection = null;
        }
        logger.error("redis exception:{}", e.getMessage(), e);
        outcome = 0L;
    } finally {
        if (connection != null) {
            jedisPool.returnResource(connection);
        }
    }
    return outcome;
}
/**
 * Scheduled job that drains the notice queue and publishes each calculated
 * notice on the "cardniu-monitor" channel as "title,postTime,linkAddress".
 *
 * <p>{@code take()} blocks until a batch is available, so once started the
 * loop keeps running until the thread is interrupted.
 */
@Scheduled(cron = "*/5 * * * * ? ")
public void runMonitor() {
    try {
        List<T> notices;
        List<EbankNotice> result;
        while ((notices = QueueHolder.noticeBlockingQueue.take()) != null) { // consume
            if (notices.isEmpty()) {
                continue;
            }
            result = service.calculateNotice(notices);
            result.forEach(notice -> {
                // "HH" is the 24-hour field; the previous "hh" is the 12-hour
                // field and, with no AM/PM marker, made morning and afternoon
                // times indistinguishable.
                // NOTE(review): assumes DateUtils.format takes SimpleDateFormat-style patterns - confirm.
                JedisUtils.publish("cardniu-monitor", notice.getTitle() + "," +
                        DateUtils.format(notice.getPostTime(), "yyyy-MM-dd HH:mm:ss") + "," + notice.getLinkAddress());
            });
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the executor running this job can see
        // that the thread was asked to stop.
        Thread.currentThread().interrupt();
        logger.error("发送邮件定时任务异常,{}", e.getMessage(), e);
    }
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...
苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有