forked from pneymrl2f/nightingale
transfer support kafka (#227)
* 修改: etc/transfer.yml 修改: go.mod 修改: go.sum 修改: src/modules/transfer/backend/init.go 新文件: src/modules/transfer/backend/kafka.go 修改: src/modules/transfer/backend/sender.go 修改: src/modules/transfer/http/routes/push_router.go 修改: src/modules/transfer/rpc/push.go 新文件: vendor/github.com/Shopify/sarama/.gitignore 新文件: vendor/github.com/Shopify/sarama/.golangci.yml 新文件: vendor/github.com/Shopify/sarama/CHANGELOG.md 新文件: vendor/github.com/Shopify/sarama/LICENSE 新文件: vendor/github.com/Shopify/sarama/Makefile 新文件: vendor/github.com/Shopify/sarama/README.md 新文件: vendor/github.com/Shopify/sarama/Vagrantfile 新文件: vendor/github.com/Shopify/sarama/acl_bindings.go 新文件: vendor/github.com/Shopify/sarama/acl_create_request.go 新文件: vendor/github.com/Shopify/sarama/acl_create_response.go 新文件: vendor/github.com/Shopify/sarama/acl_delete_request.go 新文件: vendor/github.com/Shopify/sarama/acl_delete_response.go 新文件: vendor/github.com/Shopify/sarama/acl_describe_request.go 新文件: vendor/github.com/Shopify/sarama/acl_describe_response.go 新文件: vendor/github.com/Shopify/sarama/acl_filter.go 新文件: vendor/github.com/Shopify/sarama/acl_types.go 新文件: vendor/github.com/Shopify/sarama/add_offsets_to_txn_request.go 新文件: vendor/github.com/Shopify/sarama/add_offsets_to_txn_response.go 新文件: vendor/github.com/Shopify/sarama/add_partitions_to_txn_request.go 新文件: vendor/github.com/Shopify/sarama/add_partitions_to_txn_response.go 新文件: vendor/github.com/Shopify/sarama/admin.go 新文件: vendor/github.com/Shopify/sarama/alter_configs_request.go 新文件: vendor/github.com/Shopify/sarama/alter_configs_response.go 新文件: vendor/github.com/Shopify/sarama/alter_partition_reassignments_request.go 新文件: vendor/github.com/Shopify/sarama/alter_partition_reassignments_response.go 新文件: vendor/github.com/Shopify/sarama/api_versions_request.go 新文件: vendor/github.com/Shopify/sarama/api_versions_response.go 新文件: vendor/github.com/Shopify/sarama/async_producer.go 新文件: vendor/github.com/Shopify/sarama/balance_strategy.go 新文件: vendor/github.com/Shopify/sarama/broker.go 新文件: vendor/github.com/Shopify/sarama/client.go 新文件: vendor/github.com/Shopify/sarama/compress.go 新文件: vendor/github.com/Shopify/sarama/config.go 新文件: vendor/github.com/Shopify/sarama/config_resource_type.go 新文件: vendor/github.com/Shopify/sarama/consumer.go 新文件: vendor/github.com/Shopify/sarama/consumer_group.go 新文件: vendor/github.com/Shopify/sarama/consumer_group_members.go 新文件: vendor/github.com/Shopify/sarama/consumer_metadata_request.go 新文件: vendor/github.com/Shopify/sarama/consumer_metadata_response.go 新文件: vendor/github.com/Shopify/sarama/control_record.go 新文件: vendor/github.com/Shopify/sarama/crc32_field.go 新文件: vendor/github.com/Shopify/sarama/create_partitions_request.go 新文件: vendor/github.com/Shopify/sarama/create_partitions_response.go 新文件: vendor/github.com/Shopify/sarama/create_topics_request.go 新文件: vendor/github.com/Shopify/sarama/create_topics_response.go 新文件: vendor/github.com/Shopify/sarama/decompress.go 新文件: vendor/github.com/Shopify/sarama/delete_groups_request.go 新文件: vendor/github.com/Shopify/sarama/delete_groups_response.go 新文件: vendor/github.com/Shopify/sarama/delete_records_request.go 新文件: vendor/github.com/Shopify/sarama/delete_records_response.go 新文件: vendor/github.com/Shopify/sarama/delete_topics_request.go 新文件: vendor/github.com/Shopify/sarama/delete_topics_response.go 新文件: vendor/github.com/Shopify/sarama/describe_configs_request.go 新文件: vendor/github.com/Shopify/sarama/describe_configs_response.go 新文件: vendor/github.com/Shopify/sarama/describe_groups_request.go 新文件: vendor/github.com/Shopify/sarama/describe_groups_response.go 新文件: vendor/github.com/Shopify/sarama/describe_log_dirs_request.go 新文件: vendor/github.com/Shopify/sarama/describe_log_dirs_response.go 新文件: vendor/github.com/Shopify/sarama/dev.yml 新文件: vendor/github.com/Shopify/sarama/encoder_decoder.go 新文件: vendor/github.com/Shopify/sarama/end_txn_request.go 新文件: vendor/github.com/Shopify/sarama/end_txn_response.go 新文件: vendor/github.com/Shopify/sarama/errors.go 新文件: vendor/github.com/Shopify/sarama/fetch_request.go 新文件: vendor/github.com/Shopify/sarama/fetch_response.go 新文件: vendor/github.com/Shopify/sarama/find_coordinator_request.go 新文件: vendor/github.com/Shopify/sarama/find_coordinator_response.go 新文件: vendor/github.com/Shopify/sarama/go.mod 新文件: vendor/github.com/Shopify/sarama/go.sum 新文件: vendor/github.com/Shopify/sarama/gssapi_kerberos.go 新文件: vendor/github.com/Shopify/sarama/heartbeat_request.go 新文件: vendor/github.com/Shopify/sarama/heartbeat_response.go 新文件: vendor/github.com/Shopify/sarama/init_producer_id_request.go 新文件: vendor/github.com/Shopify/sarama/init_producer_id_response.go 新文件: vendor/github.com/Shopify/sarama/join_group_request.go 新文件: vendor/github.com/Shopify/sarama/join_group_response.go 新文件: vendor/github.com/Shopify/sarama/kerberos_client.go 新文件: vendor/github.com/Shopify/sarama/leave_group_request.go 新文件: vendor/github.com/Shopify/sarama/leave_group_response.go 新文件: vendor/github.com/Shopify/sarama/length_field.go 新文件: vendor/github.com/Shopify/sarama/list_groups_request.go 新文件: vendor/github.com/Shopify/sarama/list_groups_response.go 新文件: vendor/github.com/Shopify/sarama/list_partition_reassignments_request.go 新文件: vendor/github.com/Shopify/sarama/list_partition_reassignments_response.go 新文件: vendor/github.com/Shopify/sarama/message.go 新文件: vendor/github.com/Shopify/sarama/message_set.go 新文件: vendor/github.com/Shopify/sarama/metadata_request.go 新文件: vendor/github.com/Shopify/sarama/metadata_response.go 新文件: vendor/github.com/Shopify/sarama/metrics.go 新文件: vendor/github.com/Shopify/sarama/mockbroker.go 新文件: vendor/github.com/Shopify/sarama/mockkerberos.go 新文件: vendor/github.com/Shopify/sarama/mockresponses.go 新文件: vendor/github.com/Shopify/sarama/offset_commit_request.go 新文件: vendor/github.com/Shopify/sarama/offset_commit_response.go 新文件: vendor/github.com/Shopify/sarama/offset_fetch_request.go 新文件: vendor/github.com/Shopify/sarama/offset_fetch_response.go 新文件: vendor/github.com/Shopify/sarama/offset_manager.go 新文件: vendor/github.com/Shopify/sarama/offset_request.go 新文件: vendor/github.com/Shopify/sarama/offset_response.go 新文件: vendor/github.com/Shopify/sarama/packet_decoder.go 新文件: vendor/github.com/Shopify/sarama/packet_encoder.go 新文件: vendor/github.com/Shopify/sarama/partitioner.go 新文件: vendor/github.com/Shopify/sarama/prep_encoder.go 新文件: vendor/github.com/Shopify/sarama/produce_request.go 新文件: vendor/github.com/Shopify/sarama/produce_response.go 新文件: vendor/github.com/Shopify/sarama/produce_set.go 新文件: vendor/github.com/Shopify/sarama/real_decoder.go 新文件: vendor/github.com/Shopify/sarama/real_encoder.go 新文件: vendor/github.com/Shopify/sarama/record.go 新文件: vendor/github.com/Shopify/sarama/record_batch.go 新文件: vendor/github.com/Shopify/sarama/records.go 新文件: vendor/github.com/Shopify/sarama/request.go 新文件: vendor/github.com/Shopify/sarama/response_header.go 新文件: vendor/github.com/Shopify/sarama/sarama.go 新文件: vendor/github.com/Shopify/sarama/sasl_authenticate_request.go 新文件: vendor/github.com/Shopify/sarama/sasl_authenticate_response.go 新文件: vendor/github.com/Shopify/sarama/sasl_handshake_request.go 新文件: vendor/github.com/Shopify/sarama/sasl_handshake_response.go 新文件: vendor/github.com/Shopify/sarama/sticky_assignor_user_data.go 新文件: vendor/github.com/Shopify/sarama/sync_group_request.go 新文件: vendor/github.com/Shopify/sarama/sync_group_response.go 新文件: vendor/github.com/Shopify/sarama/sync_producer.go 新文件: vendor/github.com/Shopify/sarama/timestamp.go 新文件: vendor/github.com/Shopify/sarama/txn_offset_commit_request.go 新文件: vendor/github.com/Shopify/sarama/txn_offset_commit_response.go 新文件: vendor/github.com/Shopify/sarama/utils.go 新文件: vendor/github.com/Shopify/sarama/zstd.go 新文件: vendor/github.com/eapache/go-resiliency/LICENSE 新文件: vendor/github.com/eapache/go-resiliency/breaker/README.md 新文件: vendor/github.com/eapache/go-resiliency/breaker/breaker.go 新文件: vendor/github.com/eapache/go-xerial-snappy/.gitignore 新文件: vendor/github.com/eapache/go-xerial-snappy/.travis.yml 新文件: vendor/github.com/eapache/go-xerial-snappy/LICENSE 新文件: vendor/github.com/eapache/go-xerial-snappy/README.md 新文件: vendor/github.com/eapache/go-xerial-snappy/fuzz.go 新文件: vendor/github.com/eapache/go-xerial-snappy/snappy.go 新文件: vendor/github.com/eapache/queue/.gitignore 新文件: vendor/github.com/eapache/queue/.travis.yml 新文件: vendor/github.com/eapache/queue/LICENSE 新文件: vendor/github.com/eapache/queue/README.md 新文件: vendor/github.com/eapache/queue/queue.go 新文件: vendor/github.com/golang/snappy/.gitignore 新文件: vendor/github.com/golang/snappy/AUTHORS 新文件: vendor/github.com/golang/snappy/CONTRIBUTORS 新文件: vendor/github.com/golang/snappy/LICENSE 新文件: vendor/github.com/golang/snappy/README 新文件: vendor/github.com/golang/snappy/decode.go 新文件: vendor/github.com/golang/snappy/decode_amd64.go 新文件: vendor/github.com/golang/snappy/decode_amd64.s 新文件: vendor/github.com/golang/snappy/decode_other.go 新文件: vendor/github.com/golang/snappy/encode.go 新文件: vendor/github.com/golang/snappy/encode_amd64.go 新文件: vendor/github.com/golang/snappy/encode_amd64.s 新文件: vendor/github.com/golang/snappy/encode_other.go 新文件: vendor/github.com/golang/snappy/go.mod 新文件: vendor/github.com/golang/snappy/snappy.go 新文件: vendor/github.com/hashicorp/go-uuid/.travis.yml 新文件: vendor/github.com/hashicorp/go-uuid/LICENSE 新文件: vendor/github.com/hashicorp/go-uuid/README.md 新文件: vendor/github.com/hashicorp/go-uuid/go.mod 新文件: vendor/github.com/hashicorp/go-uuid/uuid.go 新文件: vendor/github.com/jcmturner/gofork/LICENSE 新文件: vendor/github.com/jcmturner/gofork/encoding/asn1/README.md 新文件: vendor/github.com/jcmturner/gofork/encoding/asn1/asn1.go 新文件: vendor/github.com/jcmturner/gofork/encoding/asn1/common.go 新文件: vendor/github.com/jcmturner/gofork/encoding/asn1/marshal.go 新文件: vendor/github.com/jcmturner/gofork/x/crypto/pbkdf2/pbkdf2.go 新文件: vendor/github.com/klauspost/compress/LICENSE 新文件: vendor/github.com/klauspost/compress/fse/README.md 新文件: vendor/github.com/klauspost/compress/fse/bitreader.go 新文件: vendor/github.com/klauspost/compress/fse/bitwriter.go 新文件: vendor/github.com/klauspost/compress/fse/bytereader.go 新文件: vendor/github.com/klauspost/compress/fse/compress.go 新文件: vendor/github.com/klauspost/compress/fse/decompress.go 新文件: vendor/github.com/klauspost/compress/fse/fse.go 新文件: vendor/github.com/klauspost/compress/huff0/.gitignore 新文件: vendor/github.com/klauspost/compress/huff0/README.md 新文件: vendor/github.com/klauspost/compress/huff0/bitreader.go 新文件: vendor/github.com/klauspost/compress/huff0/bitwriter.go 新文件: vendor/github.com/klauspost/compress/huff0/bytereader.go 新文件: vendor/github.com/klauspost/compress/huff0/compress.go 新文件: vendor/github.com/klauspost/compress/huff0/decompress.go 新文件: vendor/github.com/klauspost/compress/huff0/huff0.go 新文件: vendor/github.com/klauspost/compress/snappy/.gitignore 新文件: vendor/github.com/klauspost/compress/snappy/AUTHORS 新文件: vendor/github.com/klauspost/compress/snappy/CONTRIBUTORS 新文件: vendor/github.com/klauspost/compress/snappy/LICENSE 新文件: vendor/github.com/klauspost/compress/snappy/README 新文件: vendor/github.com/klauspost/compress/snappy/decode.go 新文件: vendor/github.com/klauspost/compress/snappy/decode_amd64.go 新文件: vendor/github.com/klauspost/compress/snappy/decode_amd64.s 新文件: vendor/github.com/klauspost/compress/snappy/decode_other.go 新文件: vendor/github.com/klauspost/compress/snappy/encode.go 新文件: vendor/github.com/klauspost/compress/snappy/encode_amd64.go 新文件: vendor/github.com/klauspost/compress/snappy/encode_amd64.s 新文件: vendor/github.com/klauspost/compress/snappy/encode_other.go 新文件: vendor/github.com/klauspost/compress/snappy/runbench.cmd 新文件: vendor/github.com/klauspost/compress/snappy/snappy.go 新文件: vendor/github.com/klauspost/compress/zstd/README.md 新文件: vendor/github.com/klauspost/compress/zstd/bitreader.go 新文件: vendor/github.com/klauspost/compress/zstd/bitwriter.go 新文件: vendor/github.com/klauspost/compress/zstd/blockdec.go 新文件: vendor/github.com/klauspost/compress/zstd/blockenc.go 新文件: vendor/github.com/klauspost/compress/zstd/blocktype_string.go 新文件: vendor/github.com/klauspost/compress/zstd/bytebuf.go 新文件: vendor/github.com/klauspost/compress/zstd/bytereader.go 新文件: vendor/github.com/klauspost/compress/zstd/decoder.go 新文件: vendor/github.com/klauspost/compress/zstd/decoder_options.go 新文件: vendor/github.com/klauspost/compress/zstd/enc_dfast.go 新文件: vendor/github.com/klauspost/compress/zstd/enc_fast.go 新文件: vendor/github.com/klauspost/compress/zstd/enc_params.go 新文件: vendor/github.com/klauspost/compress/zstd/encoder.go 新文件: vendor/github.com/klauspost/compress/zstd/encoder_options.go 新文件: vendor/github.com/klauspost/compress/zstd/framedec.go 新文件: vendor/github.com/klauspost/compress/zstd/frameenc.go 新文件: vendor/github.com/klauspost/compress/zstd/fse_decoder.go 新文件: vendor/github.com/klauspost/compress/zstd/fse_encoder.go 新文件: vendor/github.com/klauspost/compress/zstd/fse_predefined.go 新文件: vendor/github.com/klauspost/compress/zstd/hash.go 新文件: vendor/github.com/klauspost/compress/zstd/history.go 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/LICENSE.txt 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/README.md 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/xxhash.go 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/xxhash_amd64.go 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/xxhash_amd64.s 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/xxhash_other.go 新文件: vendor/github.com/klauspost/compress/zstd/internal/xxhash/xxhash_safe.go 新文件: vendor/github.com/klauspost/compress/zstd/seqdec.go 新文件: vendor/github.com/klauspost/compress/zstd/seqenc.go 新文件: vendor/github.com/klauspost/compress/zstd/snappy.go 新文件: vendor/github.com/klauspost/compress/zstd/zstd.go 新文件: vendor/github.com/pierrec/lz4/.gitignore 新文件: vendor/github.com/pierrec/lz4/.travis.yml 新文件: vendor/github.com/pierrec/lz4/LICENSE 新文件: vendor/github.com/pierrec/lz4/README.md 新文件: vendor/github.com/pierrec/lz4/block.go 新文件: vendor/github.com/pierrec/lz4/debug.go 新文件: vendor/github.com/pierrec/lz4/debug_stub.go 新文件: vendor/github.com/pierrec/lz4/decode_amd64.go 新文件: vendor/github.com/pierrec/lz4/decode_amd64.s 新文件: vendor/github.com/pierrec/lz4/decode_other.go 新文件: vendor/github.com/pierrec/lz4/errors.go 新文件: vendor/github.com/pierrec/lz4/internal/xxh32/xxh32zero.go 新文件: vendor/github.com/pierrec/lz4/lz4.go 新文件: vendor/github.com/pierrec/lz4/lz4_go1.10.go 新文件: vendor/github.com/pierrec/lz4/lz4_notgo1.10.go 新文件: vendor/github.com/pierrec/lz4/reader.go 新文件: vendor/github.com/pierrec/lz4/writer.go 新文件: vendor/github.com/rcrowley/go-metrics/.gitignore 新文件: vendor/github.com/rcrowley/go-metrics/.travis.yml 新文件: vendor/github.com/rcrowley/go-metrics/LICENSE 新文件: vendor/github.com/rcrowley/go-metrics/README.md 新文件: vendor/github.com/rcrowley/go-metrics/counter.go 新文件: vendor/github.com/rcrowley/go-metrics/debug.go 新文件: vendor/github.com/rcrowley/go-metrics/ewma.go 新文件: vendor/github.com/rcrowley/go-metrics/gauge.go 新文件: vendor/github.com/rcrowley/go-metrics/gauge_float64.go 新文件: vendor/github.com/rcrowley/go-metrics/graphite.go 新文件: vendor/github.com/rcrowley/go-metrics/healthcheck.go 新文件: vendor/github.com/rcrowley/go-metrics/histogram.go 新文件: vendor/github.com/rcrowley/go-metrics/json.go 新文件: vendor/github.com/rcrowley/go-metrics/log.go 新文件: vendor/github.com/rcrowley/go-metrics/memory.md 新文件: vendor/github.com/rcrowley/go-metrics/meter.go 新文件: vendor/github.com/rcrowley/go-metrics/metrics.go 新文件: vendor/github.com/rcrowley/go-metrics/opentsdb.go 新文件: vendor/github.com/rcrowley/go-metrics/registry.go 新文件: vendor/github.com/rcrowley/go-metrics/runtime.go 新文件: vendor/github.com/rcrowley/go-metrics/runtime_cgo.go 新文件: vendor/github.com/rcrowley/go-metrics/runtime_gccpufraction.go 新文件: vendor/github.com/rcrowley/go-metrics/runtime_no_cgo.go 新文件: vendor/github.com/rcrowley/go-metrics/runtime_no_gccpufraction.go 新文件: vendor/github.com/rcrowley/go-metrics/sample.go 新文件: vendor/github.com/rcrowley/go-metrics/syslog.go 新文件: vendor/github.com/rcrowley/go-metrics/timer.go 新文件: vendor/github.com/rcrowley/go-metrics/validate.sh 新文件: vendor/github.com/rcrowley/go-metrics/writer.go 删除: vendor/github.com/shirou/gopsutil/mem/types_openbsd.go 删除: vendor/github.com/shirou/gopsutil/process/types_darwin.go 删除: vendor/github.com/shirou/gopsutil/process/types_freebsd.go 删除: vendor/github.com/shirou/gopsutil/process/types_openbsd.go 删除: vendor/github.com/ugorji/go/codec/xml.go 新文件: vendor/golang.org/x/crypto/AUTHORS 新文件: vendor/golang.org/x/crypto/CONTRIBUTORS 新文件: vendor/golang.org/x/crypto/LICENSE 新文件: vendor/golang.org/x/crypto/PATENTS 新文件: vendor/golang.org/x/crypto/md4/md4.go 新文件: vendor/golang.org/x/crypto/md4/md4block.go 新文件: vendor/golang.org/x/crypto/pbkdf2/pbkdf2.go 新文件: vendor/golang.org/x/net/AUTHORS 新文件: vendor/golang.org/x/net/CONTRIBUTORS 新文件: vendor/golang.org/x/net/LICENSE 新文件: vendor/golang.org/x/net/PATENTS 新文件: vendor/golang.org/x/net/internal/socks/client.go 新文件: vendor/golang.org/x/net/internal/socks/socks.go 新文件: vendor/golang.org/x/net/proxy/dial.go 新文件: vendor/golang.org/x/net/proxy/direct.go 新文件: vendor/golang.org/x/net/proxy/per_host.go 新文件: vendor/golang.org/x/net/proxy/proxy.go 新文件: vendor/golang.org/x/net/proxy/socks5.go 删除: vendor/golang.org/x/sys/unix/mkasm_darwin.go 删除: vendor/golang.org/x/sys/unix/mkpost.go 删除: vendor/golang.org/x/sys/unix/mksyscall.go 删除: vendor/golang.org/x/sys/unix/mksyscall_aix_ppc.go 删除: vendor/golang.org/x/sys/unix/mksyscall_aix_ppc64.go 删除: vendor/golang.org/x/sys/unix/mksyscall_solaris.go 删除: vendor/golang.org/x/sys/unix/mksysctl_openbsd.go 删除: vendor/golang.org/x/sys/unix/mksysnum.go 删除: vendor/golang.org/x/sys/unix/types_aix.go 删除: vendor/golang.org/x/sys/unix/types_darwin.go 删除: vendor/golang.org/x/sys/unix/types_dragonfly.go 删除: vendor/golang.org/x/sys/unix/types_freebsd.go 删除: vendor/golang.org/x/sys/unix/types_netbsd.go 删除: vendor/golang.org/x/sys/unix/types_openbsd.go 删除: vendor/golang.org/x/sys/unix/types_solaris.go 删除: vendor/golang.org/x/text/unicode/norm/maketables.go 删除: vendor/golang.org/x/text/unicode/norm/triegen.go 删除: vendor/golang.org/x/tools/go/gcexportdata/main.go 新文件: vendor/gopkg.in/jcmturner/aescts.v1/.gitignore 新文件: vendor/gopkg.in/jcmturner/aescts.v1/LICENSE 新文件: vendor/gopkg.in/jcmturner/aescts.v1/README.md 新文件: vendor/gopkg.in/jcmturner/aescts.v1/aescts.go 新文件: vendor/gopkg.in/jcmturner/dnsutils.v1/.gitignore 新文件: vendor/gopkg.in/jcmturner/dnsutils.v1/.travis.yml 新文件: vendor/gopkg.in/jcmturner/dnsutils.v1/LICENSE 新文件: vendor/gopkg.in/jcmturner/dnsutils.v1/srv.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/LICENSE 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/asn1tools/tools.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/ASExchange.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/TGSExchange.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/cache.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/client.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/network.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/passwd.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/session.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/client/settings.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/config/error.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/config/hosts.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/config/krb5conf.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/credentials/ccache.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/credentials/credentials.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/aes128-cts-hmac-sha1-96.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/aes128-cts-hmac-sha256-128.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/aes256-cts-hmac-sha1-96.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/aes256-cts-hmac-sha384-192.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/common/common.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/crypto.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/des3-cbc-sha1-kd.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/etype/etype.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rc4-hmac.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc3961/encryption.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc3961/keyDerivation.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc3961/nfold.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc3962/encryption.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc3962/keyDerivation.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc4757/checksum.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc4757/encryption.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc4757/keyDerivation.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc4757/msgtype.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc8009/encryption.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/crypto/rfc8009/keyDerivation.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/gssapi/MICToken.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/gssapi/README.md 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/gssapi/contextFlags.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/gssapi/gssapi.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/gssapi/wrapToken.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/addrtype/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/adtype/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/asnAppTag/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/errorcode/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/etypeID/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/flags/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/keyusage/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/msgtype/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/nametype/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/iana/patype/constants.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/kadmin/changepasswddata.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/kadmin/message.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/kadmin/passwd.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/keytab/keytab.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/krberror/error.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/APRep.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/APReq.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/KDCRep.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/KDCReq.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/KRBCred.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/KRBError.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/KRBPriv.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/KRBSafe.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/messages/Ticket.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/client_claims.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/client_info.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/credentials_info.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/device_claims.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/device_info.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/kerb_validation_info.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/pac_type.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/s4u_delegation_info.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/signature_data.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/supplemental_cred.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/pac/upn_dns_info.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/Authenticator.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/AuthorizationData.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/Cryptosystem.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/HostAddress.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/KerberosFlags.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/PAData.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/PrincipalName.go 新文件: vendor/gopkg.in/jcmturner/gokrb5.v7/types/TypedData.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/LICENSE 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/claims.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/common.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/filetime.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/group_membership.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/kerb_sid_and_attributes.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/reader.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/rpc_unicode_string.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/sid.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/mstypes/user_session_key.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/arrays.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/decoder.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/error.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/header.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/pipe.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/primitives.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/rawbytes.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/strings.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/tags.go 新文件: vendor/gopkg.in/jcmturner/rpc.v1/ndr/union.go 修改: vendor/gopkg.in/yaml.v2/.travis.yml 修改: vendor/gopkg.in/yaml.v2/decode.go 修改: vendor/gopkg.in/yaml.v2/scannerc.go 修改: vendor/gopkg.in/yaml.v2/yaml.go 修改: vendor/gopkg.in/yaml.v2/yamlh.go 修改: vendor/modules.txt * Update sender.go * Update sender.go * Update kafka.go * Update kafka.go Co-authored-by: 马涛 <matao@staff.sina.com.cn>master
parent
7d5d791376
commit
163c116871
@ -0,0 +1,119 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type KafkaData map[string]interface{}
|
||||
type KfClient struct {
|
||||
producer sarama.AsyncProducer
|
||||
cfg *sarama.Config
|
||||
Topic string
|
||||
BrokersPeers []string
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error) {
|
||||
topic := c.Topic
|
||||
if len(topic) == 0 {
|
||||
err = errors.New("topic is nil")
|
||||
return
|
||||
}
|
||||
brokers := strings.Split(c.BrokersPeers, ",")
|
||||
if len(brokers) == 0 {
|
||||
err = errors.New("brokers is nil")
|
||||
return
|
||||
}
|
||||
hostName, _ := os.Hostname()
|
||||
|
||||
cfg := sarama.NewConfig()
|
||||
cfg.Producer.Return.Successes = true
|
||||
cfg.Producer.Return.Errors = true
|
||||
if len(hostName) > 0 {
|
||||
cfg.ClientID = hostName
|
||||
}
|
||||
cfg.Producer.Partitioner = func(topic string) sarama.Partitioner { return sarama.NewRoundRobinPartitioner(topic) }
|
||||
if len(c.SaslUser) > 0 && len(c.SaslPasswd) > 0 {
|
||||
cfg.Net.SASL.Enable = true
|
||||
cfg.Net.SASL.User = c.SaslUser
|
||||
cfg.Net.SASL.Password = c.SaslPasswd
|
||||
}
|
||||
if c.Retry > 0 {
|
||||
cfg.Producer.Retry.Max = c.Retry
|
||||
}
|
||||
|
||||
cfg.Net.DialTimeout = time.Duration(connTimeout) * time.Millisecond
|
||||
|
||||
if c.KeepAlive > 0 {
|
||||
cfg.Net.KeepAlive = time.Duration(c.KeepAlive) * time.Millisecond
|
||||
}
|
||||
producer, err := sarama.NewAsyncProducer(brokers, cfg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
kafkaSender = newSender(brokers, topic, cfg, producer)
|
||||
return
|
||||
}
|
||||
func newSender(brokers []string, topic string, cfg *sarama.Config, producer sarama.AsyncProducer) (kf *KfClient) {
|
||||
kf = &KfClient{
|
||||
producer: producer,
|
||||
Topic: topic,
|
||||
BrokersPeers: brokers,
|
||||
ticker: time.NewTicker(time.Millisecond * time.Duration(callTimeout)),
|
||||
}
|
||||
go kf.readMessageToErrorChan()
|
||||
return
|
||||
}
|
||||
|
||||
func (kf *KfClient) readMessageToErrorChan() {
|
||||
var producer = kf.producer
|
||||
for {
|
||||
select {
|
||||
case <-producer.Successes():
|
||||
case errMsg := <-producer.Errors():
|
||||
msg, _ := errMsg.Msg.Value.Encode()
|
||||
logger.Errorf("ReadMessageToErrorChan err:%v %v", errMsg.Error(), string(msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
func (kf *KfClient) Send(data KafkaData) error {
|
||||
var producer = kf.producer
|
||||
message, err := kf.getEventMessage(data)
|
||||
if err != nil {
|
||||
logger.Errorf("Dropping event: %v", err)
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case producer.Input() <- message:
|
||||
case <-kf.ticker.C:
|
||||
return fmt.Errorf("send kafka failed:%v[%v]", kf.Topic, kf.BrokersPeers)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kf *KfClient) Close() error {
|
||||
logger.Infof("kafka sender(%s) was closed", kf.Topic, kf.BrokersPeers)
|
||||
_ = kf.producer.Close()
|
||||
kf.producer = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kf *KfClient) getEventMessage(event map[string]interface{}) (pm *sarama.ProducerMessage, err error) {
|
||||
value, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
pm = &sarama.ProducerMessage{
|
||||
Topic: kf.Topic,
|
||||
Value: sarama.StringEncoder(string(value)),
|
||||
}
|
||||
return
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
# Compiled Object files, Static and Dynamic libs (Shared Objects)
|
||||
*.o
|
||||
*.a
|
||||
*.so
|
||||
*.test
|
||||
|
||||
# Folders
|
||||
_obj
|
||||
_test
|
||||
.vagrant
|
||||
|
||||
# Architecture specific extensions/prefixes
|
||||
*.[568vq]
|
||||
[568vq].out
|
||||
|
||||
*.cgo1.go
|
||||
*.cgo2.c
|
||||
_cgo_defun.c
|
||||
_cgo_gotypes.go
|
||||
_cgo_export.*
|
||||
|
||||
_testmain.go
|
||||
|
||||
*.exe
|
||||
|
||||
coverage.txt
|
||||
profile.out
|
@ -0,0 +1,74 @@
|
||||
run:
|
||||
timeout: 5m
|
||||
deadline: 10m
|
||||
|
||||
linters-settings:
|
||||
govet:
|
||||
check-shadowing: false
|
||||
golint:
|
||||
min-confidence: 0
|
||||
gocyclo:
|
||||
min-complexity: 99
|
||||
maligned:
|
||||
suggest-new: true
|
||||
dupl:
|
||||
threshold: 100
|
||||
goconst:
|
||||
min-len: 2
|
||||
min-occurrences: 3
|
||||
misspell:
|
||||
locale: US
|
||||
goimports:
|
||||
local-prefixes: github.com/Shopify/sarama
|
||||
gocritic:
|
||||
enabled-tags:
|
||||
- diagnostic
|
||||
- experimental
|
||||
- opinionated
|
||||
- performance
|
||||
- style
|
||||
disabled-checks:
|
||||
- wrapperFunc
|
||||
- ifElseChain
|
||||
funlen:
|
||||
lines: 300
|
||||
statements: 300
|
||||
|
||||
linters:
|
||||
disable-all: true
|
||||
enable:
|
||||
- bodyclose
|
||||
- deadcode
|
||||
- depguard
|
||||
- dogsled
|
||||
# - dupl
|
||||
- errcheck
|
||||
- funlen
|
||||
# - gocritic
|
||||
- gocyclo
|
||||
- gofmt
|
||||
- goimports
|
||||
# - golint
|
||||
- gosec
|
||||
# - gosimple
|
||||
- govet
|
||||
# - ineffassign
|
||||
- interfacer
|
||||
# - misspell
|
||||
# - nakedret
|
||||
# - scopelint
|
||||
# - staticcheck
|
||||
- structcheck
|
||||
# - stylecheck
|
||||
- typecheck
|
||||
- unconvert
|
||||
- unused
|
||||
- varcheck
|
||||
- whitespace
|
||||
# - goconst
|
||||
# - gochecknoinits
|
||||
|
||||
issues:
|
||||
exclude:
|
||||
- consider giving a name to these results
|
||||
- include an explanation for nolint directive
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,20 @@
|
||||
Copyright (c) 2013 Shopify
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
"Software"), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
||||
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
@ -0,0 +1,27 @@
|
||||
default: fmt get update test lint
|
||||
|
||||
GO := GO111MODULE=on GOPRIVATE=github.com/linkedin GOSUMDB=off go
|
||||
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
|
||||
GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 6m -coverprofile=profile.out -covermode=atomic
|
||||
|
||||
FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
|
||||
TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
|
||||
|
||||
get:
|
||||
$(GO) get ./...
|
||||
$(GO) mod verify
|
||||
$(GO) mod tidy
|
||||
|
||||
update:
|
||||
$(GO) get -u -v all
|
||||
$(GO) mod verify
|
||||
$(GO) mod tidy
|
||||
|
||||
fmt:
|
||||
gofmt -s -l -w $(FILES) $(TESTS)
|
||||
|
||||
lint:
|
||||
golangci-lint run
|
||||
|
||||
test:
|
||||
$(GOTEST) ./...
|
@ -0,0 +1,36 @@
|
||||
# sarama
|
||||
|
||||
[![GoDoc](https://godoc.org/github.com/Shopify/sarama?status.svg)](https://godoc.org/github.com/Shopify/sarama)
|
||||
[![Build Status](https://travis-ci.org/Shopify/sarama.svg?branch=master)](https://travis-ci.org/Shopify/sarama)
|
||||
[![Coverage](https://codecov.io/gh/Shopify/sarama/branch/master/graph/badge.svg)](https://codecov.io/gh/Shopify/sarama)
|
||||
|
||||
Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apache.org/) version 0.8 (and later).
|
||||
|
||||
## Getting started
|
||||
|
||||
- API documentation and examples are available via [godoc](https://godoc.org/github.com/Shopify/sarama).
|
||||
- Mocks for testing are available in the [mocks](./mocks) subpackage.
|
||||
- The [examples](./examples) directory contains more elaborate example applications.
|
||||
- The [tools](./tools) directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.
|
||||
|
||||
You might also want to look at the [Frequently Asked Questions](https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions).
|
||||
|
||||
## Compatibility and API stability
|
||||
|
||||
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
|
||||
the two latest stable releases of Kafka and Go, and we provide a two month
|
||||
grace period for older releases. This means we currently officially support
|
||||
Go 1.12 through 1.14, and Kafka 2.1 through 2.4, although older releases are
|
||||
still likely to work.
|
||||
|
||||
Sarama follows semantic versioning and provides API stability via the gopkg.in service.
|
||||
You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1.
|
||||
A changelog is available [here](CHANGELOG.md).
|
||||
|
||||
## Contributing
|
||||
|
||||
- Get started by checking our [contribution guidelines](https://github.com/Shopify/sarama/blob/master/.github/CONTRIBUTING.md).
|
||||
- Read the [Sarama wiki](https://github.com/Shopify/sarama/wiki) for more technical and design details.
|
||||
- The [Kafka Protocol Specification](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) contains a wealth of useful information.
|
||||
- For more general issues, there is [a google group](https://groups.google.com/forum/#!forum/kafka-clients) for Kafka client developers.
|
||||
- If you have any questions, just ask!
|
@ -0,0 +1,14 @@
|
||||
# We have 5 * 192MB ZK processes and 5 * 320MB Kafka processes => 2560MB
|
||||
MEMORY = 3072
|
||||
|
||||
Vagrant.configure("2") do |config|
|
||||
config.vm.box = "ubuntu/bionic64"
|
||||
|
||||
config.vm.provision :shell, path: "vagrant/provision.sh"
|
||||
|
||||
config.vm.network "private_network", ip: "192.168.100.67"
|
||||
|
||||
config.vm.provider "virtualbox" do |v|
|
||||
v.memory = MEMORY
|
||||
end
|
||||
end
|
@ -0,0 +1,138 @@
|
||||
package sarama
|
||||
|
||||
//Resource holds information about acl resource type
|
||||
type Resource struct {
|
||||
ResourceType AclResourceType
|
||||
ResourceName string
|
||||
ResourcePatternType AclResourcePatternType
|
||||
}
|
||||
|
||||
func (r *Resource) encode(pe packetEncoder, version int16) error {
|
||||
pe.putInt8(int8(r.ResourceType))
|
||||
|
||||
if err := pe.putString(r.ResourceName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if version == 1 {
|
||||
if r.ResourcePatternType == AclPatternUnknown {
|
||||
Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead")
|
||||
r.ResourcePatternType = AclPatternLiteral
|
||||
}
|
||||
pe.putInt8(int8(r.ResourcePatternType))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
|
||||
resourceType, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.ResourceType = AclResourceType(resourceType)
|
||||
|
||||
if r.ResourceName, err = pd.getString(); err != nil {
|
||||
return err
|
||||
}
|
||||
if version == 1 {
|
||||
pattern, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.ResourcePatternType = AclResourcePatternType(pattern)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//Acl holds information about acl type
|
||||
type Acl struct {
|
||||
Principal string
|
||||
Host string
|
||||
Operation AclOperation
|
||||
PermissionType AclPermissionType
|
||||
}
|
||||
|
||||
func (a *Acl) encode(pe packetEncoder) error {
|
||||
if err := pe.putString(a.Principal); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := pe.putString(a.Host); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pe.putInt8(int8(a.Operation))
|
||||
pe.putInt8(int8(a.PermissionType))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
|
||||
if a.Principal, err = pd.getString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.Host, err = pd.getString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
operation, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.Operation = AclOperation(operation)
|
||||
|
||||
permissionType, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.PermissionType = AclPermissionType(permissionType)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//ResourceAcls is an acl resource type
|
||||
type ResourceAcls struct {
|
||||
Resource
|
||||
Acls []*Acl
|
||||
}
|
||||
|
||||
func (r *ResourceAcls) encode(pe packetEncoder, version int16) error {
|
||||
if err := r.Resource.encode(pe, version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := pe.putArrayLength(len(r.Acls)); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, acl := range r.Acls {
|
||||
if err := acl.encode(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ResourceAcls) decode(pd packetDecoder, version int16) error {
|
||||
if err := r.Resource.decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.Acls = make([]*Acl, n)
|
||||
for i := 0; i < n; i++ {
|
||||
r.Acls[i] = new(Acl)
|
||||
if err := r.Acls[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,89 @@
|
||||
package sarama
|
||||
|
||||
//CreateAclsRequest is an acl creation request
|
||||
type CreateAclsRequest struct {
|
||||
Version int16
|
||||
AclCreations []*AclCreation
|
||||
}
|
||||
|
||||
func (c *CreateAclsRequest) encode(pe packetEncoder) error {
|
||||
if err := pe.putArrayLength(len(c.AclCreations)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, aclCreation := range c.AclCreations {
|
||||
if err := aclCreation.encode(pe, c.Version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
|
||||
c.Version = version
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.AclCreations = make([]*AclCreation, n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
c.AclCreations[i] = new(AclCreation)
|
||||
if err := c.AclCreations[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CreateAclsRequest) key() int16 {
|
||||
return 30
|
||||
}
|
||||
|
||||
func (c *CreateAclsRequest) version() int16 {
|
||||
return c.Version
|
||||
}
|
||||
|
||||
func (c *CreateAclsRequest) headerVersion() int16 {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
|
||||
switch c.Version {
|
||||
case 1:
|
||||
return V2_0_0_0
|
||||
default:
|
||||
return V0_11_0_0
|
||||
}
|
||||
}
|
||||
|
||||
//AclCreation is a wrapper around Resource and Acl type
|
||||
type AclCreation struct {
|
||||
Resource
|
||||
Acl
|
||||
}
|
||||
|
||||
func (a *AclCreation) encode(pe packetEncoder, version int16) error {
|
||||
if err := a.Resource.encode(pe, version); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.Acl.encode(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AclCreation) decode(pd packetDecoder, version int16) (err error) {
|
||||
if err := a.Resource.decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.Acl.decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,94 @@
|
||||
package sarama
|
||||
|
||||
import "time"
|
||||
|
||||
//CreateAclsResponse is a an acl response creation type
|
||||
type CreateAclsResponse struct {
|
||||
ThrottleTime time.Duration
|
||||
AclCreationResponses []*AclCreationResponse
|
||||
}
|
||||
|
||||
func (c *CreateAclsResponse) encode(pe packetEncoder) error {
|
||||
pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
|
||||
|
||||
if err := pe.putArrayLength(len(c.AclCreationResponses)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, aclCreationResponse := range c.AclCreationResponses {
|
||||
if err := aclCreationResponse.encode(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error) {
|
||||
throttleTime, err := pd.getInt32()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
|
||||
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.AclCreationResponses = make([]*AclCreationResponse, n)
|
||||
for i := 0; i < n; i++ {
|
||||
c.AclCreationResponses[i] = new(AclCreationResponse)
|
||||
if err := c.AclCreationResponses[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CreateAclsResponse) key() int16 {
|
||||
return 30
|
||||
}
|
||||
|
||||
func (c *CreateAclsResponse) version() int16 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *CreateAclsResponse) headerVersion() int16 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
|
||||
return V0_11_0_0
|
||||
}
|
||||
|
||||
//AclCreationResponse is an acl creation response type
|
||||
type AclCreationResponse struct {
|
||||
Err KError
|
||||
ErrMsg *string
|
||||
}
|
||||
|
||||
func (a *AclCreationResponse) encode(pe packetEncoder) error {
|
||||
pe.putInt16(int16(a.Err))
|
||||
|
||||
if err := pe.putNullableString(a.ErrMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AclCreationResponse) decode(pd packetDecoder, version int16) (err error) {
|
||||
kerr, err := pd.getInt16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.Err = KError(kerr)
|
||||
|
||||
if a.ErrMsg, err = pd.getNullableString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package sarama
|
||||
|
||||
//DeleteAclsRequest is a delete acl request
|
||||
type DeleteAclsRequest struct {
|
||||
Version int
|
||||
Filters []*AclFilter
|
||||
}
|
||||
|
||||
func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
|
||||
if err := pe.putArrayLength(len(d.Filters)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, filter := range d.Filters {
|
||||
filter.Version = d.Version
|
||||
if err := filter.encode(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
|
||||
d.Version = int(version)
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.Filters = make([]*AclFilter, n)
|
||||
for i := 0; i < n; i++ {
|
||||
d.Filters[i] = new(AclFilter)
|
||||
d.Filters[i].Version = int(version)
|
||||
if err := d.Filters[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DeleteAclsRequest) key() int16 {
|
||||
return 31
|
||||
}
|
||||
|
||||
func (d *DeleteAclsRequest) version() int16 {
|
||||
return int16(d.Version)
|
||||
}
|
||||
|
||||
func (c *DeleteAclsRequest) headerVersion() int16 {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
|
||||
switch d.Version {
|
||||
case 1:
|
||||
return V2_0_0_0
|
||||
default:
|
||||
return V0_11_0_0
|
||||
}
|
||||
}
|
@ -0,0 +1,163 @@
|
||||
package sarama
|
||||
|
||||
import "time"
|
||||
|
||||
//DeleteAclsResponse is a delete acl response
|
||||
type DeleteAclsResponse struct {
|
||||
Version int16
|
||||
ThrottleTime time.Duration
|
||||
FilterResponses []*FilterResponse
|
||||
}
|
||||
|
||||
func (d *DeleteAclsResponse) encode(pe packetEncoder) error {
|
||||
pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
|
||||
|
||||
if err := pe.putArrayLength(len(d.FilterResponses)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, filterResponse := range d.FilterResponses {
|
||||
if err := filterResponse.encode(pe, d.Version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
|
||||
throttleTime, err := pd.getInt32()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
|
||||
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.FilterResponses = make([]*FilterResponse, n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
d.FilterResponses[i] = new(FilterResponse)
|
||||
if err := d.FilterResponses[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DeleteAclsResponse) key() int16 {
|
||||
return 31
|
||||
}
|
||||
|
||||
func (d *DeleteAclsResponse) version() int16 {
|
||||
return d.Version
|
||||
}
|
||||
|
||||
func (d *DeleteAclsResponse) headerVersion() int16 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
|
||||
return V0_11_0_0
|
||||
}
|
||||
|
||||
//FilterResponse is a filter response type
|
||||
type FilterResponse struct {
|
||||
Err KError
|
||||
ErrMsg *string
|
||||
MatchingAcls []*MatchingAcl
|
||||
}
|
||||
|
||||
func (f *FilterResponse) encode(pe packetEncoder, version int16) error {
|
||||
pe.putInt16(int16(f.Err))
|
||||
if err := pe.putNullableString(f.ErrMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := pe.putArrayLength(len(f.MatchingAcls)); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, matchingAcl := range f.MatchingAcls {
|
||||
if err := matchingAcl.encode(pe, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
|
||||
kerr, err := pd.getInt16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.Err = KError(kerr)
|
||||
|
||||
if f.ErrMsg, err = pd.getNullableString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.MatchingAcls = make([]*MatchingAcl, n)
|
||||
for i := 0; i < n; i++ {
|
||||
f.MatchingAcls[i] = new(MatchingAcl)
|
||||
if err := f.MatchingAcls[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//MatchingAcl is a matching acl type
|
||||
type MatchingAcl struct {
|
||||
Err KError
|
||||
ErrMsg *string
|
||||
Resource
|
||||
Acl
|
||||
}
|
||||
|
||||
func (m *MatchingAcl) encode(pe packetEncoder, version int16) error {
|
||||
pe.putInt16(int16(m.Err))
|
||||
if err := pe.putNullableString(m.ErrMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Resource.encode(pe, version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Acl.encode(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MatchingAcl) decode(pd packetDecoder, version int16) (err error) {
|
||||
kerr, err := pd.getInt16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Err = KError(kerr)
|
||||
|
||||
if m.ErrMsg, err = pd.getNullableString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Resource.decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Acl.decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
package sarama
|
||||
|
||||
//DescribeAclsRequest is a secribe acl request type
|
||||
type DescribeAclsRequest struct {
|
||||
Version int
|
||||
AclFilter
|
||||
}
|
||||
|
||||
func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
|
||||
d.AclFilter.Version = d.Version
|
||||
return d.AclFilter.encode(pe)
|
||||
}
|
||||
|
||||
func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
|
||||
d.Version = int(version)
|
||||
d.AclFilter.Version = int(version)
|
||||
return d.AclFilter.decode(pd, version)
|
||||
}
|
||||
|
||||
func (d *DescribeAclsRequest) key() int16 {
|
||||
return 29
|
||||
}
|
||||
|
||||
func (d *DescribeAclsRequest) version() int16 {
|
||||
return int16(d.Version)
|
||||
}
|
||||
|
||||
func (d *DescribeAclsRequest) headerVersion() int16 {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
|
||||
switch d.Version {
|
||||
case 1:
|
||||
return V2_0_0_0
|
||||
default:
|
||||
return V0_11_0_0
|
||||
}
|
||||
}
|
@ -0,0 +1,91 @@
|
||||
package sarama
|
||||
|
||||
import "time"
|
||||
|
||||
//DescribeAclsResponse is a describe acl response type
|
||||
type DescribeAclsResponse struct {
|
||||
Version int16
|
||||
ThrottleTime time.Duration
|
||||
Err KError
|
||||
ErrMsg *string
|
||||
ResourceAcls []*ResourceAcls
|
||||
}
|
||||
|
||||
func (d *DescribeAclsResponse) encode(pe packetEncoder) error {
|
||||
pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
|
||||
pe.putInt16(int16(d.Err))
|
||||
|
||||
if err := pe.putNullableString(d.ErrMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := pe.putArrayLength(len(d.ResourceAcls)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, resourceAcl := range d.ResourceAcls {
|
||||
if err := resourceAcl.encode(pe, d.Version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DescribeAclsResponse) decode(pd packetDecoder, version int16) (err error) {
|
||||
throttleTime, err := pd.getInt32()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
|
||||
|
||||
kerr, err := pd.getInt16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Err = KError(kerr)
|
||||
|
||||
errmsg, err := pd.getString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if errmsg != "" {
|
||||
d.ErrMsg = &errmsg
|
||||
}
|
||||
|
||||
n, err := pd.getArrayLength()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.ResourceAcls = make([]*ResourceAcls, n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
d.ResourceAcls[i] = new(ResourceAcls)
|
||||
if err := d.ResourceAcls[i].decode(pd, version); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DescribeAclsResponse) key() int16 {
|
||||
return 29
|
||||
}
|
||||
|
||||
func (d *DescribeAclsResponse) version() int16 {
|
||||
return d.Version
|
||||
}
|
||||
|
||||
func (d *DescribeAclsResponse) headerVersion() int16 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
|
||||
switch d.Version {
|
||||
case 1:
|
||||
return V2_0_0_0
|
||||
default:
|
||||
return V0_11_0_0
|
||||
}
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
package sarama
|
||||
|
||||
type AclFilter struct {
|
||||
Version int
|
||||
ResourceType AclResourceType
|
||||
ResourceName *string
|
||||
ResourcePatternTypeFilter AclResourcePatternType
|
||||
Principal *string
|
||||
Host *string
|
||||
Operation AclOperation
|
||||
PermissionType AclPermissionType
|
||||
}
|
||||
|
||||
func (a *AclFilter) encode(pe packetEncoder) error {
|
||||
pe.putInt8(int8(a.ResourceType))
|
||||
if err := pe.putNullableString(a.ResourceName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.Version == 1 {
|
||||
pe.putInt8(int8(a.ResourcePatternTypeFilter))
|
||||
}
|
||||
|
||||
if err := pe.putNullableString(a.Principal); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := pe.putNullableString(a.Host); err != nil {
|
||||
return err
|
||||
}
|
||||
pe.putInt8(int8(a.Operation))
|
||||
pe.putInt8(int8(a.PermissionType))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {
|
||||
resourceType, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.ResourceType = AclResourceType(resourceType)
|
||||
|
||||
if a.ResourceName, err = pd.getNullableString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.Version == 1 {
|
||||
pattern, err := pd.getInt8()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.ResourcePatternTypeFilter = AclResourcePatternType(pattern)
|
||||
}
|
||||
|
||||
if a.Principal, err = pd.getNullableString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.Host, err = pd.getNullableString(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
operation, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.Operation = AclOperation(operation)
|
||||
|
||||
permissionType, err := pd.getInt8()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.PermissionType = AclPermissionType(permissionType)
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
package sarama
|
||||
|
||||
type (
|
||||
AclOperation int
|
||||
|
||||
AclPermissionType int
|
||||
|
||||
AclResourceType int
|
||||
|
||||
AclResourcePatternType int
|
||||
)
|
||||
|
||||
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
|
||||
const (
|
||||
AclOperationUnknown AclOperation = iota
|
||||
AclOperationAny
|
||||
AclOperationAll
|
||||
AclOperationRead
|
||||
AclOperationWrite
|
||||
AclOperationCreate
|
||||
AclOperationDelete
|
||||
AclOperationAlter
|
||||
AclOperationDescribe
|
||||
AclOperationClusterAction
|
||||
AclOperationDescribeConfigs
|
||||
AclOperationAlterConfigs
|
||||
AclOperationIdempotentWrite
|
||||
)
|
||||
|
||||
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
|
||||
const (
|
||||
AclPermissionUnknown AclPermissionType = iota
|
||||
AclPermissionAny
|
||||
AclPermissionDeny
|
||||
AclPermissionAllow
|
||||
)
|
||||
|
||||
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
|
||||
const (
|
||||
AclResourceUnknown AclResourceType = iota
|
||||
AclResourceAny
|
||||
AclResourceTopic
|
||||
AclResourceGroup
|
||||
AclResourceCluster
|
||||
AclResourceTransactionalID
|
||||
)
|
||||
|
||||
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
|
||||
const (
|
||||
AclPatternUnknown AclResourcePatternType = iota
|
||||
AclPatternAny
|
||||
AclPatternMatch
|
||||
AclPatternLiteral
|
||||
AclPatternPrefixed
|
||||
)
|
@ -0,0 +1,57 @@
|
||||
package sarama
|
||||
|
||||
//AddOffsetsToTxnRequest adds offsets to a transaction request
|
||||
type AddOffsetsToTxnRequest struct {
|
||||
TransactionalID string
|
||||
ProducerID int64
|
||||
ProducerEpoch int16
|
||||
GroupID string
|
||||
}
|
||||
|
||||
func (a *AddOffsetsToTxnRequest) encode(pe packetEncoder) error {
|
||||
if err := pe.putString(a.TransactionalID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pe.putInt64(a.ProducerID)
|
||||
|
||||
pe.putInt16(a.ProducerEpoch)
|
||||
|
||||
if err := pe.putString(a.GroupID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AddOffsetsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
|
||||
if a.TransactionalID, err = pd.getString(); err != nil {
|
||||
return err
|
||||
}
|
||||
if a.ProducerID, err = pd.getInt64(); err != nil {
|
||||
return err
|
||||
}
|
||||
if a.ProducerEpoch, err = pd.getInt16(); err != nil {
|
||||
return err
|
||||
}
|
||||
if a.GroupID, err = pd.getString(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AddOffsetsToTxnRequest) key() int16 {
|
||||
return 25
|
||||
}
|
||||
|
||||
func (a *AddOffsetsToTxnRequest) version() int16 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
|
||||
return V0_11_0_0
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue