AWS deployments: publishing logs to a central log account with EFK

Danushka Fernando
9 min read · Jan 31, 2020

Most teams that run production systems in AWS today use multiple accounts to host multiple environments: a development environment, a staging environment, a production environment and so on. AWS has finally come up with a solution to manage all of those accounts from a single place, called AWS Control Tower, and it has made DevOps life a lot easier. However, there are still missing pieces that could make it easier yet. Wouldn't you like to look at the logs of all of these accounts in a single place, in an organized manner? If so, in this article I am going to explain how to achieve that, and I will also provide CloudFormation templates to deploy the solution.

In this article I will be referring to a solution I built for an EKS setup, but if you are using EFK the same approach applies to other use cases as well. The solution is based on the one described in the EKS workshop [1]: fluentd runs inside the EKS cluster and publishes the logs to a CloudWatch log group, and a Lambda function forwards them to an Elasticsearch domain. That solution, however, covers only a single account. How do we deploy it across accounts?

Our goal here is to run EKS in one account and host CloudWatch Logs, the Lambda function and the Elasticsearch domain in another account, the central log account.

The first step is to set up fluentd to push logs to the second account, and to publish them to a log group that we can uniquely identify. I'll be using the same fluentd.yaml from the workshop to deploy fluentd in EKS, with the following changes. To make the log group identify the environment and the EKS cluster, I change the log group name as below.

log_group_name /<env name>/eks/<cluster name>/containers
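
For example, for a development environment with an EKS cluster named development-eks (the default names used by the templates later in this article), the rendered log group would be:

log_group_name /development/eks/development-eks/containers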

Here fluentd uses the fluent-plugin-cloudwatch-logs plugin [2] to push logs to CloudWatch. The plugin is built on the AWS SDK, so to push the logs to the second account all we have to do is create an IAM user in that account, generate access keys for it, and set them as environment variables in the fluentd daemon. The plugin documentation [2] describes a more elaborate cross-account setup based on assuming a role, but that does not work with EKS: the pods run under the EKS node group role, which cannot assume a role in another account, so access keys are the way to go. For simplicity I set them as plain environment variables here; in practice you can store them in a Kubernetes Secret instead (a sketch of that follows the manifest below).
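
Concretely, the IAM user in the central log account only needs permission to create log groups and streams and to put log events. As a rough sketch of how you might create it with CloudFormation (this template is my own addition, not part of the original setup, and the user and policy names are assumptions), after which you generate an access key pair for the user in the IAM console:

AWSTemplateFormatVersion: '2010-09-09'
Description: IAM user used by fluentd to ship EKS logs to the central log account (sketch)
Resources:
  SharedLogUser:
    Type: AWS::IAM::User
    Properties:
      UserName: eks-shared-log-user   # assumed name
      Policies:
      - PolicyName: put-eks-logs
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - logs:CreateLogGroup
            - logs:CreateLogStream
            - logs:DescribeLogGroups
            - logs:DescribeLogStreams
            - logs:PutLogEvents
            Resource: '*'

With that user created and its access keys generated, here is the full fluentd.yaml. The parts to note are the log_group_name values and the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.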

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: fluentd
  namespace: kube-system
rules:
- apiGroups: [""]
  resources:
  - namespaces
  - pods
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: fluentd
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluentd
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-system
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: kube-system
  labels:
    k8s-app: fluentd-cloudwatch
data:
  fluent.conf: |
    @include containers.conf
    @include systemd.conf

    <match fluent.**>
      @type null
    </match>
  containers.conf: |
    <source>
      @type tail
      @id in_tail_container_logs
      @label @containers
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag *
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    <label @containers>
      <filter **>
        @type kubernetes_metadata
        @id filter_kube_metadata
      </filter>

      <filter **>
        @type record_transformer
        @id filter_containers_stream_transformer
        <record>
          stream_name ${tag_parts[3]}
        </record>
      </filter>

      <match **>
        @type cloudwatch_logs
        @id out_cloudwatch_logs_containers
        region {{ .Values.region }}
        log_group_name /{{ .Values.environmentName }}/eks/{{ .Values.clusterName }}/containers
        log_stream_name_key stream_name
        remove_log_stream_name_key true
        auto_create_stream true
        <buffer>
          flush_interval 5
          chunk_limit_size 2m
          queued_chunks_limit_size 32
          retry_forever true
        </buffer>
      </match>
    </label>
  systemd.conf: |
    <source>
      @type systemd
      @id in_systemd_kubelet
      @label @systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <entry>
        field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"}
        field_map_strict true
      </entry>
      path /run/log/journal
      pos_file /var/log/fluentd-journald-kubelet.pos
      read_from_head true
      tag kubelet.service
    </source>

    <source>
      @type systemd
      @id in_systemd_kubeproxy
      @label @systemd
      filters [{ "_SYSTEMD_UNIT": "kubeproxy.service" }]
      <entry>
        field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"}
        field_map_strict true
      </entry>
      path /run/log/journal
      pos_file /var/log/fluentd-journald-kubeproxy.pos
      read_from_head true
      tag kubeproxy.service
    </source>

    <source>
      @type systemd
      @id in_systemd_docker
      @label @systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <entry>
        field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"}
        field_map_strict true
      </entry>
      path /run/log/journal
      pos_file /var/log/fluentd-journald-docker.pos
      read_from_head true
      tag docker.service
    </source>

    <label @systemd>
      <filter **>
        @type record_transformer
        @id filter_systemd_stream_transformer
        <record>
          stream_name ${tag}-${record["hostname"]}
        </record>
      </filter>

      <match **>
        @type cloudwatch_logs
        @id out_cloudwatch_logs_systemd
        region {{ .Values.region }}
        log_group_name /eks/{{ .Values.clusterName }}/systemd
        log_stream_name_key stream_name
        auto_create_stream true
        remove_log_stream_name_key true
        <buffer>
          flush_interval 5
          chunk_limit_size 2m
          queued_chunks_limit_size 32
          retry_forever true
        </buffer>
      </match>
    </label>
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: fluentd-cloudwatch
  namespace: kube-system
  labels:
    k8s-app: fluentd-cloudwatch
spec:
  template:
    metadata:
      labels:
        k8s-app: fluentd-cloudwatch
    spec:
      serviceAccountName: fluentd
      terminationGracePeriodSeconds: 30
      # Because the image's entrypoint requires to write on /fluentd/etc but we mount configmap there which is read-only,
      # this initContainers workaround or other is needed.
      # See https://github.com/fluent/fluentd-kubernetes-daemonset/issues/90
      initContainers:
      - name: copy-fluentd-config
        image: busybox
        command: ['sh', '-c', 'cp /config-volume/..data/* /fluentd/etc']
        volumeMounts:
        - name: config-volume
          mountPath: /config-volume
        - name: fluentdconf
          mountPath: /fluentd/etc
      containers:
      - name: fluentd-cloudwatch
        image: fluent/fluentd-kubernetes-daemonset:{{ .Values.fluentdVersion }}
        env:
        - name: REGION
          value: {{ .Values.region }}
        - name: CLUSTER_NAME
          value: {{ .Values.clusterName }}
        - name: AWS_ACCESS_KEY_ID
          value: {{ .Values.sharedLogUserKey }}
        - name: AWS_SECRET_ACCESS_KEY
          value: {{ .Values.sharedLogUserSecret }}
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: config-volume
          mountPath: /config-volume
        - name: fluentdconf
          mountPath: /fluentd/etc
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: runlogjournal
          mountPath: /run/log/journal
          readOnly: true
      volumes:
      - name: config-volume
        configMap:
          name: fluentd-config
      - name: fluentdconf
        emptyDir: {}
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: runlogjournal
        hostPath:
          path: /run/log/journal
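
As noted above, the access key pair is passed as plain environment variable values here for simplicity. A tidier option is to keep them in a Kubernetes Secret and reference it from the DaemonSet. A minimal sketch (the Secret name log-account-credentials is an assumption, not part of the original setup):

apiVersion: v1
kind: Secret
metadata:
  name: log-account-credentials   # assumed name
  namespace: kube-system
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: <access key of the log account IAM user>
  AWS_SECRET_ACCESS_KEY: <secret key of the log account IAM user>

Then, in the DaemonSet, reference the Secret instead of literal values:

env:
- name: AWS_ACCESS_KEY_ID
  valueFrom:
    secretKeyRef:
      name: log-account-credentials
      key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
  valueFrom:
    secretKeyRef:
      name: log-account-credentials
      key: AWS_SECRET_ACCESS_KEY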

Now your logs will be published to the central log account, into separate log groups per environment and cluster. As the next step, let's create the central Elasticsearch domain. In our solution there is only one Elasticsearch domain, shared by all environments. You can use the CloudFormation template below.

AWSTemplateFormatVersion: '2010-09-09'
Description: Amazon Elasticsearch Domain Creation
Parameters:
  EsDomainName:
    Type: String
    Default: central-logs
    Description: EsDomainName
  EsInstanceCount:
    Type: Number
    Default: 2
    Description: EsInstanceCount
  EsAvailZoneCount:
    Type: Number
    Default: 2
    Description: EsAvailZoneCount
  EBSSize:
    Type: Number
    Default: 30
    Description: EBSSize
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
    - Label:
        default: Configurations
      Parameters:
      - EsDomainName
      - EBSSize
      - EsInstanceCount
      - EsAvailZoneCount
Resources:
  ElasticsearchDomain:
    Type: AWS::Elasticsearch::Domain
    Properties:
      DomainName:
        Ref: EsDomainName
      ElasticsearchVersion: '7.1'
      ElasticsearchClusterConfig:
        InstanceCount:
          Ref: EsInstanceCount
        ZoneAwarenessEnabled: true
        ZoneAwarenessConfig:
          AvailabilityZoneCount:
            Ref: EsAvailZoneCount
        InstanceType: m4.large.elasticsearch
      EBSOptions:
        EBSEnabled: true
        VolumeSize:
          Ref: EBSSize
        VolumeType: standard
      SnapshotOptions:
        AutomatedSnapshotStartHour: 0
      AccessPolicies:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            AWS: '*'
          Action: 'es:*'
          Resource: '*'
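
The Lambda in the next step needs the domain endpoint. To avoid looking it up manually, you can optionally add an Outputs section like the following to the template above (this is my own addition, not part of the original template):

Outputs:
  EsDomainEndpoint:
    Description: Endpoint of the central Elasticsearch domain
    Value:
      Fn::GetAtt:
      - ElasticsearchDomain
      - DomainEndpoint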

Now we need the Lambda function that transfers logs from CloudWatch to Elasticsearch. There are two parts to creating it: the function code, and the CloudFormation template that creates the Lambda. Let's look at the function code first; I used Node.js here. You will need to fill in your Elasticsearch domain endpoint (the endpoint variable at the top) and change the environment prefix used when building indexName in the transform() function ('development_' in the code below) to match the environment, so you will need a separate version of the code for each environment. After changing those, zip the code and upload it to an S3 location.

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint = 'xxxxxxxxxxxxxxx.es.amazonaws.com';

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        var jsonMessage = JSON.parse(logEvent.message);
        var namespace = jsonMessage['kubernetes']['namespace_name'];
        var serviceName = jsonMessage['kubernetes']['labels']['app'];

        var indexName = [
            'development_' + namespace + '_' + serviceName + '_logs_write'
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                var jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    var jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}

function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [date, region, service, 'aws4_request'].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));

        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

The next step is to create the Lambda with the code above. You can use the CloudFormation template below.

AWSTemplateFormatVersion: '2010-09-09'
Description: Lambda for Centralized Logging
Parameters:
  EnvironmentName:
    Type: String
    Default: development
    Description: Name Of The Environment
  ClusterName:
    Type: String
    Default: development-eks
    Description: Name Of The EKS Cluster
  Region:
    Type: String
    Default: us-east-1
    Description: Name Of The Region
  S3BucketName:
    Type: String
    Default: sample-bucket
    Description: Lambda Code zip file S3 bucket name
  S3FileName:
    Type: String
    Default: logstoes-development.zip
    Description: Lambda Code zip filename in S3
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
    - Label:
        default: Configurations
      Parameters:
      - EnvironmentName
      - ClusterName
      - Region
      - S3BucketName
      - S3FileName
Resources:
  LambdaBasicExecution:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
          Action:
          - sts:AssumeRole
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Path: /
  LogsToElasticsearch:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName:
        Fn::Sub: LogsToElasticsearch-${EnvironmentName}-${ClusterName}
      Runtime: nodejs10.x
      Role:
        Fn::GetAtt:
        - LambdaBasicExecution
        - Arn
      Handler: index.handler
      Code:
        S3Bucket:
          Fn::Sub: ${S3BucketName}
        S3Key:
          Fn::Sub: ${S3FileName}
      Description: Forwards CloudWatch log events to the central Elasticsearch domain.
  CloudWatchSubscriptionFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Fn::GetAtt:
        - LogsToElasticsearch
        - Arn
      Action: lambda:InvokeFunction
      Principal:
        Fn::Sub: logs.${Region}.amazonaws.com
      SourceArn:
        Fn::Sub: arn:aws:logs:${Region}:${AWS::AccountId}:log-group:/${EnvironmentName}/eks/${ClusterName}/containers:*
  SubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: CloudWatchSubscriptionFunctionPermission
    Properties:
      LogGroupName:
        Fn::Sub: /${EnvironmentName}/eks/${ClusterName}/containers
      FilterPattern: ''
      DestinationArn:
        Fn::GetAtt:
        - LogsToElasticsearch
        - Arn

Now go to the Kibana URL and list the indices; you will see that separate indices have been created for each namespace of each environment.
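
For example, with the index naming scheme used in the Lambda above (<environment>_<namespace>_<service>_logs_write), you would see indices along these lines (the namespace and service names here are hypothetical):

development_default_orders-api_logs_write
development_kube-system_payments-api_logs_write
staging_default_orders-api_logs_write

From there you can define index patterns in Kibana to slice the logs per environment, namespace or service.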

Hope I saved someone’s day. Good luck!!!

References

[1] https://eksworkshop.com/intermediate/230_logging/

[2] https://github.com/fluent-plugins-nursery/fluent-plugin-cloudwatch-logs
