Advertisement
aldikhan13

rabbitmq rcp new 2025

Jun 6th, 2025 (edited)
1,251
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 10.86 KB | None | 0 0
  1. package pkg
  2.  
  3. import (
  4.     "context"
  5.     "errors"
  6.     "runtime"
  7.     "sync"
  8.     "time"
  9.  
  10.     "github.com/lithammer/shortuuid"
  11.     amqp "github.com/wagslane/go-rabbitmq"
  12.  
  13.     sdk_cons "github.com/PT-UMKM-Pintar-Indonesia/shared-sdk/constants"
  14.     sdk_dto "github.com/PT-UMKM-Pintar-Indonesia/shared-sdk/dtos"
  15.     sdk_helper "github.com/PT-UMKM-Pintar-Indonesia/shared-sdk/helpers"
  16.     sdk_inf "github.com/PT-UMKM-Pintar-Indonesia/shared-sdk/interfaces"
  17. )
  18.  
  19. type rabbitmq struct {
  20.     ctx      context.Context
  21.     rabbitmq *amqp.Conn
  22. }
  23.  
  24. var (
  25.     rabbitmqOptions []sdk_dto.RabbitOptions = []sdk_dto.RabbitOptions{}
  26.     delivery        chan map[string][]byte  = make(chan map[string][]byte, 1000)
  27. )
  28.  
  29. func NewRabbitMQ(ctx context.Context, con *amqp.Conn) sdk_inf.IRabbitMQ {
  30.     return rabbitmq{ctx: ctx, rabbitmq: con}
  31. }
  32.  
  33. func (p rabbitmq) Publisher(req sdk_dto.Request[sdk_dto.RabbitOptions]) error {
  34.     if req.Option.ContentType == sdk_cons.EMPTY {
  35.         req.Option.ContentType = "application/json"
  36.     }
  37.  
  38.     if req.Option.AppID == sdk_cons.EMPTY {
  39.         req.Option.AppID = shortuuid.New()
  40.     }
  41.  
  42.     if time.Until(req.Option.Timestamp) < 1 {
  43.         req.Option.Timestamp = time.Now().Local()
  44.     }
  45.  
  46.     publisher, err := amqp.NewPublisher(p.rabbitmq,
  47.         amqp.WithPublisherOptionsExchangeName(req.Option.ExchangeName),
  48.         amqp.WithPublisherOptionsExchangeKind(req.Option.ExchangeType),
  49.         amqp.WithPublisherOptionsExchangeDeclare,
  50.         amqp.WithPublisherOptionsExchangeDurable,
  51.         amqp.WithPublisherOptionsExchangeNoWait,
  52.         amqp.WithPublisherOptionsExchangeArgs(req.Option.Args),
  53.         amqp.WithPublisherOptionsLogging,
  54.     )
  55.  
  56.     if err != nil {
  57.         publisher.Close()
  58.         return err
  59.     }
  60.  
  61.     bodyByte, err := sdk_helper.NewParser().Marshal(&req.Option.Body)
  62.     if err != nil {
  63.         return err
  64.     }
  65.  
  66.     publisher.NotifyPublish(func(r amqp.Confirmation) {
  67.         if !r.Confirmation.Ack {
  68.             Logrus(sdk_cons.ERROR, "Failed message delivery to: %s", req.Option.QueueName)
  69.             return
  70.         }
  71.  
  72.         Logrus(sdk_cons.INFO, "Success message delivery to: %s", req.Option.QueueName)
  73.     })
  74.  
  75.     err = publisher.PublishWithContext(p.ctx, bodyByte, []string{req.Option.QueueName},
  76.         amqp.WithPublishOptionsPersistentDelivery,
  77.         amqp.WithPublishOptionsExchange(req.Option.ExchangeName),
  78.         amqp.WithPublishOptionsContentType(req.Option.ContentType),
  79.         amqp.WithPublishOptionsTimestamp(req.Option.Timestamp),
  80.         amqp.WithPublishOptionsHeaders(req.Option.Args),
  81.     )
  82.     defer publisher.Close()
  83.  
  84.     if err != nil {
  85.         return err
  86.     }
  87.  
  88.     return nil
  89. }
  90.  
  91. func (p rabbitmq) Consumer(req sdk_dto.Request[sdk_dto.RabbitOptions], callback func(d amqp.Delivery) (action amqp.Action)) {
  92.     if req.Option.ConsumerID == sdk_cons.EMPTY {
  93.         req.Option.ConsumerID = shortuuid.New()
  94.     }
  95.  
  96.     if req.Option.Concurrency < 1 {
  97.         req.Option.Concurrency = runtime.NumCPU() / 2
  98.     }
  99.  
  100.     if req.Option.Prefetch < 1 {
  101.         req.Option.Prefetch = 5
  102.     }
  103.  
  104.     consumer, err := amqp.NewConsumer(p.rabbitmq, callback, req.Option.QueueName,
  105.         amqp.WithConsumerOptionsExchangeName(req.Option.ExchangeName),
  106.         amqp.WithConsumerOptionsExchangeKind(req.Option.ExchangeType),
  107.         amqp.WithConsumerOptionsBinding(amqp.Binding{
  108.             RoutingKey: req.Option.QueueName,
  109.             BindingOptions: amqp.BindingOptions{
  110.                 Declare: true,
  111.                 NoWait:  true,
  112.                 Args:    req.Option.Args,
  113.             },
  114.         }),
  115.         amqp.WithConsumerOptionsExchangeDurable,
  116.         amqp.WithConsumerOptionsExchangeDeclare,
  117.         amqp.WithConsumerOptionsQueueDurable,
  118.         amqp.WithConsumerOptionsConsumerName(req.Option.ConsumerID),
  119.         amqp.WithConsumerOptionsConsumerAutoAck(req.Option.Ack),
  120.         amqp.WithConsumerOptionsConcurrency(req.Option.Concurrency),
  121.         amqp.WithConsumerOptionsQOSPrefetch(req.Option.Prefetch),
  122.         amqp.WithConsumerOptionsQueueArgs(req.Option.Args),
  123.         amqp.WithConsumerOptionsLogging,
  124.     )
  125.  
  126.     if err != nil {
  127.         Logrus(sdk_cons.ERROR, err)
  128.         consumer.Close()
  129.         return
  130.     }
  131. }
  132.  
  133. func (p rabbitmq) listeningConsumerRPC(mutex *sync.RWMutex, req sdk_dto.RabbitOptions) (*amqp.Consumer, error) {
  134.     if req.ConsumerID == sdk_cons.EMPTY {
  135.         req.ConsumerID = shortuuid.New()
  136.     }
  137.  
  138.     if req.Concurrency < 1 {
  139.         req.Concurrency = runtime.NumCPU() / 2
  140.     }
  141.  
  142.     if req.Prefetch < 1 {
  143.         req.Prefetch = 5
  144.     }
  145.  
  146.     consumer, err := amqp.NewConsumer(p.rabbitmq, func(d amqp.Delivery) (action amqp.Action) {
  147.         if d.CorrelationId == sdk_cons.EMPTY {
  148.             return amqp.NackDiscard
  149.         }
  150.  
  151.         for _, opt := range rabbitmqOptions {
  152.             if opt.CorrelationID != d.CorrelationId {
  153.                 return amqp.NackDiscard
  154.             }
  155.         }
  156.  
  157.         mutex.Lock()
  158.         defer mutex.Unlock()
  159.  
  160.         delivery <- map[string][]byte{d.CorrelationId: d.Body}
  161.         return amqp.Ack
  162.  
  163.     }, req.ReplyTo,
  164.         amqp.WithConsumerOptionsExchangeName(req.ExchangeName),
  165.         amqp.WithConsumerOptionsExchangeKind(req.ExchangeType),
  166.         // amqp.WithConsumerOptionsExchangeDurable,
  167.         // amqp.WithConsumerOptionsExchangeDeclare,
  168.         // amqp.WithConsumerOptionsQueueDurable,
  169.         amqp.WithConsumerOptionsConsumerExclusive,
  170.         amqp.WithConsumerOptionsQueueAutoDelete,
  171.         amqp.WithConsumerOptionsConsumerName(req.ConsumerID),
  172.         amqp.WithConsumerOptionsConsumerAutoAck(req.Ack),
  173.         amqp.WithConsumerOptionsConcurrency(req.Concurrency),
  174.         amqp.WithConsumerOptionsQOSPrefetch(req.Prefetch),
  175.         amqp.WithConsumerOptionsQueueArgs(req.Args),
  176.         amqp.WithConsumerOptionsLogging,
  177.     )
  178.  
  179.     if err != nil {
  180.         consumer.Close()
  181.         return nil, err
  182.     }
  183.  
  184.     return consumer, nil
  185. }
  186.  
  187. func (p rabbitmq) PublisherRPC(req sdk_dto.Request[sdk_dto.RabbitOptions]) ([]byte, error) {
  188.     if req.Option.ContentType == sdk_cons.EMPTY {
  189.         req.Option.ContentType = "application/json"
  190.     }
  191.  
  192.     if req.Option.AppID == sdk_cons.EMPTY {
  193.         req.Option.AppID = shortuuid.New()
  194.     }
  195.  
  196.     if req.Option.CorrelationID == sdk_cons.EMPTY {
  197.         req.Option.CorrelationID = shortuuid.New()
  198.     }
  199.  
  200.     if req.Option.ReplyTo == sdk_cons.EMPTY {
  201.         req.Option.ReplyTo = req.Option.CorrelationID
  202.     }
  203.  
  204.     if time.Until(req.Option.Timestamp) < 1 {
  205.         req.Option.Timestamp = time.Now().Local()
  206.     }
  207.  
  208.     if req.Option.Expired == sdk_cons.EMPTY {
  209.         req.Option.Expired = "10"
  210.     }
  211.  
  212.     if len(rabbitmqOptions) > 0 {
  213.         rabbitmqOptions = []sdk_dto.RabbitOptions{}
  214.     }
  215.  
  216.     rabbitmqOptions = append(rabbitmqOptions, req.Option)
  217.     mutex := new(sync.RWMutex)
  218.  
  219.     consumer, err := p.listeningConsumerRPC(mutex, req.Option)
  220.     if err != nil {
  221.         return nil, err
  222.     }
  223.     defer consumer.Close()
  224.  
  225.     publisher, err := amqp.NewPublisher(p.rabbitmq,
  226.         amqp.WithPublisherOptionsExchangeName(req.Option.ExchangeName),
  227.         amqp.WithPublisherOptionsExchangeKind(req.Option.ExchangeType),
  228.         amqp.WithPublisherOptionsExchangeDeclare,
  229.         amqp.WithPublisherOptionsExchangeDurable,
  230.         amqp.WithPublisherOptionsExchangeNoWait,
  231.         amqp.WithPublisherOptionsExchangeArgs(req.Option.Args),
  232.         amqp.WithPublisherOptionsLogging,
  233.     )
  234.  
  235.     if err != nil {
  236.         publisher.Close()
  237.         return nil, err
  238.     }
  239.  
  240.     bodyByte, err := sdk_helper.NewParser().Marshal(&req.Option.Body)
  241.     if err != nil {
  242.         return nil, err
  243.     }
  244.  
  245.     publisher.NotifyPublish(func(r amqp.Confirmation) {
  246.         if !r.Confirmation.Ack {
  247.             Logrus(sdk_cons.ERROR, "Failed message delivery to: %s", req.Option.QueueName)
  248.             return
  249.         }
  250.  
  251.         Logrus(sdk_cons.INFO, "Success message delivery to: %s", req.Option.QueueName)
  252.     })
  253.  
  254.     err = publisher.PublishWithContext(p.ctx, bodyByte, []string{req.Option.QueueName},
  255.         amqp.WithPublishOptionsPersistentDelivery,
  256.         amqp.WithPublishOptionsExchange(req.Option.ExchangeName),
  257.         amqp.WithPublishOptionsCorrelationID(req.Option.CorrelationID),
  258.         amqp.WithPublishOptionsReplyTo(req.Option.ReplyTo),
  259.         amqp.WithPublishOptionsAppID(req.Option.AppID),
  260.         amqp.WithPublishOptionsUserID(req.Option.UserID),
  261.         amqp.WithPublishOptionsContentType(req.Option.ContentType),
  262.         amqp.WithPublishOptionsTimestamp(req.Option.Timestamp),
  263.         amqp.WithPublishOptionsExpiration(req.Option.Expired),
  264.         amqp.WithPublishOptionsHeaders(req.Option.Args),
  265.     )
  266.     defer publisher.Close()
  267.  
  268.     if err != nil {
  269.         return nil, err
  270.     }
  271.  
  272.     select {
  273.     case d := <-delivery:
  274.         return d[req.Option.CorrelationID], nil
  275.  
  276.     case <-time.After(time.Second * time.Duration(10)):
  277.         return nil, errors.New("Timeout waiting for RPC response")
  278.     }
  279. }
  280.  
  281. func (p rabbitmq) ConsumerRPC(req sdk_dto.Request[sdk_dto.RabbitOptions], handler func(delivery amqp.Delivery) (action amqp.Action)) {
  282.     if req.Option.ConsumerID == sdk_cons.EMPTY {
  283.         req.Option.ConsumerID = shortuuid.New()
  284.     }
  285.  
  286.     if req.Option.Concurrency < 1 {
  287.         req.Option.Concurrency = runtime.NumCPU() / 2
  288.     }
  289.  
  290.     if req.Option.Prefetch < 1 {
  291.         req.Option.Prefetch = 5
  292.     }
  293.  
  294.     consumer, err := amqp.NewConsumer(p.rabbitmq, handler, req.Option.QueueName,
  295.         amqp.WithConsumerOptionsExchangeName(req.Option.ExchangeName),
  296.         amqp.WithConsumerOptionsExchangeKind(req.Option.ExchangeType),
  297.         amqp.WithConsumerOptionsBinding(amqp.Binding{
  298.             RoutingKey: req.Option.QueueName,
  299.             BindingOptions: amqp.BindingOptions{
  300.                 Declare: true,
  301.                 NoWait:  true,
  302.                 Args:    req.Option.Args,
  303.             },
  304.         }),
  305.         amqp.WithConsumerOptionsExchangeDurable,
  306.         amqp.WithConsumerOptionsQueueDurable,
  307.         amqp.WithConsumerOptionsConsumerName(req.Option.ConsumerID),
  308.         amqp.WithConsumerOptionsConsumerAutoAck(req.Option.Ack),
  309.         amqp.WithConsumerOptionsConcurrency(req.Option.Concurrency),
  310.         amqp.WithConsumerOptionsQOSPrefetch(req.Option.Prefetch),
  311.         amqp.WithConsumerOptionsQueueArgs(req.Option.Args),
  312.         amqp.WithConsumerOptionsLogging,
  313.     )
  314.  
  315.     if err != nil {
  316.         Logrus(sdk_cons.ERROR, err)
  317.         consumer.Close()
  318.         return
  319.     }
  320. }
  321.  
  322. func (p rabbitmq) ReplyToDeliveryPublisher(req sdk_dto.Request[sdk_dto.RabbitOptions]) error {
  323.     if req.Option.Delivery.Body == nil {
  324.         return errors.New("Delivery body cannot be nil")
  325.     }
  326.  
  327.     publisher, err := amqp.NewPublisher(p.rabbitmq,
  328.         amqp.WithPublisherOptionsExchangeName(req.Option.ExchangeName),
  329.         amqp.WithPublisherOptionsExchangeKind(req.Option.ExchangeType),
  330.         amqp.WithPublisherOptionsExchangeDeclare,
  331.         amqp.WithPublisherOptionsExchangeDurable,
  332.         amqp.WithPublisherOptionsExchangeNoWait,
  333.         amqp.WithPublisherOptionsExchangeArgs(req.Option.Args),
  334.         amqp.WithPublisherOptionsLogging,
  335.     )
  336.  
  337.     if err != nil {
  338.         return err
  339.     }
  340.  
  341.     publisher.NotifyPublish(func(r amqp.Confirmation) {
  342.         if !r.Confirmation.Ack {
  343.             Logrus(sdk_cons.ERROR, "Failed message delivery to: %s", req.Option.Delivery.ReplyTo)
  344.             return
  345.         }
  346.  
  347.         Logrus(sdk_cons.INFO, "Success message delivery to: %s", req.Option.Delivery.ReplyTo)
  348.     })
  349.  
  350.     bodyByte, err := sdk_helper.NewParser().Marshal(&req.Option.Body)
  351.     if err != nil {
  352.         return err
  353.     }
  354.  
  355.     err = publisher.Publish(bodyByte, []string{req.Option.Delivery.ReplyTo},
  356.         amqp.WithPublishOptionsPersistentDelivery,
  357.         amqp.WithPublishOptionsCorrelationID(req.Option.Delivery.CorrelationId),
  358.         amqp.WithPublishOptionsAppID(req.Option.Delivery.AppId),
  359.         amqp.WithPublishOptionsUserID(req.Option.Delivery.UserId),
  360.         amqp.WithPublishOptionsContentType(req.Option.Delivery.ContentType),
  361.         amqp.WithPublishOptionsTimestamp(req.Option.Delivery.Timestamp),
  362.         amqp.WithPublishOptionsExpiration(req.Option.Expired),
  363.         amqp.WithPublishOptionsHeaders(req.Option.Args),
  364.     )
  365.     defer publisher.Close()
  366.  
  367.     if err != nil {
  368.         return err
  369.     }
  370.  
  371.     return nil
  372. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement