Pushing K8s Cluster Logs to S3 Bucket & ES using Fluentd

Storing logs on Elastic search can be very costly, both, in terms of cost as well as in terms of time when you’re trying to retrieve them back. So, in order to save cost, we started sending our Kubernetes cluster logs to AWS S3 bucket, where we would store them till 6 months while keeping the log’s retention policy on Elastic search to only 1 month.

Apart from Devtron Helm Charts deployment feature, you can also use Helm charts to deploy fluent-bit (to collect logs from various pods/deployments on K8s nodes) and fluentd (as an aggregator and forwarder of the logs to s3 or Elastic Search)

Installing Fluent-bit Helm Chart for forwarding K8s Cluster Logs to Fleuntd

Let’s look at the stable/fluent-bit helm chart configurations, you can use it to directly forward the logs to Elastic Search or forward the logs to fluentd for further processing/enrichment.

Edit the fluent-bit-fd-values-2.8.11.yaml below to make the changes mentioned as below.

Forwarding Logs to Fluentd (Required for forwarding logs to S3): To forward Kubernetes cluster logs to fluentd for further enrichment and then forwarding the logs to Elastic search and/or S3 bucket, specify the in-cluster fluentd service as host in the forward section and set the type of the backend to “forward“

Forwarding logs to Elastic Search: To forwards Kubernetes cluster logs directly to Elastic Search, you can specify the in cluster elastic search client service name or a hosted elastic search endpoint in the es configurations and set the type of the backend to “es“.

Installing Fluent-bit using Helm

Once you’ve edited the fluent-bit values file according to your use case, use helm install command to install it.

If this is the first time you’re using Helm, please Install and configure Helm-Tiller in your cluster first.

helm install fluent-bit stable/fluent-bit --version 2.8.11 -f fluent-bit-fd-values-2.8.11.yaml

To Uninstall Fluent-bit

helm delete fluent-bit --purge 


    # Minikube stores its logs in a separate directory.
	# Enable if you install the chart in minikube.
	on_minikube: false

	    repository: fluent/fluent-bit
	    tag: 1.3.7
	  pullPolicy: Always

	  image: "dduportal/bats"
	  tag: "0.4.0"

	nameOverride: ""
	fullnameOverride: ""

	# When enabled, exposes json and prometheus metrics on {{ .Release.Name }}-metrics service
	  enabled: false
	    # labels:
	    #   key: value
	    annotations: {}
	    # In order for Prometheus to consume metrics automatically use the following annotations:
	    # prometheus.io/path: "/api/v1/metrics/prometheus"
	    # prometheus.io/port: "2020"
	    # prometheus.io/scrape: "true"
	    port: 2020
	    type: ClusterIP
	    enabled: false
	    additionalLabels: {}
	    # namespace: monitoring
	    # interval: 30s
	    # scrapeTimeout: 10s

	# When enabled, fluent-bit will keep track of tailing offsets across pod restarts.
	trackOffsets: false

	## PriorityClassName
	## Ref: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass
	priorityClassName: ""

	  type: forward 
	#You can change type to es and specify es configs to forward logs directly to Elastic Search
	    host: fluentd
	    port: 24284
	    tls: "off"
	    tls_verify: "on"
	    tls_debug: 1
	    host: elasticsearch
	    port: 9200
	    # Elastic Index Name
	    index: kubernetes_cluster
	    type: flb_type
	    logstash_prefix: kubernetes_cluster
	    replace_dots: "On"
	    logstash_format: "On"
	    retry_limit: "False"
	    time_key: "@timestamp"
	    # Optional username credential for Elastic X-Pack access
	    # Password for user defined in HTTP_User
	    # Optional TLS encryption to ElasticSearch instance
	    tls: "off"
	    tls_verify: "on"
	    # TLS certificate for the Elastic (in PEM format). Use if tls=on and tls_verify=on.
	    tls_ca: ""
	    # TLS debugging levels = 1-4
	    tls_debug: 1
	    port: 8088
	    token: ""
	    send_raw: "on"
	    tls: "on"
	    tls_verify: "off"
	    tls_debug: 1
	    message_key: "kubernetes"
	  stackdriver: {}

	  ## Ref: https://fluentbit.io/documentation/current/output/http.html
	    port: 80
	    uri: "/"
	    tls: "off"
	    tls_verify: "on"
	    tls_debug: 1
	    ## Specify the data format to be used in the HTTP request body
	    ## Can be either 'msgpack' or 'json'
	    format: msgpack
	    # json_date_format: double or iso8601
	    headers: []

	  enabled: false
	  ## List the respective parsers in key: value format per entry
	  ## Regex required fields are name and regex. JSON and Logfmt required field
	  ## is name.
	  regex: []
	  logfmt: []
	  ##  json parser config can be defined by providing an extraEntries field.
	  ##  The following entry:
	  ## json:
	  ##   - extraEntries: |
	  ##       Decode_Field_As  escaped log do_next
	  ##       Decode_Field_As  json log
	  ##  translates into
	  ##   Command       |  Decoder  | Field | Optional Action   |
	  ##   ==============|===========|=======|===================|
	  ##   Decode_Field_As  escaped   log  do_next
	  ##   Decode_Field_As  json log
	  json: []

	env: []

	## Annotations to add to the DaemonSet's Pods
	podAnnotations: {}

	## By default there different 'files' provides in the config
	## (fluent-bit.conf, custom_parsers.conf). This defeats
	## changing a configmap (since it uses subPath). If this
	## variable is set, the user is assumed to have provided,
	## in 'existingConfigMap' the entire config (etc/*) of fluent-bit,
	## parsers and system config. In this case, no subPath is
	## used
	fullConfigMap: false

	## ConfigMap override where fullname is {{.Release.Name}}-{{.Values.existingConfigMap}}
	## Defining existingConfigMap will cause templates/config.yaml
	## to NOT generate a ConfigMap resource
	existingConfigMap: ""


	# NOTE If you want to add extra sections, add them here, inbetween the includes,
	# wherever they need to go. Sections order matters.

	rawConfig: |-
	  @INCLUDE fluent-bit-service.conf
	  @INCLUDE fluent-bit-input.conf
	  @INCLUDE fluent-bit-filter.conf
	  @INCLUDE fluent-bit-output.conf
	# WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
	# This is to add extra entries to an existing section, NOT for adding new sections
	# Do not submit bugs against indent being wrong. Add your new sections to rawConfig
	# instead.
	  input: |-
	#     # >=1 additional Key/Value entrie(s) for existing Input section
	  audit: |-
	#     # >=1 additional Key/Value entrie(s) for existing Input section
	  filter: |-
	#     # >=1 additional Key/Value entrie(s) for existing Filter section
	  output: |-
	#     # >=1 additional Key/Value entrie(s) for existing Ouput section
	# WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


	## Extra ports to add to the daemonset ports section
	extraPorts: []

	## Extra volumes containing additional files required for fluent-bit to work
	## (eg. CA certificates)
	## Ref: https://kubernetes.io/docs/concepts/storage/volumes/
	extraVolumes: []

	## Extra volume mounts for the fluent-bit pod.
	## Ref: https://kubernetes.io/docs/tasks/configure-pod-container/configure-volume-storage/
	extraVolumeMounts: []

	resources: {}
	  # limits:
	  #   cpu: 100m
	  #   memory: 128Mi
	  # requests:
	  #   cpu: 10m
	  #   memory: 8Mi

	# When enabled, pods will bind to the node's network namespace.
	hostNetwork: false

	# Which DNS policy to use for the pod.
	# Consider switching to 'ClusterFirstWithHostNet' when 'hostNetwork' is enabled.
	dnsPolicy: ClusterFirst

	## Node tolerations for fluent-bit scheduling to nodes with taints
	## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
	tolerations: []
	# - key: "key"
	#  operator: "Equal|Exists"
	#  value: "value"
	#  effect: "NoSchedule|PreferNoSchedule|NoExecute(1.6 only)"

	## Node labels for fluent-bit pod assignment
	## Ref: https://kubernetes.io/docs/user-guide/node-selection/
	nodeSelector: {}
	affinity: {}

	  flush: 1
	  logLevel: info

	    memBufLimit: 5MB
	    parser: docker
	    path: /var/log/containers/*.log
	    ignore_older: ""
	    enabled: false
	        - docker.service
	        - kubelet.service
	        - node-problem-detector.service
	    maxEntries: 1000
	    readFromTail: true
	    stripUnderscores: false
	    tag: host.*

	  enable: false
	    memBufLimit: 35MB
	    parser: docker
	    tag: audit.*
	    path: /var/log/kube-apiserver-audit.log
	    bufferChunkSize: 2MB
	    bufferMaxSize: 10MB
	    skipLongLines: On
	    key: kubernetes-audit

	  kubeURL: https://kubernetes.default.svc:443
	  kubeCAFile: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
	  kubeTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
	  kubeTag: kube
	  kubeTagPrefix: kube.var.log.containers.

	# If true, check to see if the log field content is a JSON string map, if so,
	# it append the map fields as part of the log structure.
	  mergeJSONLog: true

	# If set, all unpacked keys from mergeJSONLog (Merge_Log) will be packed under
	# the key name specified on mergeLogKey (Merge_Log_Key)
	  mergeLogKey: ""

	# If true, enable the use of monitoring for a pod annotation of
	# fluentbit.io/parser: parser_name. parser_name must be the name
	# of a parser contained within parsers.conf
	  enableParser: true

	# If true, enable the use of monitoring for a pod annotation of
	# fluentbit.io/exclude: true. If present, discard logs from that pod.
	  enableExclude: true

	# If true, the filter reads logs coming in Journald format.
	  useJournal: false

	  # Specifies whether RBAC resources should be created
	  create: true
	  # Specifies whether a PodSecurityPolicy should be created
	  pspEnabled: false

	  directory: /var/lib/fluent-bit

	  # Specifies whether a ServiceAccount should be created
	  create: true
	  # Annotations to add to the service account
	  annotations: {}
	  # The name of the ServiceAccount to use.
	  # If not set and create is true, a name is generated using the fullname template

	## Specifies security settings for a container
	## Ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container
	securityContext: {}
	  # securityContext:
	  #   privileged: true

	## Specifies security settings for a pod
	## Ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod
	podSecurityContext: {}
	  # podSecurityContext:
	  #   runAsUser: 1000

Now that we have configured fluent-bit to collect logs from various pods/deployments in our Kubernetes cluster, we now need an aggregator that aggregates all the logs and writes/pushes them to the required place (files, RDBMS, NoSQL, IaaS, SaaS, Hadoop, elastic search, AWS S3).

Installing Fluentd Helm Chart for forwarding Logs to Elastic Search as well as S3

Let’s look at the stable/fluentd helm chart configurations, we will configure fluentd to send logs collected from fluentbit (or other data sources) to Elastic Search (for shorter retention) as well as to AWS S3 bucket (for longer retention/archive).

Edit the following blocks in the sample fluentd-es-s3-values-2.3.2.yaml file provided below.

Elastic Search Configurations Block

Set the Elastic search configurations in the Elastic Search configuration block. You can set the in-cluster elastic search client service name or a hosted elastic search endpoint as host. (when using in-cluster service, do append the namespace of the elastic search client service separated by a dot, for example – a service elasticsearch-client in a namespace logging will be written as elasticsearch-client.logging in the host)

  host: <elasticsearch-client>.<namespace>
  port: 9200
  scheme: http
  sslVersion: TLSv1
  buffer_chunk_limit: 2M
  buffer_queue_limit: 8

Fluentd Plugins Block

Enable the fluentd plugins and import fluent-plugin-s3 and fluent-plugin-rewrite-tag-filter

  enabled: true
    - fluent-plugin-s3
    - fluent-plugin-rewrite-tag-filter

S3 Bucket Configurations Block

Set the S3 configurations in the S3 configurations block. Set the s3_bucket, s3_region, path.

    <match **>
       @type s3
       s3_bucket <k8s-logs-bucket>
       s3_region <ap-southeast-1>
       s3_object_key_format "${tag}/%{time_slice}-events_%{index}.%{file_extension}"
       time_slice_format %Y/%m/%d/%H
       time_slice_wait 10m
       path cluster1-logs
       # if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
       # need to specify tag for ${tag} and time for %Y/%m/%d in <buffer> argument.
       <buffer tag,time>
         @type file
         flush_mode interval
         flush_interval 30s
         path /var/log/fluent/s3
         timekey 300 # 1 hour partition
         timekey_wait 1m
         timekey_use_utc true # use utc
         chunk_limit_size 100m
         @type json

Installing Fluentd using Helm

Once you’ve made the changes mentioned above, use the helm install command mentioned below to install the fluentd in your cluster.

helm install fluentd-es-s3 stable/fluentd --version 2.3.2 -f fluentd-es-s3-values.yaml

Uninstalling Fluentd

helm delete fluentd-es-s3 --purge 


    # Default values for fluentd.
	# This is a YAML-formatted file.
	# Declare variables to be passed into your templates.
	replicaCount: 3
	  repository: gcr.io/google-containers/fluentd-elasticsearch
	  tag: v2.4.0
	  pullPolicy: IfNotPresent
	  # pullSecrets:
	  #   - secret1
	  #   - secret2

	  host: <elasticsearch-client>.<namespace>
	  port: 9200
	  scheme: http
	  sslVersion: TLSv1
	  buffer_chunk_limit: 2M
	  buffer_queue_limit: 8

	env: {}

	# Extra Environment Values - allows yaml definitions
	#    valueFrom:
	#      secretKeyRef:
	#        name: secret_name
	#        key: secret_key

	# extraVolumes:
	#   - name: es-certs
	#     secret:
	#       defaultMode: 420
	#       secretName: es-certs
	# extraVolumeMounts:
	#   - name: es-certs
	#     mountPath: /certs
	#     readOnly: true

	  enabled: true
	    - fluent-plugin-s3
	    - fluent-plugin-rewrite-tag-filter

	  annotations: {}
	  type: ClusterIP
	  # loadBalancerIP:
	  # type: NodePort
	  # nodePort:
	  # Used to create Service records
	    - name: "monitor-agent"
	      protocol: TCP
	      containerPort: 24220
	    - name: "forward"  
	      protocol: TCP
	      containerPort: 24224

	  enabled: false
	    port: 24231
	    enabled: false
	    additionalLabels: {}
	    # namespace: monitoring
	    # interval: 30s
	    # scrapeTimeout: 10s

	annotations: {}
	#  prometheus.io/scrape: "true"
	#  prometheus.io/port: "24231"

	  enabled: false
	    kubernetes.io/ingress.class: nginx
	#    kubernetes.io/tls-acme: "true"
	#    # Depending on which version of ingress controller you may need to configure properly - https://kubernetes.github.io/ingress-nginx/examples/rewrite/#rewrite-target
	#    nginx.ingress.kubernetes.io/rewrite-target: /
	  labels: []
	  # If doing TCP or UDP ingress rule don't forget to update your Ingress Controller to accept TCP connections - https://kubernetes.github.io/ingress-nginx/user-guide/exposing-tcp-udp-services/
	#     - name: "http-input.local"
	#       protocol: TCP
	#       servicePort: 9880
	#       path: /
	  tls: {}
	  # Secrets must be manually created in the namespace.
	#    - secretName: http-input-tls
	#      hosts:
	#        - http-input.local

	  general.conf: |
	    # Prevent fluentd from handling records containing its own logs. Otherwise
	    # it can lead to an infinite loop, when error in sending one message generates
	    # another message which also fails to be sent and so on.
	    <match fluentd.**>
	      @type null
	    # Used for health checking
	      @type http
	      port 9880
	    # Emits internal metrics to every minute, and also exposes them on port
	    # 24220. Useful for determining if an output plugin is retryring/erroring,
	    # or determining the buffer queue length.
	      @type monitor_agent
	      port 24220
	      tag fluentd.monitor.metrics
	  system.conf: |-
	      root_dir /tmp/fluentd-buffers/
	  forward-input.conf: |
	      @type forward
	      port 24224
	  output.conf: |
	    <filter kube.**>
	       @type record_transformer
	         kubernetes_tag ${"%s" % [record["kubernetes"]["labels"]["app"] || record["kubernetes"]["labels"]["k8s-app"] || record["kubernetes"]["labels"]["name"] || "unspecified-app-label"]}
	    <match kube.**>
	       @type rewrite_tag_filter
	         key     kubernetes_tag
	         pattern ^(.+)$
	         tag     $1
	    <match **>
	       @type s3
	       s3_bucket <k8s-logs-bucket>
	       s3_region <ap-southeast-1>
	       s3_object_key_format "${tag}/%{time_slice}-events_%{index}.%{file_extension}"
	       time_slice_format %Y/%m/%d/%H
	       time_slice_wait 10m
	       path test-logs
	       # if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
	       # need to specify tag for ${tag} and time for %Y/%m/%d in <buffer> argument.
	       <buffer tag,time>
	         @type file
	         flush_mode interval
	         flush_interval 30s
	         path /var/log/fluent/s3
	         timekey 300 # 1 hour partition
	         timekey_wait 1m
	         timekey_use_utc true # use utc
	         chunk_limit_size 100m
	         @type json
	    cpu: 500m
	    memory: 512Mi
	    cpu: 500m
	    memory: 512Mi  
	  # We usually recommend not to specify default resources and to leave this as a conscious
	  # choice for the user. This also increases chances charts run on environments with little
	  # resources, such as Minikube. If you do want to specify resources, uncomment the following
	  # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
	  # limits:
	  #  cpu: 500m
	  #  memory: 200Mi
	  # requests:
	  #  cpu: 500m
	  #  memory: 200Mi

	  # Specifies whether RBAC resources should be created
	  create: true

	  # Specifies whether a ServiceAccount should be created
	  create: true
	  # The name of the ServiceAccount to use.
	  # If not set and create is true, a name is generated using the fullname template

	## Persist data to a persistent volume
	  enabled: false

	  ## If defined, storageClassName: <storageClass>
	  ## If set to "-", storageClassName: "", which disables dynamic provisioning
	  ## If undefined (the default) or set to null, no storageClassName spec is
	  ##   set, choosing the default provisioner.  (gp2 on AWS, standard on
	  ##   GKE, AWS & OpenStack)
	  # storageClass: "-"
	  # annotations: {}
	  accessMode: ReadWriteOnce
	  size: 10Gi

	nodeSelector: {}

	tolerations: []

	affinity: {}

	# Enable autoscaling using HorizontalPodAutoscaler
	  enabled: false
	  minReplicas: 2
	  maxReplicas: 5
	    - type: Resource
	        name: cpu
	          type: Utilization
	          averageUtilization: 90
	    - type: Resource
	        name: memory
	          type: Utilization
	          averageUtilization: 80

	# Consider to set higher value when using in conjuction with autoscaling
	# Full description about this field: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.15/#pod-v1-core
	terminationGracePeriodSeconds: 30

