########## AMQP连接参数(RabbitMQ) ########## amqp : host : "localhost" # RabbitMQ服务器主机 port : 5672 # RabbitMQ服务端口号 vhost : "/" # 虚拟主机 login : "ZhangSan" # 账号 password : "*************" # 密码 exchange : "demo.exchange" # 交换机名称 routing : "demo.routing" # 路由键 queue : "demo.queue" # 队列名称
package main import ( "encoding/json" "fmt" "github.com/streadway/amqp" "gopkg.in/yaml.v3" "math/rand" "os" "path/filepath" "time" ) // GetDateTime // @description 获取日期时间字符串 // @return string 日期时间字符串 func GetDateTime() string { location, _ := time.LoadLocation("Asia/Shanghai") return time.Unix(time.Now().In(location).Unix(), 0).In(location).Format(time.DateTime) } // RandSlice // @description 随机获取切片里的一个元素 // @param slice []any 切片 // @return any 切片元素 func RandSlice(slice []any) any { return slice[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(slice))] } // GetAMQPConf // @description 获取AMQP配置 // @return map[string]string AMQP配置 func GetAMQPConf() map[string]string { // 从配置yaml文件获取RabbitMQ的连接参数 path, _ := filepath.Abs(".") file := path + "/amqp.yaml" // 读取yaml配置文件 contents, _ := os.ReadFile(file) // 把读取的yaml配置文件内容放入map var conf map[string]map[string]string _ = yaml.Unmarshal(contents, &conf) return conf["amqp"] } // GetConnectionAndChannel // @description 获取RabbitMQ的连接对象和信道对象(生产端和消费端都需要用到这两个对象,故写成公共函数) // @return connection *amqp.Connection 连接对象 // @return channel *amqp.Channel 信道对象 // @return err error 错误信息,若无错误则为nil func GetConnectionAndChannel() (connection *amqp.Connection, channel *amqp.Channel, err error) { conf := GetAMQPConf() url := fmt.Sprintf( "amqp://%s:%s@%s:%s/", conf["login"], conf["password"], conf["host"], conf["port"], ) // 连接RabbitMQ服务器 connection, err = amqp.Dial(url) if err != nil { return } // 重要提醒:这里不能关闭连接,因为Connection对象是要返回给调用方使用的,关闭连接由调用方去执行。 // 创建信道 channel, err = connection.Channel() if err != nil { return } // 重要提醒:这里不能关闭信道,因为Channel对象是要返回给调用方使用的,关闭信道由调用方去执行。 return } // Producer // @description 生产消息 // @param message map[string]any 消息数据 // @return err error 是否发布成功 func Producer(message map[string]any) (err error) { var connection *amqp.Connection var channel *amqp.Channel connection, channel, err = GetConnectionAndChannel() if err != nil { return } // 关闭和RabbitMQ服务器的连接 defer func(connection *amqp.Connection) { _ = connection.Close() }(connection) // 关闭信道 defer func(channel *amqp.Channel) { _ = channel.Close() }(channel) conf := GetAMQPConf() // 声明交换机 err = channel.ExchangeDeclare(conf["exchange"], amqp.ExchangeDirect, true, false, false, false, nil) if err != nil { return } defer func(channel *amqp.Channel) { _ = channel.Close() }(channel) // 声明队列 _, err = channel.QueueDeclare(conf["queue"], true, false, false, false, nil) if err != nil { return } // 交换机绑定队列 err = channel.QueueBind(conf["queue"], conf["routing"], conf["exchange"], false, nil) if err != nil { return } // 发布消息 body, _ := json.Marshal(message) msg := amqp.Publishing{Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: body} err = channel.Publish(conf["exchange"], conf["routing"], false, false, msg) return } // Consumer // @description 消息消费 // @return error 是否启动成功 func Consumer() error { errChan := make(chan error) go func() { var connection *amqp.Connection var channel *amqp.Channel var err error connection, channel, err = GetConnectionAndChannel() if err != nil { errChan <- err return } // 关闭和RabbitMQ服务器的连接 defer func(connection *amqp.Connection) { _ = connection.Close() }(connection) // 关闭信道 defer func(channel *amqp.Channel) { _ = channel.Close() }(channel) conf := GetAMQPConf() queue, err := channel.QueueDeclare(conf["queue"], true, false, false, false, nil) if err != nil { errChan <- err return } // 创建消费者,强烈建议关闭自动应答(参数autoAck设为false即可关闭自动应答),采用手动应答的方式 deliveryChan, err := channel.Consume(queue.Name, "", false, false, false, false, nil) if err != nil { errChan <- err return } errChan <- nil for message := range deliveryChan { // 使用协程处理消息,防止读取消息被阻塞 go func(message amqp.Delivery) { // 获取消息体的数据 body := map[string]any{} _ = json.Unmarshal(message.Body, &body) id := body["id"] name := body["name"] // 处理消息(这里休眠100毫秒模拟处理消息的耗时操作) time.Sleep(time.Millisecond * 100) err = message.Ack(false) // 由于上面设置了关闭自动应答,所以这里必须手动应答 if err != nil { fmt.Printf("[%s] 应答消息(%s)失败:%s \n", GetDateTime(), id, err.Error()) } else { fmt.Printf("[%s] 应答消息(%s)成功:name = %s \n", GetDateTime(), id, name) } }(message) } }() return <-errChan // 阻塞函数执行完,直到消费端初始化完成(无论成功还是失败) } func main() { err := Consumer() if err != nil { panic("启动消费端失败:" + err.Error()) } fmt.Printf("[%s] ========== 启动消费端成功 ========== \n", GetDateTime()) var name []any name = append(name, "平天大圣·牛魔王·Go") name = append(name, "覆海大圣·蛟魔王·Go") name = append(name, "混天大圣·鹏魔王·Go") name = append(name, "移山大圣·狮驼王·Go") name = append(name, "通风大圣·猕猴王·Go") name = append(name, "驱神大圣·禺狨王·Go") name = append(name, "齐天大圣·美猴王·Go") id := 0 message := map[string]any{} // 使用Ticker定时向RabbitMQ发消息 ticker := time.NewTicker(time.Second * 1) for range ticker.C { id++ message["id"] = fmt.Sprintf("G%05d", id) // 消息ID message["name"] = RandSlice(name) err = Producer(message) if err != nil { // fmt.Printf("[%s] 发布消息(%s)失败:%s \n", GetDateTime(), message["id"], err.Error()) } else { // fmt.Printf("[%s] 发布消息(%s)成功:name = %s \n", GetDateTime(), message["id"], message["name"]) } } }
<?php // 重要提醒:当前PHP脚本文件需在命令行运行。 // 从配置yaml文件获取RabbitMQ的连接参数 $file = __DIR__ . '/amqp.yaml'; $conf = yaml_parse_file($file)['amqp']; // RabbitMQ服务器连接参数 $credentials = [ 'host' => $conf['host'], 'port' => $conf['port'], 'vhost' => $conf['vhost'], 'login' => $conf['login'], 'password' => $conf['password'], ]; $connection = new AMQPConnection($credentials); // 连接RabbitMQ服务器 try { if ($connection->connect() !== true) { exit('连接RabbitMQ服务器失败'); } } catch (AMQPConnectionException $e) { exit('连接RabbitMQ服务器出现异常:' . $e->getMessage()); } // 建立网络信道 try { $channel = new AMQPChannel($connection); } catch (AMQPConnectionException $e) { exit('建立网络信道出现异常:' . $e->getMessage()); } // 创建交换机 try { $exchange = new AMQPExchange($channel); // 在指定网络信道创建交换机 } catch (Exception $e) { exit('创建交换机出现异常:' . $e->getMessage()); } $exchange->setName($conf['exchange']); // 设置交换机名称 $exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型 $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化 try { $exchange->declareExchange(); } catch (Exception $e) { exit('声明交换机出现异常:' . $e->getMessage()); } $name = [ '平天大圣·牛魔王·PHP', '覆海大圣·蛟魔王·PHP', '混天大圣·鹏魔王·PHP', '移山大圣·狮驼王·PHP', '通风大圣·猕猴王·PHP', '驱神大圣·禺狨王·PHP', '齐天大圣·美猴王·PHP', ]; for ($i = 1; $i <= 999999; $i++) { $id = sprintf('P%05d', $i); $nm = $name[(int)array_rand($name)]; $message = json_encode(['id' => $id, 'name' => $nm]); // 发布消息 try { $publish = $exchange->publish($message, $conf['routing']); } catch (Exception $e) { $publish = false; unset($e); } $datetime = date('Y-m-d H:i:s'); if ($publish === true) { echo "[{$datetime}] 发布消息({$id})成功:name = {$nm}" . PHP_EOL; } else { echo "[{$datetime}] 发布消息({$id})失败" . PHP_EOL; } sleep(1); }
Copyright © 2024 码农人生. All Rights Reserved