forked from pneymrl2f/nightingale
parent
9e881bc9a5
commit
a443710669
@ -0,0 +1,39 @@
|
||||
SHELL := /bin/bash
|
||||
|
||||
build: machine.go
|
||||
|
||||
images: docs/urn.png
|
||||
|
||||
machine.go: machine.go.rl
|
||||
ragel -Z -G2 -e -o $@ $<
|
||||
@sed -i '/^\/\/line/d' $@
|
||||
@$(MAKE) -s file=$@ snake2camel
|
||||
@gofmt -w -s $@
|
||||
|
||||
docs/urn.dot: machine.go.rl
|
||||
@mkdir -p docs
|
||||
ragel -Z -e -Vp $< -o $@
|
||||
|
||||
docs/urn.png: docs/urn.dot
|
||||
dot $< -Tpng -o $@
|
||||
|
||||
.PHONY: bench
|
||||
bench: *_test.go machine.go
|
||||
go test -bench=. -benchmem -benchtime=5s ./...
|
||||
|
||||
.PHONY: tests
|
||||
tests: *_test.go machine.go
|
||||
go test -race -timeout 10s -coverprofile=coverage.out -covermode=atomic -v ./...
|
||||
|
||||
.PHONY: clean
|
||||
clean:
|
||||
@rm -rf docs
|
||||
@rm -f machine.go
|
||||
|
||||
.PHONY: snake2camel
|
||||
snake2camel:
|
||||
@awk -i inplace '{ \
|
||||
while ( match($$0, /(.*)([a-z]+[0-9]*)_([a-zA-Z0-9])(.*)/, cap) ) \
|
||||
$$0 = cap[1] cap[2] toupper(cap[3]) cap[4]; \
|
||||
print \
|
||||
}' $(file)
|
@ -0,0 +1,12 @@
|
||||
certs/*
|
||||
spec/spec
|
||||
examples/simple-consumer/simple-consumer
|
||||
examples/simple-producer/simple-producer
|
||||
|
||||
.idea/**/workspace.xml
|
||||
.idea/**/tasks.xml
|
||||
.idea/**/usage.statistics.xml
|
||||
.idea/**/dictionaries
|
||||
.idea/**/shelf
|
||||
|
||||
.idea/**/contentModel.xml
|
@ -0,0 +1,25 @@
|
||||
language: go
|
||||
|
||||
go:
|
||||
- 1.10.x
|
||||
- 1.11.x
|
||||
- 1.12.x
|
||||
- 1.13.x
|
||||
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- rabbitmq-server
|
||||
|
||||
services:
|
||||
- rabbitmq
|
||||
|
||||
env:
|
||||
- GO111MODULE=on AMQP_URL=amqp://guest:guest@127.0.0.1:5672/
|
||||
|
||||
before_install:
|
||||
- go get -v golang.org/x/lint/golint
|
||||
|
||||
script:
|
||||
- ./pre-commit
|
||||
- go test -cpu=1,2 -v -tags integration ./...
|
@ -0,0 +1,35 @@
|
||||
## Prequisites
|
||||
|
||||
1. Go: [https://golang.org/dl/](https://golang.org/dl/)
|
||||
1. Golint `go get -u -v github.com/golang/lint/golint`
|
||||
|
||||
## Contributing
|
||||
|
||||
The workflow is pretty standard:
|
||||
|
||||
1. Fork github.com/streadway/amqp
|
||||
1. Add the pre-commit hook: `ln -s ../../pre-commit .git/hooks/pre-commit`
|
||||
1. Create your feature branch (`git checkout -b my-new-feature`)
|
||||
1. Run integration tests (see below)
|
||||
1. **Implement tests**
|
||||
1. Implement fixs
|
||||
1. Commit your changes (`git commit -am 'Add some feature'`)
|
||||
1. Push to a branch (`git push -u origin my-new-feature`)
|
||||
1. Submit a pull request
|
||||
|
||||
## Running Tests
|
||||
|
||||
The test suite assumes that:
|
||||
|
||||
* A RabbitMQ node is running on localhost with all defaults: [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html)
|
||||
* `AMQP_URL` is exported to `amqp://guest:guest@127.0.0.1:5672/`
|
||||
|
||||
### Integration Tests
|
||||
|
||||
After starting a local RabbitMQ, run integration tests with the following:
|
||||
|
||||
env AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -cpu 2 -tags integration -race
|
||||
|
||||
All integration tests should use the `integrationConnection(...)` test
|
||||
helpers defined in `integration_test.go` to setup the integration environment
|
||||
and logging.
|
@ -0,0 +1,23 @@
|
||||
Copyright (c) 2012-2019, Sean Treadway, SoundCloud Ltd.
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
Redistributions in binary form must reproduce the above copyright notice, this
|
||||
list of conditions and the following disclaimer in the documentation and/or
|
||||
other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
@ -0,0 +1,93 @@
|
||||
[![Build Status](https://api.travis-ci.org/streadway/amqp.svg)](http://travis-ci.org/streadway/amqp) [![GoDoc](https://godoc.org/github.com/streadway/amqp?status.svg)](http://godoc.org/github.com/streadway/amqp)
|
||||
|
||||
# Go RabbitMQ Client Library
|
||||
|
||||
This is an AMQP 0.9.1 client with RabbitMQ extensions in Go.
|
||||
|
||||
## Project Maturity
|
||||
|
||||
This project has been used in production systems for many years. It is reasonably mature
|
||||
and feature complete, and as of November 2016 has [a team of maintainers](https://github.com/streadway/amqp/issues/215).
|
||||
|
||||
Future API changes are unlikely but possible. They will be discussed on [Github
|
||||
issues](https://github.com/streadway/amqp/issues) along with any bugs or
|
||||
enhancements.
|
||||
|
||||
## Supported Go Versions
|
||||
|
||||
This library supports two most recent Go release series, currently 1.10 and 1.11.
|
||||
|
||||
|
||||
## Supported RabbitMQ Versions
|
||||
|
||||
This project supports RabbitMQ versions starting with `2.0` but primarily tested
|
||||
against reasonably recent `3.x` releases. Some features and behaviours may be
|
||||
server version-specific.
|
||||
|
||||
## Goals
|
||||
|
||||
Provide a functional interface that closely represents the AMQP 0.9.1 model
|
||||
targeted to RabbitMQ as a server. This includes the minimum necessary to
|
||||
interact the semantics of the protocol.
|
||||
|
||||
## Non-goals
|
||||
|
||||
Things not intended to be supported.
|
||||
|
||||
* Auto reconnect and re-synchronization of client and server topologies.
|
||||
* Reconnection would require understanding the error paths when the
|
||||
topology cannot be declared on reconnect. This would require a new set
|
||||
of types and code paths that are best suited at the call-site of this
|
||||
package. AMQP has a dynamic topology that needs all peers to agree. If
|
||||
this doesn't happen, the behavior is undefined. Instead of producing a
|
||||
possible interface with undefined behavior, this package is designed to
|
||||
be simple for the caller to implement the necessary connection-time
|
||||
topology declaration so that reconnection is trivial and encapsulated in
|
||||
the caller's application code.
|
||||
* AMQP Protocol negotiation for forward or backward compatibility.
|
||||
* 0.9.1 is stable and widely deployed. Versions 0.10 and 1.0 are divergent
|
||||
specifications that change the semantics and wire format of the protocol.
|
||||
We will accept patches for other protocol support but have no plans for
|
||||
implementation ourselves.
|
||||
* Anything other than PLAIN and EXTERNAL authentication mechanisms.
|
||||
* Keeping the mechanisms interface modular makes it possible to extend
|
||||
outside of this package. If other mechanisms prove to be popular, then
|
||||
we would accept patches to include them in this package.
|
||||
|
||||
## Usage
|
||||
|
||||
See the 'examples' subdirectory for simple producers and consumers executables.
|
||||
If you have a use-case in mind which isn't well-represented by the examples,
|
||||
please file an issue.
|
||||
|
||||
## Documentation
|
||||
|
||||
Use [Godoc documentation](http://godoc.org/github.com/streadway/amqp) for
|
||||
reference and usage.
|
||||
|
||||
[RabbitMQ tutorials in
|
||||
Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) are also
|
||||
available.
|
||||
|
||||
## Contributing
|
||||
|
||||
Pull requests are very much welcomed. Create your pull request on a non-master
|
||||
branch, make sure a test or example is included that covers your change and
|
||||
your commits represent coherent changes that include a reason for the change.
|
||||
|
||||
To run the integration tests, make sure you have RabbitMQ running on any host,
|
||||
export the environment variable `AMQP_URL=amqp://host/` and run `go test -tags
|
||||
integration`. TravisCI will also run the integration tests.
|
||||
|
||||
Thanks to the [community of contributors](https://github.com/streadway/amqp/graphs/contributors).
|
||||
|
||||
## External packages
|
||||
|
||||
* [Google App Engine Dialer support](https://github.com/soundtrackyourbrand/gaeamqp)
|
||||
* [RabbitMQ examples in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go)
|
||||
|
||||
## License
|
||||
|
||||
BSD 2 clause - see LICENSE for more details.
|
||||
|
||||
|
@ -0,0 +1,106 @@
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/big"
|
||||
)
|
||||
|
||||
const (
|
||||
free = 0
|
||||
allocated = 1
|
||||
)
|
||||
|
||||
// allocator maintains a bitset of allocated numbers.
|
||||
type allocator struct {
|
||||
pool *big.Int
|
||||
last int
|
||||
low int
|
||||
high int
|
||||
}
|
||||
|
||||
// NewAllocator reserves and frees integers out of a range between low and
|
||||
// high.
|
||||
//
|
||||
// O(N) worst case space used, where N is maximum allocated, divided by
|
||||
// sizeof(big.Word)
|
||||
func newAllocator(low, high int) *allocator {
|
||||
return &allocator{
|
||||
pool: big.NewInt(0),
|
||||
last: low,
|
||||
low: low,
|
||||
high: high,
|
||||
}
|
||||
}
|
||||
|
||||
// String returns a string describing the contents of the allocator like
|
||||
// "allocator[low..high] reserved..until"
|
||||
//
|
||||
// O(N) where N is high-low
|
||||
func (a allocator) String() string {
|
||||
b := &bytes.Buffer{}
|
||||
fmt.Fprintf(b, "allocator[%d..%d]", a.low, a.high)
|
||||
|
||||
for low := a.low; low <= a.high; low++ {
|
||||
high := low
|
||||
for a.reserved(high) && high <= a.high {
|
||||
high++
|
||||
}
|
||||
|
||||
if high > low+1 {
|
||||
fmt.Fprintf(b, " %d..%d", low, high-1)
|
||||
} else if high > low {
|
||||
fmt.Fprintf(b, " %d", high-1)
|
||||
}
|
||||
|
||||
low = high
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// Next reserves and returns the next available number out of the range between
|
||||
// low and high. If no number is available, false is returned.
|
||||
//
|
||||
// O(N) worst case runtime where N is allocated, but usually O(1) due to a
|
||||
// rolling index into the oldest allocation.
|
||||
func (a *allocator) next() (int, bool) {
|
||||
wrapped := a.last
|
||||
|
||||
// Find trailing bit
|
||||
for ; a.last <= a.high; a.last++ {
|
||||
if a.reserve(a.last) {
|
||||
return a.last, true
|
||||
}
|
||||
}
|
||||
|
||||
// Find preceding free'd pool
|
||||
a.last = a.low
|
||||
|
||||
for ; a.last < wrapped; a.last++ {
|
||||
if a.reserve(a.last) {
|
||||
return a.last, true
|
||||
}
|
||||
}
|
||||
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// reserve claims the bit if it is not already claimed, returning true if
|
||||
// successfully claimed.
|
||||
func (a *allocator) reserve(n int) bool {
|
||||
if a.reserved(n) {
|
||||
return false
|
||||
}
|
||||
a.pool.SetBit(a.pool, n-a.low, allocated)
|
||||
return true
|
||||
}
|
||||
|
||||
// reserved returns true if the integer has been allocated
|
||||
func (a *allocator) reserved(n int) bool {
|
||||
return a.pool.Bit(n-a.low) == allocated
|
||||
}
|
||||
|
||||
// release frees the use of the number for another allocation
|
||||
func (a *allocator) release(n int) {
|
||||
a.pool.SetBit(a.pool, n-a.low, free)
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
// Source code and contact info at http://github.com/streadway/amqp
|
||||
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Authentication interface provides a means for different SASL authentication
|
||||
// mechanisms to be used during connection tuning.
|
||||
type Authentication interface {
|
||||
Mechanism() string
|
||||
Response() string
|
||||
}
|
||||
|
||||
// PlainAuth is a similar to Basic Auth in HTTP.
|
||||
type PlainAuth struct {
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
// Mechanism returns "PLAIN"
|
||||
func (auth *PlainAuth) Mechanism() string {
|
||||
return "PLAIN"
|
||||
}
|
||||
|
||||
// Response returns the null character delimited encoding for the SASL PLAIN Mechanism.
|
||||
func (auth *PlainAuth) Response() string {
|
||||
return fmt.Sprintf("\000%s\000%s", auth.Username, auth.Password)
|
||||
}
|
||||
|
||||
// AMQPlainAuth is similar to PlainAuth
|
||||
type AMQPlainAuth struct {
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
// Mechanism returns "AMQPLAIN"
|
||||
func (auth *AMQPlainAuth) Mechanism() string {
|
||||
return "AMQPLAIN"
|
||||
}
|
||||
|
||||
// Response returns the null character delimited encoding for the SASL PLAIN Mechanism.
|
||||
func (auth *AMQPlainAuth) Response() string {
|
||||
return fmt.Sprintf("LOGIN:%sPASSWORD:%s", auth.Username, auth.Password)
|
||||
}
|
||||
|
||||
// Finds the first mechanism preferred by the client that the server supports.
|
||||
func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) {
|
||||
for _, auth = range client {
|
||||
for _, mech := range serverMechanisms {
|
||||
if auth.Mechanism() == mech {
|
||||
return auth, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
@ -0,0 +1,159 @@
|
||||
#!/bin/sh
|
||||
#
|
||||
# Creates the CA, server and client certs to be used by tls_test.go
|
||||
# http://www.rabbitmq.com/ssl.html
|
||||
#
|
||||
# Copy stdout into the const section of tls_test.go or use for RabbitMQ
|
||||
#
|
||||
root=$PWD/certs
|
||||
|
||||
if [ -f $root/ca/serial ]; then
|
||||
echo >&2 "Previous installation found"
|
||||
echo >&2 "Remove $root/ca and rerun to overwrite"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
mkdir -p $root/ca/private
|
||||
mkdir -p $root/ca/certs
|
||||
mkdir -p $root/server
|
||||
mkdir -p $root/client
|
||||
|
||||
cd $root/ca
|
||||
|
||||
chmod 700 private
|
||||
touch index.txt
|
||||
echo 'unique_subject = no' > index.txt.attr
|
||||
echo '01' > serial
|
||||
echo >openssl.cnf '
|
||||
[ ca ]
|
||||
default_ca = testca
|
||||
|
||||
[ testca ]
|
||||
dir = .
|
||||
certificate = $dir/cacert.pem
|
||||
database = $dir/index.txt
|
||||
new_certs_dir = $dir/certs
|
||||
private_key = $dir/private/cakey.pem
|
||||
serial = $dir/serial
|
||||
|
||||
default_crl_days = 7
|
||||
default_days = 3650
|
||||
default_md = sha1
|
||||
|
||||
policy = testca_policy
|
||||
x509_extensions = certificate_extensions
|
||||
|
||||
[ testca_policy ]
|
||||
commonName = supplied
|
||||
stateOrProvinceName = optional
|
||||
countryName = optional
|
||||
emailAddress = optional
|
||||
organizationName = optional
|
||||
organizationalUnitName = optional
|
||||
|
||||
[ certificate_extensions ]
|
||||
basicConstraints = CA:false
|
||||
|
||||
[ req ]
|
||||
default_bits = 2048
|
||||
default_keyfile = ./private/cakey.pem
|
||||
default_md = sha1
|
||||
prompt = yes
|
||||
distinguished_name = root_ca_distinguished_name
|
||||
x509_extensions = root_ca_extensions
|
||||
|
||||
[ root_ca_distinguished_name ]
|
||||
commonName = hostname
|
||||
|
||||
[ root_ca_extensions ]
|
||||
basicConstraints = CA:true
|
||||
keyUsage = keyCertSign, cRLSign
|
||||
|
||||
[ client_ca_extensions ]
|
||||
basicConstraints = CA:false
|
||||
keyUsage = digitalSignature
|
||||
extendedKeyUsage = 1.3.6.1.5.5.7.3.2
|
||||
|
||||
[ server_ca_extensions ]
|
||||
basicConstraints = CA:false
|
||||
keyUsage = keyEncipherment
|
||||
extendedKeyUsage = 1.3.6.1.5.5.7.3.1
|
||||
subjectAltName = @alt_names
|
||||
|
||||
[ alt_names ]
|
||||
IP.1 = 127.0.0.1
|
||||
'
|
||||
|
||||
openssl req \
|
||||
-x509 \
|
||||
-nodes \
|
||||
-config openssl.cnf \
|
||||
-newkey rsa:2048 \
|
||||
-days 3650 \
|
||||
-subj "/CN=MyTestCA/" \
|
||||
-out cacert.pem \
|
||||
-outform PEM
|
||||
|
||||
openssl x509 \
|
||||
-in cacert.pem \
|
||||
-out cacert.cer \
|
||||
-outform DER
|
||||
|
||||
openssl genrsa -out $root/server/key.pem 2048
|
||||
openssl genrsa -out $root/client/key.pem 2048
|
||||
|
||||
openssl req \
|
||||
-new \
|
||||
-nodes \
|
||||
-config openssl.cnf \
|
||||
-subj "/CN=127.0.0.1/O=server/" \
|
||||
-key $root/server/key.pem \
|
||||
-out $root/server/req.pem \
|
||||
-outform PEM
|
||||
|
||||
openssl req \
|
||||
-new \
|
||||
-nodes \
|
||||
-config openssl.cnf \
|
||||
-subj "/CN=127.0.0.1/O=client/" \
|
||||
-key $root/client/key.pem \
|
||||
-out $root/client/req.pem \
|
||||
-outform PEM
|
||||
|
||||
openssl ca \
|
||||
-config openssl.cnf \
|
||||
-in $root/server/req.pem \
|
||||
-out $root/server/cert.pem \
|
||||
-notext \
|
||||
-batch \
|
||||
-extensions server_ca_extensions
|
||||
|
||||
openssl ca \
|
||||
-config openssl.cnf \
|
||||
-in $root/client/req.pem \
|
||||
-out $root/client/cert.pem \
|
||||
-notext \
|
||||
-batch \
|
||||
-extensions client_ca_extensions
|
||||
|
||||
cat <<-END
|
||||
const caCert = \`
|
||||
`cat $root/ca/cacert.pem`
|
||||
\`
|
||||
|
||||
const serverCert = \`
|
||||
`cat $root/server/cert.pem`
|
||||
\`
|
||||
|
||||
const serverKey = \`
|
||||
`cat $root/server/key.pem`
|
||||
\`
|
||||
|
||||
const clientCert = \`
|
||||
`cat $root/client/cert.pem`
|
||||
\`
|
||||
|
||||
const clientKey = \`
|
||||
`cat $root/client/key.pem`
|
||||
\`
|
||||
END
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,94 @@
|
||||
package amqp
|
||||
|
||||
import "sync"
|
||||
|
||||
// confirms resequences and notifies one or multiple publisher confirmation listeners
|
||||
type confirms struct {
|
||||
m sync.Mutex
|
||||
listeners []chan Confirmation
|
||||
sequencer map[uint64]Confirmation
|
||||
published uint64
|
||||
expecting uint64
|
||||
}
|
||||
|
||||
// newConfirms allocates a confirms
|
||||
func newConfirms() *confirms {
|
||||
return &confirms{
|
||||
sequencer: map[uint64]Confirmation{},
|
||||
published: 0,
|
||||
expecting: 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *confirms) Listen(l chan Confirmation) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.listeners = append(c.listeners, l)
|
||||
}
|
||||
|
||||
// publish increments the publishing counter
|
||||
func (c *confirms) Publish() uint64 {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.published++
|
||||
return c.published
|
||||
}
|
||||
|
||||
// confirm confirms one publishing, increments the expecting delivery tag, and
|
||||
// removes bookkeeping for that delivery tag.
|
||||
func (c *confirms) confirm(confirmation Confirmation) {
|
||||
delete(c.sequencer, c.expecting)
|
||||
c.expecting++
|
||||
for _, l := range c.listeners {
|
||||
l <- confirmation
|
||||
}
|
||||
}
|
||||
|
||||
// resequence confirms any out of order delivered confirmations
|
||||
func (c *confirms) resequence() {
|
||||
for c.expecting <= c.published {
|
||||
sequenced, found := c.sequencer[c.expecting]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
c.confirm(sequenced)
|
||||
}
|
||||
}
|
||||
|
||||
// one confirms one publishing and all following in the publishing sequence
|
||||
func (c *confirms) One(confirmed Confirmation) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
if c.expecting == confirmed.DeliveryTag {
|
||||
c.confirm(confirmed)
|
||||
} else {
|
||||
c.sequencer[confirmed.DeliveryTag] = confirmed
|
||||
}
|
||||
c.resequence()
|
||||
}
|
||||
|
||||
// multiple confirms all publishings up until the delivery tag
|
||||
func (c *confirms) Multiple(confirmed Confirmation) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
for c.expecting <= confirmed.DeliveryTag {
|
||||
c.confirm(Confirmation{c.expecting, confirmed.Ack})
|
||||
}
|
||||
c.resequence()
|
||||
}
|
||||
|
||||
// Close closes all listeners, discarding any out of sequence confirmations
|
||||
func (c *confirms) Close() error {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
for _, l := range c.listeners {
|
||||
close(l)
|
||||
}
|
||||
c.listeners = nil
|
||||
return nil
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,142 @@
|
||||
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
// Source code and contact info at http://github.com/streadway/amqp
|
||||
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
var consumerSeq uint64
|
||||
|
||||
const consumerTagLengthMax = 0xFF // see writeShortstr
|
||||
|
||||
func uniqueConsumerTag() string {
|
||||
return commandNameBasedUniqueConsumerTag(os.Args[0])
|
||||
}
|
||||
|
||||
func commandNameBasedUniqueConsumerTag(commandName string) string {
|
||||
tagPrefix := "ctag-"
|
||||
tagInfix := commandName
|
||||
tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10)
|
||||
|
||||
if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
|
||||
tagInfix = "streadway/amqp"
|
||||
}
|
||||
|
||||
return tagPrefix + tagInfix + tagSuffix
|
||||
}
|
||||
|
||||
type consumerBuffers map[string]chan *Delivery
|
||||
|
||||
// Concurrent type that manages the consumerTag ->
|
||||
// ingress consumerBuffer mapping
|
||||
type consumers struct {
|
||||
sync.WaitGroup // one for buffer
|
||||
closed chan struct{} // signal buffer
|
||||
|
||||
sync.Mutex // protects below
|
||||
chans consumerBuffers
|
||||
}
|
||||
|
||||
func makeConsumers() *consumers {
|
||||
return &consumers{
|
||||
closed: make(chan struct{}),
|
||||
chans: make(consumerBuffers),
|
||||
}
|
||||
}
|
||||
|
||||
func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
|
||||
defer close(out)
|
||||
defer subs.Done()
|
||||
|
||||
var inflight = in
|
||||
var queue []*Delivery
|
||||
|
||||
for delivery := range in {
|
||||
queue = append(queue, delivery)
|
||||
|
||||
for len(queue) > 0 {
|
||||
select {
|
||||
case <-subs.closed:
|
||||
// closed before drained, drop in-flight
|
||||
return
|
||||
|
||||
case delivery, consuming := <-inflight:
|
||||
if consuming {
|
||||
queue = append(queue, delivery)
|
||||
} else {
|
||||
inflight = nil
|
||||
}
|
||||
|
||||
case out <- *queue[0]:
|
||||
queue = queue[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// On key conflict, close the previous channel.
|
||||
func (subs *consumers) add(tag string, consumer chan Delivery) {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
if prev, found := subs.chans[tag]; found {
|
||||
close(prev)
|
||||
}
|
||||
|
||||
in := make(chan *Delivery)
|
||||
subs.chans[tag] = in
|
||||
|
||||
subs.Add(1)
|
||||
go subs.buffer(in, consumer)
|
||||
}
|
||||
|
||||
func (subs *consumers) cancel(tag string) (found bool) {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
ch, found := subs.chans[tag]
|
||||
|
||||
if found {
|
||||
delete(subs.chans, tag)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
return found
|
||||
}
|
||||
|
||||
func (subs *consumers) close() {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
close(subs.closed)
|
||||
|
||||
for tag, ch := range subs.chans {
|
||||
delete(subs.chans, tag)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
subs.Wait()
|
||||
}
|
||||
|
||||
// Sends a delivery to a the consumer identified by `tag`.
|
||||
// If unbuffered channels are used for Consume this method
|
||||
// could block all deliveries until the consumer
|
||||
// receives on the other end of the channel.
|
||||
func (subs *consumers) send(tag string, msg *Delivery) bool {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
buffer, found := subs.chans[tag]
|
||||
if found {
|
||||
buffer <- msg
|
||||
}
|
||||
|
||||
return found
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
// Source code and contact info at http://github.com/streadway/amqp
|
||||
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
var errDeliveryNotInitialized = errors.New("delivery not initialized")
|
||||
|
||||
// Acknowledger notifies the server of successful or failed consumption of
|
||||
// delivieries via identifier found in the Delivery.DeliveryTag field.
|
||||
//
|
||||
// Applications can provide mock implementations in tests of Delivery handlers.
|
||||
type Acknowledger interface {
|
||||
Ack(tag uint64, multiple bool) error
|
||||
Nack(tag uint64, multiple bool, requeue bool) error
|
||||
Reject(tag uint64, requeue bool) error
|
||||
}
|
||||
|
||||
// Delivery captures the fields for a previously delivered message resident in
|
||||
// a queue to be delivered by the server to a consumer from Channel.Consume or
|
||||
// Channel.Get.
|
||||
type Delivery struct {
|
||||
Acknowledger Acknowledger // the channel from which this delivery arrived
|
||||
|
||||
Headers Table // Application or header exchange table
|
||||
|
||||
// Properties
|
||||
ContentType string // MIME content type
|
||||
ContentEncoding string // MIME content encoding
|
||||
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
|
||||
Priority uint8 // queue implementation use - 0 to 9
|
||||
CorrelationId string // application use - correlation identifier
|
||||
ReplyTo string // application use - address to reply to (ex: RPC)
|
||||
Expiration string // implementation use - message expiration spec
|
||||
MessageId string // application use - message identifier
|
||||
Timestamp time.Time // application use - message timestamp
|
||||
Type string // application use - message type name
|
||||
UserId string // application use - creating user - should be authenticated user
|
||||
AppId string // application use - creating application id
|
||||
|
||||
// Valid only with Channel.Consume
|
||||
ConsumerTag string
|
||||
|
||||
// Valid only with Channel.Get
|
||||
MessageCount uint32
|
||||
|
||||
DeliveryTag uint64
|
||||
Redelivered bool
|
||||
Exchange string // basic.publish exchange
|
||||
RoutingKey string // basic.publish routing key
|
||||
|
||||
Body []byte
|
||||
}
|
||||
|
||||
func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
|
||||
props, body := msg.getContent()
|
||||
|
||||
delivery := Delivery{
|
||||
Acknowledger: channel,
|
||||
|
||||
Headers: props.Headers,
|
||||
ContentType: props.ContentType,
|
||||
ContentEncoding: props.ContentEncoding,
|
||||
DeliveryMode: props.DeliveryMode,
|
||||
Priority: props.Priority,
|
||||
CorrelationId: props.CorrelationId,
|
||||
ReplyTo: props.ReplyTo,
|
||||
Expiration: props.Expiration,
|
||||
MessageId: props.MessageId,
|
||||
Timestamp: props.Timestamp,
|
||||
Type: props.Type,
|
||||
UserId: props.UserId,
|
||||
AppId: props.AppId,
|
||||
|
||||
Body: body,
|
||||
}
|
||||
|
||||
// Properties for the delivery types
|
||||
switch m := msg.(type) {
|
||||
case *basicDeliver:
|
||||
delivery.ConsumerTag = m.ConsumerTag
|
||||
delivery.DeliveryTag = m.DeliveryTag
|
||||
delivery.Redelivered = m.Redelivered
|
||||
delivery.Exchange = m.Exchange
|
||||
delivery.RoutingKey = m.RoutingKey
|
||||
|
||||
case *basicGetOk:
|
||||
delivery.MessageCount = m.MessageCount
|
||||
delivery.DeliveryTag = m.DeliveryTag
|
||||
delivery.Redelivered = m.Redelivered
|
||||
delivery.Exchange = m.Exchange
|
||||
delivery.RoutingKey = m.RoutingKey
|
||||
}
|
||||
|
||||
return &delivery
|
||||
}
|
||||
|
||||
/*
|
||||
Ack delegates an acknowledgement through the Acknowledger interface that the
|
||||
client or server has finished work on a delivery.
|
||||
|
||||
All deliveries in AMQP must be acknowledged. If you called Channel.Consume
|
||||
with autoAck true then the server will be automatically ack each message and
|
||||
this method should not be called. Otherwise, you must call Delivery.Ack after
|
||||
you have successfully processed this delivery.
|
||||
|
||||
When multiple is true, this delivery and all prior unacknowledged deliveries
|
||||
on the same channel will be acknowledged. This is useful for batch processing
|
||||
of deliveries.
|
||||
|
||||
An error will indicate that the acknowledge could not be delivered to the
|
||||
channel it was sent from.
|
||||
|
||||
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
|
||||
delivery that is not automatically acknowledged.
|
||||
*/
|
||||
func (d Delivery) Ack(multiple bool) error {
|
||||
if d.Acknowledger == nil {
|
||||
return errDeliveryNotInitialized
|
||||
}
|
||||
return d.Acknowledger.Ack(d.DeliveryTag, multiple)
|
||||
}
|
||||
|
||||
/*
|
||||
Reject delegates a negatively acknowledgement through the Acknowledger interface.
|
||||
|
||||
When requeue is true, queue this message to be delivered to a consumer on a
|
||||
different channel. When requeue is false or the server is unable to queue this
|
||||
message, it will be dropped.
|
||||
|
||||
If you are batch processing deliveries, and your server supports it, prefer
|
||||
Delivery.Nack.
|
||||
|
||||
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
|
||||
delivery that is not automatically acknowledged.
|
||||
*/
|
||||
func (d Delivery) Reject(requeue bool) error {
|
||||
if d.Acknowledger == nil {
|
||||
return errDeliveryNotInitialized
|
||||
}
|
||||
return d.Acknowledger.Reject(d.DeliveryTag, requeue)
|
||||
}
|
||||
|
||||
/*
|
||||
Nack negatively acknowledge the delivery of message(s) identified by the
|
||||
delivery tag from either the client or server.
|
||||
|
||||
When multiple is true, nack messages up to and including delivered messages up
|
||||
until the delivery tag delivered on the same channel.
|
||||
|
||||
When requeue is true, request the server to deliver this message to a different
|
||||
consumer. If it is not possible or requeue is false, the message will be
|
||||
dropped or delivered to a server configured dead-letter queue.
|
||||
|
||||
This method must not be used to select or requeue messages the client wishes
|
||||
not to handle, rather it is to inform the server that the client is incapable
|
||||
of handling this message at this time.
|
||||
|
||||
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
|
||||
delivery that is not automatically acknowledged.
|
||||
*/
|
||||
func (d Delivery) Nack(multiple, requeue bool) error {
|
||||
if d.Acknowledger == nil {
|
||||
return errDeliveryNotInitialized
|
||||
}
|
||||
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
// Source code and contact info at http://github.com/streadway/amqp
|
||||
|
||||
/*
|
||||
Package amqp is an AMQP 0.9.1 client with RabbitMQ extensions
|
||||
|
||||
Understand the AMQP 0.9.1 messaging model by reviewing these links first. Much
|
||||
of the terminology in this library directly relates to AMQP concepts.
|
||||
|
||||
Resources
|
||||
|
||||
http://www.rabbitmq.com/tutorials/amqp-concepts.html
|
||||
http://www.rabbitmq.com/getstarted.html
|
||||
http://www.rabbitmq.com/amqp-0-9-1-reference.html
|
||||
|
||||
Design
|
||||
|
||||
Most other broker clients publish to queues, but in AMQP, clients publish
|
||||
Exchanges instead. AMQP is programmable, meaning that both the producers and
|
||||
consumers agree on the configuration of the broker, instead of requiring an
|
||||
operator or system configuration that declares the logical topology in the
|
||||
broker. The routing between producers and consumer queues is via Bindings.
|
||||
These bindings form the logical topology of the broker.
|
||||
|
||||
In this library, a message sent from publisher is called a "Publishing" and a
|
||||
message received to a consumer is called a "Delivery". The fields of
|
||||
Publishings and Deliveries are close but not exact mappings to the underlying
|
||||
wire format to maintain stronger types. Many other libraries will combine
|
||||
message properties with message headers. In this library, the message well
|
||||
known properties are strongly typed fields on the Publishings and Deliveries,
|
||||
whereas the user defined headers are in the Headers field.
|
||||
|
||||
The method naming closely matches the protocol's method name with positional
|
||||
parameters mapping to named protocol message fields. The motivation here is to
|
||||
present a comprehensive view over all possible interactions with the server.
|
||||
|
||||
Generally, methods that map to protocol methods of the "basic" class will be
|
||||
elided in this interface, and "select" methods of various channel mode selectors
|
||||
will be elided for example Channel.Confirm and Channel.Tx.
|
||||
|
||||
The library is intentionally designed to be synchronous, where responses for
|
||||
each protocol message are required to be received in an RPC manner. Some
|
||||
methods have a noWait parameter like Channel.QueueDeclare, and some methods are
|
||||
asynchronous like Channel.Publish. The error values should still be checked for
|
||||
these methods as they will indicate IO failures like when the underlying
|
||||
connection closes.
|
||||
|
||||
Asynchronous Events
|
||||
|
||||
Clients of this library may be interested in receiving some of the protocol
|
||||
messages other than Deliveries like basic.ack methods while a channel is in
|
||||
confirm mode.
|
||||
|
||||
The Notify* methods with Connection and Channel receivers model the pattern of
|
||||
asynchronous events like closes due to exceptions, or messages that are sent out
|
||||
of band from an RPC call like basic.ack or basic.flow.
|
||||
|
||||
Any asynchronous events, including Deliveries and Publishings must always have
|
||||
a receiver until the corresponding chans are closed. Without asynchronous
|
||||
receivers, the sychronous methods will block.
|
||||
|
||||
Use Case
|
||||
|
||||
It's important as a client to an AMQP topology to ensure the state of the
|
||||
broker matches your expectations. For both publish and consume use cases,
|
||||
make sure you declare the queues, exchanges and bindings you expect to exist
|
||||
prior to calling Channel.Publish or Channel.Consume.
|
||||
|
||||
// Connections start with amqp.Dial() typically from a command line argument
|
||||
// or environment variable.
|
||||
connection, err := amqp.Dial(os.Getenv("AMQP_URL"))
|
||||
|
||||
// To cleanly shutdown by flushing kernel buffers, make sure to close and
|
||||
// wait for the response.
|
||||
defer connection.Close()
|
||||
|
||||
// Most operations happen on a channel. If any error is returned on a
|
||||
// channel, the channel will no longer be valid, throw it away and try with
|
||||
// a different channel. If you use many channels, it's useful for the
|
||||
// server to
|
||||
channel, err := connection.Channel()
|
||||
|
||||
// Declare your topology here, if it doesn't exist, it will be created, if
|
||||
// it existed already and is not what you expect, then that's considered an
|
||||
// error.
|
||||
|
||||
// Use your connection on this topology with either Publish or Consume, or
|
||||
// inspect your queues with QueueInspect. It's unwise to mix Publish and
|
||||
// Consume to let TCP do its job well.
|
||||
|
||||
SSL/TLS - Secure connections
|
||||
|
||||
When Dial encounters an amqps:// scheme, it will use the zero value of a
|
||||
tls.Config. This will only perform server certificate and host verification.
|
||||
|
||||
Use DialTLS when you wish to provide a client certificate (recommended),
|
||||
include a private certificate authority's certificate in the cert chain for
|
||||
server validity, or run insecure by not verifying the server certificate dial
|
||||
your own connection. DialTLS will use the provided tls.Config when it
|
||||
encounters an amqps:// scheme and will dial a plain connection when it
|
||||
encounters an amqp:// scheme.
|
||||
|
||||
SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html
|
||||
|
||||
*/
|
||||
package amqp
|
@ -0,0 +1,17 @@
|
||||
// +build gofuzz
|
||||
|
||||
package amqp
|
||||
|
||||
import "bytes"
|
||||
|
||||
func Fuzz(data []byte) int {
|
||||
r := reader{bytes.NewReader(data)}
|
||||
frame, err := r.ReadFrame()
|
||||
if err != nil {
|
||||
if frame != nil {
|
||||
panic("frame is not nil")
|
||||
}
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
}
|
@ -0,0 +1,2 @@
|
||||
#!/bin/sh
|
||||
go run spec/gen.go < spec/amqp0-9-1.stripped.extended.xml | gofmt > spec091.go
|
@ -0,0 +1,3 @@
|
||||
module github.com/streadway/amqp
|
||||
|
||||
go 1.10
|
@ -0,0 +1,67 @@
|
||||
#!/bin/sh
|
||||
|
||||
LATEST_STABLE_SUPPORTED_GO_VERSION="1.11"
|
||||
|
||||
main() {
|
||||
if local_go_version_is_latest_stable
|
||||
then
|
||||
run_gofmt
|
||||
run_golint
|
||||
run_govet
|
||||
fi
|
||||
run_unit_tests
|
||||
}
|
||||
|
||||
local_go_version_is_latest_stable() {
|
||||
go version | grep -q $LATEST_STABLE_SUPPORTED_GO_VERSION
|
||||
}
|
||||
|
||||
log_error() {
|
||||
echo "$*" 1>&2
|
||||
}
|
||||
|
||||
run_gofmt() {
|
||||
GOFMT_FILES=$(gofmt -l .)
|
||||
if [ -n "$GOFMT_FILES" ]
|
||||
then
|
||||
log_error "gofmt failed for the following files:
|
||||
$GOFMT_FILES
|
||||
|
||||
please run 'gofmt -w .' on your changes before committing."
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
run_golint() {
|
||||
GOLINT_ERRORS=$(golint ./... | grep -v "Id should be")
|
||||
if [ -n "$GOLINT_ERRORS" ]
|
||||
then
|
||||
log_error "golint failed for the following reasons:
|
||||
$GOLINT_ERRORS
|
||||
|
||||
please run 'golint ./...' on your changes before committing."
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
run_govet() {
|
||||
GOVET_ERRORS=$(go tool vet ./*.go 2>&1)
|
||||
if [ -n "$GOVET_ERRORS" ]
|
||||
then
|
||||
log_error "go vet failed for the following reasons:
|
||||
$GOVET_ERRORS
|
||||
|
||||
please run 'go tool vet ./*.go' on your changes before committing."
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
run_unit_tests() {
|
||||
if [ -z "$NOTEST" ]
|
||||
then
|
||||
log_error 'Running short tests...'
|
||||
env AMQP_URL= go test -short
|
||||
fi
|
||||
}
|
||||
|
||||
main
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,64 @@
|
||||
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
// Source code and contact info at http://github.com/streadway/amqp
|
||||
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Return captures a flattened struct of fields returned by the server when a
|
||||
// Publishing is unable to be delivered either due to the `mandatory` flag set
|
||||
// and no route found, or `immediate` flag set and no free consumer.
|
||||
type Return struct {
|
||||
ReplyCode uint16 // reason
|
||||
ReplyText string // description
|
||||
Exchange string // basic.publish exchange
|
||||
RoutingKey string // basic.publish routing key
|
||||
|
||||
// Properties
|
||||
ContentType string // MIME content type
|
||||
ContentEncoding string // MIME content encoding
|
||||
Headers Table // Application or header exchange table
|
||||
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
|
||||
Priority uint8 // queue implementation use - 0 to 9
|
||||
CorrelationId string // application use - correlation identifier
|
||||
ReplyTo string // application use - address to to reply to (ex: RPC)
|
||||
Expiration string // implementation use - message expiration spec
|
||||
MessageId string // application use - message identifier
|
||||
Timestamp time.Time // application use - message timestamp
|
||||
Type string // application use - message type name
|
||||
UserId string // application use - creating user id
|
||||
AppId string // application use - creating application
|
||||
|
||||
Body []byte
|
||||
}
|
||||
|
||||
func newReturn(msg basicReturn) *Return {
|
||||
props, body := msg.getContent()
|
||||
|
||||
return &Return{
|
||||
ReplyCode: msg.ReplyCode,
|
||||
ReplyText: msg.ReplyText,
|
||||
Exchange: msg.Exchange,
|
||||
RoutingKey: msg.RoutingKey,
|
||||
|
||||
Headers: props.Headers,
|
||||
ContentType: props.ContentType,
|
||||
ContentEncoding: props.ContentEncoding,
|
||||
DeliveryMode: props.DeliveryMode,
|
||||
Priority: props.Priority,
|
||||
CorrelationId: props.CorrelationId,
|
||||
ReplyTo: props.ReplyTo,
|
||||
Expiration: props.Expiration,
|
||||
MessageId: props.MessageId,
|
||||
Timestamp: props.Timestamp,
|
||||
Type: props.Type,
|
||||
UserId: props.UserId,
|
||||
AppId: props.AppId,
|
||||
|
||||
Body: body,
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,176 @@
|
||||
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
// Source code and contact info at http://github.com/streadway/amqp
|
||||
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'")
|
||||
var errURIWhitespace = errors.New("URI must not contain whitespace")
|
||||
|
||||
var schemePorts = map[string]int{
|
||||
"amqp": 5672,
|
||||
"amqps": 5671,
|
||||
}
|
||||
|
||||
var defaultURI = URI{
|
||||
Scheme: "amqp",
|
||||
Host: "localhost",
|
||||
Port: 5672,
|
||||
Username: "guest",
|
||||
Password: "guest",
|
||||
Vhost: "/",
|
||||
}
|
||||
|
||||
// URI represents a parsed AMQP URI string.
|
||||
type URI struct {
|
||||
Scheme string
|
||||
Host string
|
||||
Port int
|
||||
Username string
|
||||
Password string
|
||||
Vhost string
|
||||
}
|
||||
|
||||
// ParseURI attempts to parse the given AMQP URI according to the spec.
|
||||
// See http://www.rabbitmq.com/uri-spec.html.
|
||||
//
|
||||
// Default values for the fields are:
|
||||
//
|
||||
// Scheme: amqp
|
||||
// Host: localhost
|
||||
// Port: 5672
|
||||
// Username: guest
|
||||
// Password: guest
|
||||
// Vhost: /
|
||||
//
|
||||
func ParseURI(uri string) (URI, error) {
|
||||
builder := defaultURI
|
||||
|
||||
if strings.Contains(uri, " ") == true {
|
||||
return builder, errURIWhitespace
|
||||
}
|
||||
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return builder, err
|
||||
}
|
||||
|
||||
defaultPort, okScheme := schemePorts[u.Scheme]
|
||||
|
||||
if okScheme {
|
||||
builder.Scheme = u.Scheme
|
||||
} else {
|
||||
return builder, errURIScheme
|
||||
}
|
||||
|
||||
host := u.Hostname()
|
||||
port := u.Port()
|
||||
|
||||
if host != "" {
|
||||
builder.Host = host
|
||||
}
|
||||
|
||||
if port != "" {
|
||||
port32, err := strconv.ParseInt(port, 10, 32)
|
||||
if err != nil {
|
||||
return builder, err
|
||||
}
|
||||
builder.Port = int(port32)
|
||||
} else {
|
||||
builder.Port = defaultPort
|
||||
}
|
||||
|
||||
if u.User != nil {
|
||||
builder.Username = u.User.Username()
|
||||
if password, ok := u.User.Password(); ok {
|
||||
builder.Password = password
|
||||
}
|
||||
}
|
||||
|
||||
if u.Path != "" {
|
||||
if strings.HasPrefix(u.Path, "/") {
|
||||
if u.Host == "" && strings.HasPrefix(u.Path, "///") {
|
||||
// net/url doesn't handle local context authorities and leaves that up
|
||||
// to the scheme handler. In our case, we translate amqp:/// into the
|
||||
// default host and whatever the vhost should be
|
||||
if len(u.Path) > 3 {
|
||||
builder.Vhost = u.Path[3:]
|
||||
}
|
||||
} else if len(u.Path) > 1 {
|
||||
builder.Vhost = u.Path[1:]
|
||||
}
|
||||
} else {
|
||||
builder.Vhost = u.Path
|
||||
}
|
||||
}
|
||||
|
||||
return builder, nil
|
||||
}
|
||||
|
||||
// PlainAuth returns a PlainAuth structure based on the parsed URI's
|
||||
// Username and Password fields.
|
||||
func (uri URI) PlainAuth() *PlainAuth {
|
||||
return &PlainAuth{
|
||||
Username: uri.Username,
|
||||
Password: uri.Password,
|
||||
}
|
||||
}
|
||||
|
||||
// AMQPlainAuth returns a PlainAuth structure based on the parsed URI's
|
||||
// Username and Password fields.
|
||||
func (uri URI) AMQPlainAuth() *AMQPlainAuth {
|
||||
return &AMQPlainAuth{
|
||||
Username: uri.Username,
|
||||
Password: uri.Password,
|
||||
}
|
||||
}
|
||||
|
||||
func (uri URI) String() string {
|
||||
authority, err := url.Parse("")
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
|
||||
authority.Scheme = uri.Scheme
|
||||
|
||||
if uri.Username != defaultURI.Username || uri.Password != defaultURI.Password {
|
||||
authority.User = url.User(uri.Username)
|
||||
|
||||
if uri.Password != defaultURI.Password {
|
||||
authority.User = url.UserPassword(uri.Username, uri.Password)
|
||||
}
|
||||
}
|
||||
|
||||
authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port))
|
||||
|
||||
if defaultPort, found := schemePorts[uri.Scheme]; !found || defaultPort != uri.Port {
|
||||
authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port))
|
||||
} else {
|
||||
// JoinHostPort() automatically add brackets to the host if it's
|
||||
// an IPv6 address.
|
||||
//
|
||||
// If not port is specified, JoinHostPort() return an IP address in the
|
||||
// form of "[::1]:", so we use TrimSuffix() to remove the extra ":".
|
||||
authority.Host = strings.TrimSuffix(net.JoinHostPort(uri.Host, ""), ":")
|
||||
}
|
||||
|
||||
if uri.Vhost != defaultURI.Vhost {
|
||||
// Make sure net/url does not double escape, e.g.
|
||||
// "%2F" does not become "%252F".
|
||||
authority.Path = uri.Vhost
|
||||
authority.RawPath = url.QueryEscape(uri.Vhost)
|
||||
} else {
|
||||
authority.Path = "/"
|
||||
}
|
||||
|
||||
return authority.String()
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue