Source code for qiskit_aws_braket_provider.awsjob

# Copyright 2020 Carsten Blank
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import warnings
from collections import Counter
from datetime import datetime
from typing import List, Optional, Dict

from braket.aws import AwsQuantumTask
from braket.tasks import GateModelQuantumTaskResult
from qiskit.providers import BaseJob, JobStatus
from qiskit.qobj import QasmQobj, QasmQobjExperiment, QasmQobjInstruction
from qiskit.result import Result
from qiskit.result.models import ExperimentResult, ExperimentResultData

from . import awsbackend

logger = logging.getLogger(__name__)


def _reverse_and_map(bit_string: str, mapping: Dict[int, int]):
    result_bit_string = len(mapping) * ['x']
    for i, c in enumerate(bit_string):
        if i in mapping:
            result_bit_string[mapping[i]] = c
    # qiskit is Little Endian, braket is Big Endian, so we don't do a re-reversed here
    result = "".join(reversed(result_bit_string))
    return result


def map_measurements(counts: Counter, qasm_experiment: QasmQobjExperiment) -> Dict[str, int]:
    """Convert the measurement counts of one task into counts keyed by classical bits.

    Every key of ``counts`` is rewritten with ``_reverse_and_map`` using the
    qubit -> memory-slot assignments found in the experiment's ``measure``
    instructions. Keys that become identical after the rewrite have their
    counts added together.

    Args:
        counts: Measured bit strings and how often each was observed.
        qasm_experiment: The experiment whose ``measure`` instructions define
            which qubit is stored in which memory slot.

    Returns:
        A dict from the rewritten bit string to its accumulated count.
    """
    # Need to get measure mapping
    instructions: List[QasmQobjInstruction] = [i for i in qasm_experiment.instructions if i.name == 'measure']
    # If a qubit is measured more than once, the last measure instruction wins.
    mapping = {q: m for i in instructions for q, m in zip(i.qubits, i.memory)}

    # Accumulate in one pass instead of rescanning all entries per distinct key.
    merged: Dict[str, int] = {}
    for bit_string, occurrences in counts.items():
        key = _reverse_and_map(bit_string, mapping)
        merged[key] = merged.get(key, 0) + occurrences
    return merged


class AWSJob(BaseJob):
    """Job that bundles the AWS Braket quantum tasks created for one Qobj.

    The tasks are paired positionally with the experiments of the Qobj. The
    job derives its overall status and its ``Result`` from those tasks.
    """

    _extra_data: Optional[dict]
    _s3_bucket: Optional[str]
    _qobj: QasmQobj
    _job_id: str
    _tasks: List[AwsQuantumTask]
    _backend: 'awsbackend.AWSBackend'

    def __init__(self, job_id: str, qobj: QasmQobj, backend: 'awsbackend.AWSBackend', tasks: List[AwsQuantumTask],
                 extra_data: Optional[dict] = None, s3_bucket: Optional[str] = None) -> None:
        """Create the job.

        Args:
            job_id: Identifier of this job.
            qobj: The Qobj whose experiments the tasks were created for.
            backend: The backend the job belongs to.
            tasks: One Braket task per experiment, in experiment order.
            extra_data: Optional additional data kept alongside the job.
            s3_bucket: Optional name of the S3 bucket associated with the job.
        """
        super().__init__(backend, job_id)
        self._tasks = tasks
        self._extra_data = extra_data
        # Local wall-clock time at which this job object was constructed.
        self._date_of_creation = datetime.now()
        self._qobj = qobj
        self._job_id = job_id
        self._s3_bucket = s3_bucket

    @property
    def shots(self) -> int:
        """Number of shots configured in the Qobj."""
        return self._qobj.config.shots

    @property
    def extra_data(self) -> dict:
        """The extra data passed at construction (may be ``None``)."""
        return self._extra_data

    @property
    def date_of_creation(self) -> datetime:
        """Timestamp taken when this job object was constructed."""
        return self._date_of_creation

    @property
    def tasks(self) -> List[AwsQuantumTask]:
        """The Braket tasks backing this job."""
        return self._tasks

    def submit(self):
        """Deprecated; does not submit anything and only emits a ``DeprecationWarning``."""
        # The warning class and stacklevel are arguments of warnings.warn. Passing
        # them to logger.warning made the log record fail to format.
        warnings.warn("job.submit() is deprecated. Please use AWSBackend.run() to submit a job.",
                      DeprecationWarning, stacklevel=2)

    def result(self):
        """Collect the results of all tasks into a qiskit ``Result``.

        Returns:
            A ``Result`` holding one ``ExperimentResult`` per (task, experiment)
            pair. Its ``success`` flag is True only when the job status is
            ``JobStatus.DONE``.
        """
        experiment_results: List[ExperimentResult] = []
        task: AwsQuantumTask
        qasm_experiment: QasmQobjExperiment
        for task, qasm_experiment in zip(self._tasks, self._qobj.experiments):
            result: Optional[GateModelQuantumTaskResult] = task.result()
            # NOTE(review): Braket documents result() as returning None when a task
            # did not complete successfully - confirm for the SDK version in use.
            # Such a task is reported with empty counts instead of raising.
            if result is None:
                counts: Dict[str, int] = {}
            else:
                counts = map_measurements(result.measurement_counts, qasm_experiment)
            data = ExperimentResultData(
                counts=dict(counts)
            )
            # Query the state once so `success` and `status` cannot disagree.
            task_state = task.state()
            experiment_result = ExperimentResult(
                shots=self.shots,
                success=task_state == 'COMPLETED',
                header=qasm_experiment.header,
                status=task_state,
                data=data
            )
            experiment_results.append(experiment_result)
        qiskit_result = Result(
            backend_name=self._backend.name(),
            backend_version=self._backend.version(),
            qobj_id=self._qobj.qobj_id,
            job_id=self._job_id,
            # `success` is a boolean flag. A JobStatus member is always truthy,
            # so passing it directly reported success even for failed jobs.
            success=self.status() == JobStatus.DONE,
            results=experiment_results
        )
        return qiskit_result

    def cancel(self):
        """Try to cancel every task; a failure is logged and the remaining tasks are still tried."""
        for task in self._tasks:
            try:
                task.cancel()
            except Exception as ex:
                # Best effort: one task failing to cancel must not stop the others.
                logger.error(f"While cancelling Job {self.job_id()}, could not cancel task {task.id}. Reason: {ex}")

    def status(self):
        """Derive the job status from the states of all tasks.

        Returns:
            The ``JobStatus`` of the first matching rule below, or
            ``JobStatus.INITIALIZING`` when no rule matches.
        """
        # FIXME: this is likely to change soon
        states = [t.state() for t in self._tasks]
        unfinished = ('QUEUED', 'CREATED', 'RUNNING')
        status: JobStatus = JobStatus.INITIALIZING
        if all(s == 'CREATED' for s in states):
            status = JobStatus.INITIALIZING
        elif all(s == 'QUEUED' for s in states):
            status = JobStatus.QUEUED
        elif any(s == 'RUNNING' for s in states):
            status = JobStatus.RUNNING
        elif any(s in AwsQuantumTask.RESULTS_READY_STATES for s in states) \
                and any(s in unfinished for s in states):
            # Some tasks have results while others are still pending. Membership
            # test, because RESULTS_READY_STATES is a collection of state names.
            status = JobStatus.RUNNING
        elif all(s == 'COMPLETED' for s in states):
            status = JobStatus.DONE
        elif any(s == 'FAILED' for s in states):
            status = JobStatus.ERROR
        elif any(s == 'CANCELLED' for s in states):
            status = JobStatus.CANCELLED
        return status